说说Node.js的流模块Stream之Stream模块的可读流类:stream.Readable

 2015年11月17日    371     声明


可读流(Readable stream)接口是对你正在读取数据来源的抽象,换言之,数据出自一个 Readable流。在stream模块中,可读流有两种模式:流动模式(flowing mode)暂停模式(paused mode)。流动模式时,数据由底层系统读出,并尽可能快地提供给应用程序;当处于暂停模式时,必须显示地调用stream.read()方法来读取若干数据块。

  1. 可读流
  2. 可读流中的事件
  3. 可读流中的方法


1. 可读流

流默认处于暂停模式,在处理流对象时,如果没有绑定'data'事件处理器,且没有pipe()目标,同时流被切换到流动模式,这样会造成数据的丢失。

暂停流切换到流动流

处理暂时模式的流可以切换为流动模式的流,流从暂停模式切换到流动模式可以使用以下几种方式:

  • 通过添加'data' 事件监听器来启动数据监听。
  • 调用resume()方法启动数据流。
  • 调用pipe()方法将数据转接到另一个Writable对象。

流动流切换到暂停流

同样,处于流动模式的流可以切换为暂停模式的流,流从流动模式切换到暂停模式可以使用以下几种方式:

  • 在流没有pipe()转接时,调用pause()方法可以将流暂停。
  • 如果有有pipe()转接时,需要移除所有'data'事件的监听器,再调用unpipe()方法移除流的所有导向目标。


2. 可读流中的事件

可读流中包含以下事件:

事件:'readable'

当一个数据块可以从流中被读出时,会触发'readable'事件。某些情况下,如果没有准备好,监听'readable'事件将会导致一些数据从底层系统读出到内部缓存。

当内部缓冲区被排空后,当有更多数据时,'readable'事件会被再次触发。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // 现在有数据可以读取了
})


事件:'data'

当可读流中有数据读出时会触发'data'事件,绑定'data'事件监听器,监听器绑定后如果流是暂停模式将会被切换到流动模式。其监听函数callback(chunk)中有一个chunk参数,表示收到的数据块:

  • chunk {Buffer | String} 数据块

'data'事件监听器绑定后,数据会被尽可能地传递,如果你想从流尽快的取出所有数据,这是最理想的方式。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('收到了 %d 字节的数据', chunk.length);
})


事件:'end'

当可读流中数据读取完毕后会触发'data'事件。

需要注意的是,除非数据被消费完,否则'end'事件不会被触发

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('收到了 %d 字节的数据', chunk.length);
})
readable.on('end', function() {
  console.log('读取完毕。');
});


事件:'close'

当底层数据源(如,源文件的文件描述符)被关闭时触发。请注意,并不是所有流都会触发这个事件。


事件:'error'

当接收数据时发生错误触发,其回调函数中有一个参数{Error Object},表示错误信息。


3. 可读流中的方法

拉取数据:readable.read()

readable.read([size])

read()方法会从内部缓冲区中拉取并返回若干数据。当没有更多可用数据时,会返回null。其参数及返回值如下:

  • size-{Number} 可选参数,指定需要读取的数据量
  • 返回:{String | Buffer | null}

使用read()方法读取数据时,如果传入了size参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据

该方法仅应在暂停模式时被调用。在流动模式中,该方法会被自动调用直到内部缓冲区被清空。

当手动调用该方法返回一个数据块时,'data'同时也会被触发。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('收到了 %d 字节的数据', chunk.length);
  }
});


指定数据编码:readable.setEncoding()

readable.setEncoding(encoding)
  • encoding {String} 要使用的编码
  • 返回:this

setEncoding()方法用于设置读取数据的编码方式,设置后会以指定编码的字符串返回数据,若不指定将返回Buffer

该方法有一个好处,就是能够正确处理多字节字符。如果不这么做,而是取出Buffer并调用buf.toString(encoding)方法转换为字符串,那么很可能会出现字节错位。

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('收到了 %d 个字符的字符串数据', chunk.length);
})


切换为流动流:readable.resume()

readable.resume()
  • 返回:this

resume()方法会将流从暂停模式切换到流动模式,并让可读流继续触发'data'事件。如果你不想从可读流中消费数据,也可以调用resume()方法触发一个'end'事件。

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
  console.log('到达末端,但并未读取任何东西');
})


切换为暂停流:readable.pause()

readable.pause()
  • 返回:this

pause()方法会将流从流动模式切换到暂停模式,并让可读流停止触发'data'事件,让后续可用数据留在内部缓冲区中。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('得到 %d 字节数据', chunk.length);
  readable.pause();
  console.log('接下来 1 秒内不会有数据');
  setTimeout(function() {
    console.log('现在数据会再次开始流动');
    readable.resume();
  }, 1000);
})


是否是暂停流:readable.isPaused()

readable.isPaused()
  • 返回: Boolean

该方法用于检测可读流是否被明确暂停过,即:客户端代码是否调用过readable.pause()方法。

var readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false


切换为流动流:readable.pipe()

readable.pipe(destination, [options])
  • destination {Writable Stream} 写入数据的目标
  • options {Object} 导流选项
    • end {Boolean} 读取到结束符时,结束写入。默认为true

pause()方法可以从可读流(Readable stream)中拉取所有数据,并将数据写入到所提供的写入目标中,写入目标是一个可写流(Writable stream)。该方法能自动控制流量,以避免目标被快速读取的可读流所淹没。

pause()方法可以导流到多个目标。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 所有 readable 的数据会被写入到 'file.txt'
readable.pipe(writable);

pause()方法导流后返回目标流, 因此你可以建立导流链:

var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

模拟 Unix 的cat命令

process.stdin.pipe(process.stdout);


默认情况下,当来源流触发'end'事件时,目标流(Writable stream)的end()会被调用,所以这时 destination将不再可写。将options设置为{ end: false },可以让目标流保持开启状态。

reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});


解除流导向:readable.unpipe()

readable.unpipe([destination])
  • destination {Writable Stream}可选,解指定的导向流

该方法会移除之前调用pipe()所设定的导向,如果没有指定destination,那么所有的导向pipe都会被移除。如果指定了目标,但并没有与之建立导流,那么什么事情都不会发生。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 来自 readable 的所有数据都会被写入 'file.txt',
// 但仅发生在第 1 秒
readable.pipe(writable);
setTimeout(function() {
  console.log('停止写入到 file.txt');
  readable.unpipe(writable);
  console.log('自行关闭文件流');
  writable.end();
}, 1000);


数据块插回来源流:readable.unshift()

readable.unshift(chunk)
  • chunk {Buffer | String} 要插回读取队列开头的数据块

该方法可以将读取的数据插回来源流。 该方法在许多场景中都很有用,如:一个流正在被一个解析器消费,解析器可能需要将某些刚拉取数据“逆消费”回来源流中,以便流能将它传递给其它消费者。但是,如果程序中频繁调用stream.unshift(chunk),就应该考虑实现一个Transform流。

// 取出以 \n\n 分割的头部并将多余部分 unshift() 回去
// callback 以 (error, header, stream) 形式调用
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // 找到头部边界
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // 现在可以从流中读取消息的主体了
        callback(null, header, stream);
      } else {
        // 仍在读取头部
        header += str;
      }
    }
  }
}