MQTT.js
是JavaScript编写的,实现了MQTT
协议客户端功能的模块,可以在Node.js或浏览器环境中使用。在Node.js中使用时,即可以-g
全局安装以命令行的形式使用,又可以将其集成到项目中调用。
1. 安装与使用
1.1 在项目中安装
MQTT.js
npm包名为mqtt
,安装命令如下:
npm install mqtt --save
使用示例
使用MQTT.js
实现一个消息发布者及订阅者:
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://test.mosquitto.org'); client.on('connect', function () { //订阅presence主题 client.subscribe('presence'); //向presence主题发布消息 client.publish('presence', 'Hello mqtt'); }); client.on('message', function (topic, message) { //收到的消息是一个Buffer console.log(message.toString()); client.end(); });
输出:
Hello mqtt
在上例中test.mosquitto.org
是官方提供的MQTT
测试服务器,还可以使用test.mosca.io
网址进行测试。
了解更多关于MQTT
相关介绍请参考:MQTT协议-MQTT协议简介及协议原理
1.2 做为命令行工具使用
MQTT.js
支持以命令行模式与服务端通信,以命令行模式使用时,模块需要全局安装:
npm install mqtt -g
安装后,就可以在控制台使用:
mqtt sub -t 'hello' -h 'test.mosquitto.org' -v
下面是另一种使用方式:
mqtt pub -t 'hello' -h 'test.mosquitto.org' -m 'from MQTT.js'
更多命令行使用方法可以执行mqtt help
命令查看。
1.3 在浏览器中使用
在浏览器环境中什么MQTT.js
首先需要按AMD
或CommandJs规范导出。
Browserify导出
Browserify
模块可以将MQTT.js
绑定为浏览器可用的,也可以导出为一个单独的浏览器端模块。其导出的模块兼容AMD
或CMD
标准,且会在添加对象到全局空间。
npm install -g browserify // 安装 browserify cd node_modules/mqtt npm install . // 安装 dev dependencies browserify mqtt.js -s mqtt > browserMqtt.js // 需要在客户端使用的mqtt
Webpack导出
与Browserify
类似,该模块也会导出一个浏览器端使用的类库,且会在添加对象到全局空间,导出后可以像var mqtt = xxx
这样使用MQTT.js
。导出时可以设置AMD
、CMD
或其它导出格式。
npm install -g webpack // install webpack cd node_modules/mqtt npm install . // install dev dependencies webpack mqtt.js ./browserMqtt.js --output-library mqtt
完成导出后,就可以像在Node中那样使用mqtt.js的API:
<html> <head> <title>test Ws mqtt.js</title> </head> <body> <script src="./browserMqtt.js"></script> <script> var client = mqtt.connect(); // you add a ws:// url here client.subscribe("mqtt/demo"); client.on("message", function(topic, payload) { alert([topic, payload].join(": ")); client.end(); }); client.publish("mqtt/demo", "hello world!"); </script> </body> </html>
2. MQTT.js
相关API
-
mqtt.connect()
-
mqtt.Client()
-
mqtt.Client#publish()
-
mqtt.Client#subscribe()
-
mqtt.Client#unsubscribe()
-
mqtt.Client#end()
-
mqtt.Client#handleMessage()
-
mqtt.Store()
-
mqtt.Store#put()
-
mqtt.Store#del()
-
mqtt.Store#createStream()
-
mqtt.Store#close()
mqtt.connect([url], options)
通过指定的url和options连接到mqtt代理,并返回一个Client对象
URL可以支持协议有:'mqtt'、'mqtts'、'tcp'、'tls'、'ws'、'wss'。也可以是一个由Url.parse()返回的对象,这时可以将url和options合并为一个对象。
mqtt.Client(streamBuilder, options)
Client
类包装了一个到代理(服务器)的客户端连接,可以支持任何形式的传输协议(TCP、TLS、WebSocket等)
Client
类会自动做以下处理:
- 到服务器的定时ping
- QoS流
- 自动重连
- 在连接前开始消息发布
参数说明:
-
streamBuilder
:返回支持连接事件的流
类的子类函数。一个典型的net.socket
。 -
options
客户端连接选项。默认值: -
keepalive
:默认10秒,设为0时将禁用 -
clientId
:'mqttjs_' + Math.random().toString(16).substr(2, 8)
-
protocolId
:'MQTT'
-
protocolVersion
:4 -
clean
:true
。设置为false时,收到QoS为1和2的消息时,将下线 -
reconnectPeriod
:1000
毫秒,表示两次重连间隔 -
connectTimeout
:30*1000
毫秒,表示等待收到CONNACK确认包的时间 -
username
:代理(服务器)所需要的用户名,仅需要时 -
password
:代理(服务器)所需要的密码,仅需要时 -
incomingStore
:将要到达包的Store -
outgoingStore
:将要发出包的Store -
will
:客户端连接失效时自动发送给服务器的消息。格式如下: -
topic
:发布消息的主题 -
payload
:发布的消息内容 -
qos
:QoS -
retain
:retain标识
如果要使用mqtts
(使用tls的mqtt),相关设置通过options对象传给 tls.connect().。如果使用自签名证书,需要将rejectUnauthorized
设置为false
如果所要连接的服务器只支持MQTT 3.1(非V3.1.1),需要如下设置:
{ protocolId: 'MQIsdp', protocolVersion: 3 }
事件:'connect'
function(connack) {}
连接成功时会发送此事件
-
connack
收到的connack包,当clean
选项为false
且服务器有一个之前的clientId
连接选项,这时connack.sessionPresent
选项为true
事件:'reconnect'
function() {}
重新连接时触发
事件:'close'
function() {}
断开连接后触发
事件:'offline'
function() {}
客户端下线时触发
事件:'error'
function(error) {}
连接失败或发生错误时触发
事件:'message'
function(topic, message, packet) {}
客户端收到消息时触发
-
topic
:String
收到消息的主题 -
message
:String
或Buffer
消息的内容 -
packet
:收到的包
mqtt.Client#publish(topic, message, [options], [callback])
发布消息
-
topic
:String
发布消息的主题 -
message
:String
或Buffer
发布消息的内容 -
options
:消息选项,包括:-
qos
:QoS
级别,默认0 -
retain
:retain
标识,默认false
-
-
callback
:回调会在QoS处理完成或在下次处理前发生(QoS为0时)
mqtt.Client#subscribe(topic/topic array/topic object, [options], [callback])
订阅一个或一组主题(Topic)
-
topic
:String
或Array
表示要订阅的主题 -
options
:订阅主题的选项:-
qos
:QoS
级别,默认0
-
-
callback
:function(err, granted)
回调函数,suback时触发-
err
:订阅时发生的错误 -
granted
:订阅成功时触发[topic, qos]
-
mqtt.Client#unsubscribe(topic/topic array, [options], [callback])
退订一个或一组主题(Topic)
-
topic
:String
要取消订阅的主题 -
callback
:回调函数,退订成功后触发
mqtt.Client#end([force], [cb])
关闭客户端连接,选项如下:
-
force
:可选,强制关闭而无需等待传送中的消息完成 -
cb
:回调函数,连接关闭后触发,可选
mqtt.Client#handleMessage(packet, callback)
关闭客户端连接,选项如下:
mqtt.Store()
实现消息存储
mqtt.Store#put(packet, callback)
添加一个消息到消息存储中,数据包表示有messageId
属性的消息。回调函数在存储完成后触发。
mqtt.Store#del(packet, cb)
从消息存储中移除一个数据包。回调函数在移除完成后触发。
mqtt.Store#createStream()
在所有包存储完成后创建一个流
mqtt.Store#close(cb)
关闭消息存储