mosca
是MQTT
官方推荐的MQTT
代理(服务端)软件之一。mosca
是JavaScript编写Node.js模块,即可以全局安装做为MQTT
服务端使用,又可以将其集成到项目中,以实现更多的个性化功能。
1. 做为独立MQTT
服务端
mosca
是一个Node.js模块,要在服务器单独使用,至少应安装 Node运行环境。
安装mosca
使用npm安装:
npm install mosca bunyan -g
也可以直接使用git安装:
git clone git://github.com/mcollina/mosca.git cd mosca npm install
使用
mosca
单独使用时,可以像下面这样运行,并开始接受客户端连接:
mosca -v | bunyan
配置
执行mosca --help
可以查看使用帮助:
mosca --help Usage: mosca [options] [command] Commands: adduser <user> <pass> 添加用户到认证文件 rmuser <user> 从认证文件中移除用户 start 开始服务 (可选) Options: -h, --help 显示使用信息 information -V, --version 显示版本号 -p, --port <n> 监听的端口号 --host <IP> 监听的主机各 --parent-port <n> 要连接的父端口号 --parent-host <s> 要连接的父主机名 --parent-prefix <s> 在父代理中使用的前缀 --credentials <file> 认证文件 --authorize-publish <pattern> 指定发布主题时的认证方式(pattern) --authorize-subscribe <pattern> 指定用户订阅主题时的参数(pattern) --key <file> 服务器的私钥(使用tls时) --cert <file> 服务器的证书(使用tls时) --secure-port <n> 监听的TLS端口号 --non-secure 同时启动安全和非安装两种服务 --http-port <n> 在指定的端口启webSocket服务 --https-port <n> 在指定的端口启安全的webSocket服务 --http-static <directory> 为某些静态文件启动webSocket服务 --https-static <directory> 为某些静态文件启动安全的webSocket服务 --http-bundle 为HTTP客户端绑定/mqtt.js客户端库 --https-bundle 为HTTPs客户端绑定/mqtt.js客户端库 --only-http 仅启动HTTPwebSocket服务 --disable-stats 禁$SYS的发布状态 --broker-id <id> 指定$SYS/<id>命名空间 -c, --config <c> 要使用的配置文件 -d, --db <path> 指定消息存储 -v, --verbose 设置 bunyan 日志到INFO输出 --very-verbose 设置 bunyan 日志到DEBUG输出
充分利用mosca
,需要为其指定一个配置文件。下面是一个基于Redis存储的配置文件:
var mosca = require('mosca'); module.exports = { port: 4883, // host: "127.0.0.1", //指定要绑定的主机名 id: 'mymosca', //在发布时使用的主题命名空间ID $SYS/id stats: true, //在主题命名空间使用的状态 $SYS/id logger: { level: 'info' }, backend: { type: 'redis', port: 6379, host: 'localhost', return_buffers: true }, persistence: { factory: mosca.persistence.Redis, port: 6379, host: 'localhost' }, secure: { keyPath: "/path/to/key", certPath: "/path/to/cert" } };
mosca
基于Ascoltatori
模块开发,可以支持基于redis、MongoDB、AMQP、ZeroMQ和MQTT代理等方式的消息持久化。
认证
mosca
支持将认证用户添加到指定的json文件中,可以命令行中使用如下方式创建文件:
// 添加用户 $ mosca adduser <user> <pass> --credentials ./credentials.json // 添加用户到具体的一个主题 $ mosca adduser myuser mypass --credentials ./credentials.json \ --authorize-publish 'hello/*' --authorize-subscribe 'hello/*' // 移除用户 $ mosca rmuser myuser --credentials ./credentials.json // 从认证文件启用Mosca认证 $ mosca --credentials ./credentials.json
2. 集成到项目中
mosca
同样支持将其集成到现有的Node.js项目中,集成到现有项目后,需要编写实现MQTT服务端的代码。
安装设置
npm install mosca --save
获取模块引用、配置pub/sub设置
var mosca = require('mosca'); var pubsubsettings = { //using ascoltatore type: 'mongo', url: 'mongodb://localhost:27017/mqtt', pubsubCollection: 'ascoltatori', mongo: {} };
应用设置
var moscaSettings = { port: 1883, //mqtt端口 backend: pubsubsettings //应用上在的pub/set }; var server = new mosca.Server(moscaSettings); // 启动mosca server.on('ready', setup); //初始化完成后启动setup() // 准备完成后启动服务 function setup() { console.log('Mosca server is up and running') }
向客户端发送数据
与MQTT标准一致,发送数据使用publish
方法。发送数据时可以指定:topic、payload、qos及标识retain标识
var message = { topic: '/hello/world', payload: 'abcde', // or a Buffer qos: 0, // 0, 1, or 2 retain: false // or true }; server.publish(message, function() { console.log('done!'); });
接收客户端数据
mosca
继承了EventEmitter,其接收客户端数据是基于事件监听的。
// 消息发布后触发 server.on('published', function(packet, client) { console.log('Published', packet); console.log('Client', client); }); // 客户端连接后触发 server.on('clientConnected', function(client) { console.log('Client Connected:', client.id); }); // 客户端断开连接后触发 server.on('clientDisconnected', function(client) { console.log('Client Disconnected:', client.id); });
完整代码
完整代码整理如下:
var mosca = require('mosca') var ascoltatore = { //using ascoltatore type: 'mongo', url: 'mongodb://localhost:27017/mqtt', pubsubCollection: 'ascoltatori', mongo: {} }; var moscaSettings = { port: 1883, backend: ascoltatore, persistence: { factory: mosca.persistence.Mongo, url: 'mongodb://localhost:27017/mqtt' } }; var server = new mosca.Server(moscaSettings); server.on('ready', setup); server.on('clientConnected', function(client) { console.log('client connected', client.id); }); // 收到消息后触发 server.on('published', function(packet, client) { console.log('Published', packet.payload); }); // MQTT服务端准备完成后触发 function setup() { console.log('Mosca server is up and running') }
下面是一个基于Redis的示例:
var mosca = require('mosca') var ascoltatore = { type: 'redis', redis: require('redis'), db: 12, port: 6379, return_buffers: true, // 处理二进制的payload host: "localhost" }; var moscaSettings = { port: 1883, backend: ascoltatore, persistence: { factory: mosca.persistence.Redis } }; var server = new mosca.Server(moscaSettings); server.on('ready', setup); server.on('clientConnected', function(client) { console.log('client connected', client.id); }); // 收到消息时触发 server.on('published', function(packet, client) { console.log('Published', packet.payload); }); // MQTT服务端准备完成后触发 function setup() { console.log('Mosca server is up and running') }
注意,如果使用是非标准的Redis或Redis与MQTT不在同一台服务器,需要像下面这样设置persistence
:
persistence: { factory: mosca.persistence.Redis, host: 'your redis host', port: 'your redis port' }
关于数据持久化的更多高级用法,请参考:Mosca advanced usage