可读流(Readable stream)
接口是对你正在读取数据来源的抽象,换言之,数据出自一个 Readable流
。在stream
模块中,可读流
有两种模式:流动模式(flowing mode) 和 暂停模式(paused mode)。流动模式时,数据由底层系统读出,并尽可能快地提供给应用程序;当处于暂停模式时,必须显示地调用stream.read()
方法来读取若干数据块。
1. 可读流
流默认处于暂停模式,在处理流对象时,如果没有绑定'data'
事件处理器,且没有pipe()
目标,同时流被切换到流动模式,这样会造成数据的丢失。
暂停流切换到流动流
处理暂时模式的流可以切换为流动模式的流,流从暂停模式切换到流动模式可以使用以下几种方式:
-
通过添加
'data'
事件监听器来启动数据监听。 -
调用
resume()
方法启动数据流。 -
调用
pipe()
方法将数据转接到另一个Writable
对象。
流动流切换到暂停流
同样,处于流动模式的流可以切换为暂停模式的流,流从流动模式切换到暂停模式可以使用以下几种方式:
-
在流没有
pipe()
转接时,调用pause()
方法可以将流暂停。 -
如果有有
pipe()
转接时,需要移除所有'data'
事件的监听器,再调用unpipe()
方法移除流的所有导向目标。
2. 可读流中的事件
可读流中包含以下事件:
事件:'readable'
当一个数据块可以从流中被读出时,会触发'readable'
事件。某些情况下,如果没有准备好,监听'readable'
事件将会导致一些数据从底层系统读出到内部缓存。
当内部缓冲区被排空后,当有更多数据时,'readable'
事件会被再次触发。
var readable = getReadableStreamSomehow(); readable.on('readable', function() { // 现在有数据可以读取了 })
事件:'data'
当可读流中有数据读出时会触发'data'
事件,绑定'data'
事件监听器,监听器绑定后如果流是暂停模式将会被切换到流动模式。其监听函数callback(chunk)
中有一个chunk
参数,表示收到的数据块:
-
chunk
{Buffer | String} 数据块
'data'
事件监听器绑定后,数据会被尽可能地传递,如果你想从流尽快的取出所有数据,这是最理想的方式。
var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('收到了 %d 字节的数据', chunk.length); })
事件:'end'
当可读流中数据读取完毕后会触发'data'
事件。
需要注意的是,除非数据被消费完,否则'end'
事件不会被触发。
var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('收到了 %d 字节的数据', chunk.length); }) readable.on('end', function() { console.log('读取完毕。'); });
事件:'close'
当底层数据源(如,源文件的文件描述符)被关闭时触发。请注意,并不是所有流都会触发这个事件。
事件:'error'
当接收数据时发生错误触发,其回调函数中有一个参数{Error Object}
,表示错误信息。
3. 可读流中的方法
拉取数据:readable.read()
readable.read([size])
read()
方法会从内部缓冲区中拉取并返回若干数据。当没有更多可用数据时,会返回null
。其参数及返回值如下:
-
size
-{Number} 可选参数,指定需要读取的数据量 - 返回:{String | Buffer | null}
使用read()
方法读取数据时,如果传入了size
参数,那么它会返回指定字节的数据;当指定的size
字节不可用时,则返回null
。如果没有指定size
参数,那么会返回内部缓冲区中的所有数据
该方法仅应在暂停模式时被调用。在流动模式中,该方法会被自动调用直到内部缓冲区被清空。
当手动调用该方法返回一个数据块时,'data'
同时也会被触发。
var readable = getReadableStreamSomehow(); readable.on('readable', function() { var chunk; while (null !== (chunk = readable.read())) { console.log('收到了 %d 字节的数据', chunk.length); } });
指定数据编码:readable.setEncoding()
readable.setEncoding(encoding)
-
encoding
{String} 要使用的编码 -
返回:
this
setEncoding()
方法用于设置读取数据的编码方式,设置后会以指定编码的字符串返回数据,若不指定将返回Buffer。
该方法有一个好处,就是能够正确处理多字节字符。如果不这么做,而是取出Buffer
并调用buf.toString(encoding)
方法转换为字符串,那么很可能会出现字节错位。
var readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', function(chunk) { assert.equal(typeof chunk, 'string'); console.log('收到了 %d 个字符的字符串数据', chunk.length); })
切换为流动流:readable.resume()
readable.resume()
-
返回:
this
resume()
方法会将流从暂停模式切换到流动模式,并让可读流继续触发'data'
事件。如果你不想从可读流中消费数据,也可以调用resume()
方法触发一个'end'
事件。
var readable = getReadableStreamSomehow(); readable.resume(); readable.on('end', function(chunk) { console.log('到达末端,但并未读取任何东西'); })
切换为暂停流:readable.pause()
readable.pause()
-
返回:
this
pause()
方法会将流从流动模式切换到暂停模式,并让可读流停止触发'data'
事件,让后续可用数据留在内部缓冲区中。
var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('得到 %d 字节数据', chunk.length); readable.pause(); console.log('接下来 1 秒内不会有数据'); setTimeout(function() { console.log('现在数据会再次开始流动'); readable.resume(); }, 1000); })
是否是暂停流:readable.isPaused()
readable.isPaused()
-
返回:
Boolean
该方法用于检测可读流是否被明确暂停过,即:客户端代码是否调用过readable.pause()
方法。
var readable = new stream.Readable readable.isPaused() // === false readable.pause() readable.isPaused() // === true readable.resume() readable.isPaused() // === false
切换为流动流:readable.pipe()
readable.pipe(destination, [options])
-
destination
{Writable Stream} 写入数据的目标 -
options
{Object} 导流选项-
end
{Boolean} 读取到结束符时,结束写入。默认为true
-
pause()
方法可以从可读流(Readable stream)中拉取所有数据,并将数据写入到所提供的写入目标中,写入目标是一个可写流(Writable stream)。该方法能自动控制流量,以避免目标被快速读取的可读流所淹没。
pause()
方法可以导流到多个目标。
var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // 所有 readable 的数据会被写入到 'file.txt' readable.pipe(writable);
pause()
方法导流后返回目标流, 因此你可以建立导流链:
var r = fs.createReadStream('file.txt'); var z = zlib.createGzip(); var w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w);
模拟 Unix 的cat
命令
process.stdin.pipe(process.stdout);
默认情况下,当来源流触发'end'
事件时,目标流(Writable stream)的end()
会被调用,所以这时 destination
将不再可写。将options
设置为{ end: false }
,可以让目标流保持开启状态。
reader.pipe(writer, { end: false }); reader.on('end', function() { writer.end('Goodbye\n'); });
解除流导向:readable.unpipe()
readable.unpipe([destination])
-
destination
{Writable Stream}可选,解指定的导向流
该方法会移除之前调用pipe()
所设定的导向,如果没有指定destination
,那么所有的导向pipe
都会被移除。如果指定了目标,但并没有与之建立导流,那么什么事情都不会发生。
var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // 来自 readable 的所有数据都会被写入 'file.txt', // 但仅发生在第 1 秒 readable.pipe(writable); setTimeout(function() { console.log('停止写入到 file.txt'); readable.unpipe(writable); console.log('自行关闭文件流'); writable.end(); }, 1000);
数据块插回来源流:readable.unshift()
readable.unshift(chunk)
-
chunk
{Buffer | String} 要插回读取队列开头的数据块
该方法可以将读取的数据插回来源流。
该方法在许多场景中都很有用,如:一个流正在被一个解析器消费,解析器可能需要将某些刚拉取数据“逆消费”回来源流中,以便流能将它传递给其它消费者。但是,如果程序中频繁调用stream.unshift(chunk)
,就应该考虑实现一个Transform
流。
// 取出以 \n\n 分割的头部并将多余部分 unshift() 回去 // callback 以 (error, header, stream) 形式调用 var StringDecoder = require('string_decoder').StringDecoder; function parseHeader(stream, callback) { stream.on('error', callback); stream.on('readable', onReadable); var decoder = new StringDecoder('utf8'); var header = ''; function onReadable() { var chunk; while (null !== (chunk = stream.read())) { var str = decoder.write(chunk); if (str.match(/\n\n/)) { // 找到头部边界 var split = str.split(/\n\n/); header += split.shift(); var remaining = split.join('\n\n'); var buf = new Buffer(remaining, 'utf8'); if (buf.length) stream.unshift(buf); stream.removeListener('error', callback); stream.removeListener('readable', onReadable); // 现在可以从流中读取消息的主体了 callback(null, header, stream); } else { // 仍在读取头部 header += str; } } } }