Node.js实现的MQTT客户端模块mqtt.js

 2016年01月10日    12881     声明


MQTT.js是JavaScript编写的,实现了MQTT协议客户端功能的模块,可以在Node.js或浏览器环境中使用。在Node.js中使用时,即可以-g全局安装以命令行的形式使用,又可以将其集成到项目中调用。


1. 安装与使用

1.1 在项目中安装

MQTT.jsnpm包名为mqtt,安装命令如下:

npm install mqtt --save

使用示例

使用MQTT.js实现一个消息发布者及订阅者:

var mqtt    = require('mqtt');
var client  = mqtt.connect('mqtt://test.mosquitto.org');
 
client.on('connect', function () {
  //订阅presence主题
  client.subscribe('presence');
  //向presence主题发布消息
  client.publish('presence', 'Hello mqtt');
});
 
client.on('message', function (topic, message) {
  //收到的消息是一个Buffer
  console.log(message.toString());
  client.end();
});

输出:

Hello mqtt

在上例中test.mosquitto.org是官方提供的MQTT测试服务器,还可以使用test.mosca.io网址进行测试。

了解更多关于MQTT相关介绍请参考:MQTT协议-MQTT协议简介及协议原理


1.2 做为命令行工具使用

MQTT.js支持以命令行模式与服务端通信,以命令行模式使用时,模块需要全局安装:

npm install mqtt -g

安装后,就可以在控制台使用:

mqtt sub -t 'hello' -h 'test.mosquitto.org' -v

下面是另一种使用方式:

mqtt pub -t 'hello' -h 'test.mosquitto.org' -m 'from MQTT.js'

更多命令行使用方法可以执行mqtt help 命令查看。


1.3 在浏览器中使用

在浏览器环境中什么MQTT.js首先需要按AMDCommandJs规范导出。

Browserify导出

Browserify模块可以将MQTT.js绑定为浏览器可用的,也可以导出为一个单独的浏览器端模块。其导出的模块兼容AMDCMD标准,且会在添加对象到全局空间。

npm install -g browserify // 安装 browserify 
cd node_modules/mqtt
npm install . // 安装 dev dependencies 
browserify mqtt.js -s mqtt > browserMqtt.js // 需要在客户端使用的mqtt


Webpack导出

Browserify类似,该模块也会导出一个浏览器端使用的类库,且会在添加对象到全局空间,导出后可以像var mqtt = xxx这样使用MQTT.js。导出时可以设置AMDCMD或其它导出格式。

npm install -g webpack // install webpack 
 
cd node_modules/mqtt
npm install . // install dev dependencies 
webpack mqtt.js ./browserMqtt.js --output-library mqtt

完成导出后,就可以像在Node中那样使用mqtt.js的API:

<html>
<head>
  <title>test Ws mqtt.js</title>
</head>
<body>
<script src="./browserMqtt.js"></script>
<script>
  var client = mqtt.connect(); // you add a ws:// url here
  client.subscribe("mqtt/demo");

  client.on("message", function(topic, payload) {
    alert([topic, payload].join(": "));
    client.end();
  });

  client.publish("mqtt/demo", "hello world!");
</script>
</body>
</html>


2. MQTT.js相关API


mqtt.connect([url], options)

通过指定的url和options连接到mqtt代理,并返回一个Client对象

URL可以支持协议有:'mqtt'、'mqtts'、'tcp'、'tls'、'ws'、'wss'。也可以是一个由Url.parse()返回的对象,这时可以将url和options合并为一个对象。


mqtt.Client(streamBuilder, options)

Client类包装了一个到代理(服务器)的客户端连接,可以支持任何形式的传输协议(TCP、TLS、WebSocket等)

Client类会自动做以下处理:

  • 到服务器的定时ping
  • QoS流
  • 自动重连
  • 在连接前开始消息发布

参数说明:

  • streamBuilder:返回支持连接事件的类的子类函数。一个典型的net.socket
  • options客户端连接选项。默认值:
    • keepalive:默认10秒,设为0时将禁用
    • clientId'mqttjs_' + Math.random().toString(16).substr(2, 8)
    • protocolId'MQTT'
    • protocolVersion:4
    • cleantrue。设置为false时,收到QoS为1和2的消息时,将下线
    • reconnectPeriod1000毫秒,表示两次重连间隔
    • connectTimeout30*1000毫秒,表示等待收到CONNACK确认包的时间
    • username:代理(服务器)所需要的用户名,仅需要时
    • password:代理(服务器)所需要的密码,仅需要时
    • incomingStore:将要到达包的Store
    • outgoingStore:将要发出包的Store
    • will:客户端连接失效时自动发送给服务器的消息。格式如下:
      • topic:发布消息的主题
      • payload:发布的消息内容
      • qos:QoS
      • retain:retain标识

如果要使用mqtts(使用tls的mqtt),相关设置通过options对象传给 tls.connect().。如果使用自签名证书,需要将rejectUnauthorized设置为false

如果所要连接的服务器只支持MQTT 3.1(非V3.1.1),需要如下设置:

{
  protocolId: 'MQIsdp',
  protocolVersion: 3
}


事件:'connect'

function(connack) {}

连接成功时会发送此事件

  • connack收到的connack包,当clean选项为false且服务器有一个之前的clientId连接选项,这时connack.sessionPresent选项为true


事件:'reconnect'

function() {}

重新连接时触发


事件:'close'

function() {}

断开连接后触发


事件:'offline'

function() {}

客户端下线时触发


事件:'error'

function(error) {}

连接失败或发生错误时触发


事件:'message'

function(topic, message, packet) {}

客户端收到消息时触发

  • topicString收到消息的主题
  • messageStringBuffer消息的内容
  • packet:收到的包


mqtt.Client#publish(topic, message, [options], [callback])

发布消息

  • topicString发布消息的主题
  • messageStringBuffer发布消息的内容
  • options:消息选项,包括:
    • qosQoS级别,默认0
    • retainretain标识,默认false
  • callback:回调会在QoS处理完成或在下次处理前发生(QoS为0时)


mqtt.Client#subscribe(topic/topic array/topic object, [options], [callback])

订阅一个或一组主题(Topic)

  • topicStringArray表示要订阅的主题
  • options:订阅主题的选项:
    • qosQoS级别,默认0
  • callbackfunction(err, granted)回调函数,suback时触发
    • err:订阅时发生的错误
    • granted:订阅成功时触发[topic, qos]


mqtt.Client#unsubscribe(topic/topic array, [options], [callback])

退订一个或一组主题(Topic)

  • topicString要取消订阅的主题
  • callback:回调函数,退订成功后触发


mqtt.Client#end([force], [cb])

关闭客户端连接,选项如下:

  • force:可选,强制关闭而无需等待传送中的消息完成
  • cb:回调函数,连接关闭后触发,可选


mqtt.Client#handleMessage(packet, callback)

关闭客户端连接,选项如下:


mqtt.Store()

实现消息存储


mqtt.Store#put(packet, callback)

添加一个消息到消息存储中,数据包表示有messageId属性的消息。回调函数在存储完成后触发。


mqtt.Store#del(packet, cb)

从消息存储中移除一个数据包。回调函数在移除完成后触发。


mqtt.Store#createStream()

在所有包存储完成后创建一个流


mqtt.Store#close(cb)

关闭消息存储