基于Node.js实现的MQTT代理(服务端)模块mosca

 2016年01月10日    5798     声明


moscaMQTT官方推荐的MQTT代理(服务端)软件之一。mosca是JavaScript编写Node.js模块,即可以全局安装做为MQTT服务端使用,又可以将其集成到项目中,以实现更多的个性化功能。

  1. 做为独立MQTT服务端
  2. 集成到项目中


1. 做为独立MQTT服务端

mosca是一个Node.js模块,要在服务器单独使用,至少应安装 Node运行环境。

安装mosca

使用npm安装:

npm install mosca bunyan -g

也可以直接使用git安装:

git clone git://github.com/mcollina/mosca.git
cd mosca
npm install


使用

mosca单独使用时,可以像下面这样运行,并开始接受客户端连接:

mosca -v | bunyan


配置

执行mosca --help可以查看使用帮助:

mosca --help

  Usage: mosca [options] [command]

  Commands:

    adduser <user> <pass>  添加用户到认证文件
    rmuser <user>          从认证文件中移除用户
    start                 开始服务 (可选)

  Options:

    -h, --help                       显示使用信息 information
    -V, --version                    显示版本号
    -p, --port <n>             监听的端口号
    --host <IP>                      监听的主机各
    --parent-port <n>                要连接的父端口号
    --parent-host <s>                要连接的父主机名
    --parent-prefix <s>              在父代理中使用的前缀
    --credentials <file>             认证文件
    --authorize-publish <pattern>    指定发布主题时的认证方式(pattern)
    --authorize-subscribe <pattern>  指定用户订阅主题时的参数(pattern)
    --key <file>                     服务器的私钥(使用tls时)
    --cert <file>                    服务器的证书(使用tls时)
    --secure-port <n>               监听的TLS端口号
    --non-secure                      同时启动安全和非安装两种服务
    --http-port <n>                  在指定的端口启webSocket服务
    --https-port <n>                在指定的端口启安全的webSocket服务
    --http-static <directory>        为某些静态文件启动webSocket服务
    --https-static <directory>       为某些静态文件启动安全的webSocket服务
    --http-bundle                   为HTTP客户端绑定/mqtt.js客户端库
    --https-bundle                  为HTTPs客户端绑定/mqtt.js客户端库
    --only-http                      仅启动HTTPwebSocket服务
    --disable-stats                  禁$SYS的发布状态
    --broker-id <id>                 指定$SYS/<id>命名空间
    -c, --config <c>                 要使用的配置文件
    -d, --db <path>                  指定消息存储
    -v, --verbose                    设置 bunyan 日志到INFO输出
    --very-verbose                   设置 bunyan 日志到DEBUG输出 

充分利用mosca,需要为其指定一个配置文件。下面是一个基于Redis存储的配置文件:

var mosca = require('mosca');

module.exports = {
  port: 4883,
  // host: "127.0.0.1", //指定要绑定的主机名
  id: 'mymosca', //在发布时使用的主题命名空间ID $SYS/id 
  stats: true,  //在主题命名空间使用的状态 $SYS/id
  logger: {
    level: 'info'
  },
  backend: {
    type: 'redis',
    port: 6379,
    host: 'localhost',
    return_buffers: true
  },
  persistence: {
    factory: mosca.persistence.Redis,
    port: 6379,
    host: 'localhost'
  },
  secure: {
    keyPath: "/path/to/key",
    certPath: "/path/to/cert"
  }
};

mosca基于Ascoltatori模块开发,可以支持基于redis、MongoDB、AMQP、ZeroMQ和MQTT代理等方式的消息持久化。


认证

mosca支持将认证用户添加到指定的json文件中,可以命令行中使用如下方式创建文件:

// 添加用户
$ mosca adduser <user> <pass> --credentials ./credentials.json

// 添加用户到具体的一个主题
$ mosca adduser myuser mypass --credentials ./credentials.json \
  --authorize-publish 'hello/*' --authorize-subscribe 'hello/*'

// 移除用户
$ mosca rmuser myuser --credentials ./credentials.json

// 从认证文件启用Mosca认证
$ mosca --credentials ./credentials.json


2. 集成到项目中

mosca同样支持将其集成到现有的Node.js项目中,集成到现有项目后,需要编写实现MQTT服务端的代码。


安装设置

npm install mosca --save


获取模块引用、配置pub/sub设置

var mosca = require('mosca');

var pubsubsettings = {
  //using ascoltatore
  type: 'mongo',        
  url: 'mongodb://localhost:27017/mqtt',
  pubsubCollection: 'ascoltatori',
  mongo: {}
};


应用设置

var moscaSettings = {
  port: 1883,           //mqtt端口
  backend: pubsubsettings   //应用上在的pub/set
};

var server = new mosca.Server(moscaSettings);  // 启动mosca
server.on('ready', setup);  //初始化完成后启动setup()

// 准备完成后启动服务
function setup() {
  console.log('Mosca server is up and running')
}


向客户端发送数据

MQTT标准一致,发送数据使用publish方法。发送数据时可以指定:topic、payload、qos及标识retain标识

var message = {
  topic: '/hello/world',
  payload: 'abcde', // or a Buffer
  qos: 0, // 0, 1, or 2
  retain: false // or true
};

server.publish(message, function() {
  console.log('done!');
});


接收客户端数据

mosca继承了EventEmitter,其接收客户端数据是基于事件监听的。

// 消息发布后触发
server.on('published', function(packet, client) {
  console.log('Published', packet);
  console.log('Client', client);
});
// 客户端连接后触发
server.on('clientConnected', function(client) {
  console.log('Client Connected:', client.id);
});

// 客户端断开连接后触发
server.on('clientDisconnected', function(client) {
  console.log('Client Disconnected:', client.id);
});


完整代码

完整代码整理如下:

var mosca = require('mosca')

var ascoltatore = {
  //using ascoltatore
  type: 'mongo',        
  url: 'mongodb://localhost:27017/mqtt',
  pubsubCollection: 'ascoltatori',
  mongo: {}
};

var moscaSettings = {
  port: 1883,
  backend: ascoltatore,
  persistence: {
    factory: mosca.persistence.Mongo,
    url: 'mongodb://localhost:27017/mqtt'
  }
};

var server = new mosca.Server(moscaSettings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);     
});

// 收到消息后触发
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

// MQTT服务端准备完成后触发
function setup() {
  console.log('Mosca server is up and running')
}


下面是一个基于Redis的示例:

var mosca = require('mosca')

var ascoltatore = {
  type: 'redis',
  redis: require('redis'),
  db: 12,
  port: 6379,
  return_buffers: true, // 处理二进制的payload
  host: "localhost"
};

var moscaSettings = {
  port: 1883,
  backend: ascoltatore,
  persistence: {
    factory: mosca.persistence.Redis
  }
};

var server = new mosca.Server(moscaSettings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);     
});

// 收到消息时触发
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

// MQTT服务端准备完成后触发
function setup() {
  console.log('Mosca server is up and running')
}

注意,如果使用是非标准的Redis或Redis与MQTT不在同一台服务器,需要像下面这样设置persistence

persistence: {
  factory: mosca.persistence.Redis,
  host: 'your redis host',
  port: 'your redis port'
}


关于数据持久化的更多高级用法,请参考:Mosca advanced usage