ReadableStream
ReadableStream
类型表示一个可读的数据流。
构造 ReadableStream
构造函数创建一个新的 ReadableStream
对象。
它接受两个**可选**参数
underlyingsource
: 定义数据的底层源。queuingStrategy
: 采用的排队策略。
import { ReadableStream } from 'k6/experimental/streams';
new ReadableStream(
{
start(controller) {
// Perform any setup tasks
},
pull(controller) {
// Fetch and queue data into the stream
},
cancel(reason) {
// Perform any cleanup tasks
},
type: 'default',
},
{
highWaterMark: 1,
size(chunk) {
return 1;
},
}
);
构造函数参数
underlyingSource (可选)
underlyingSource
参数是一个对象,定义了流的数据源。它可以是一个包含以下属性的对象
start(controller)
: 一个**可选**函数,在流创建时调用。可用于执行任何设置任务。此方法的内容由用户定义。传递给此方法的controller
参数是一个 ReadableStreamDefaultController 对象。pull(controller)
: 一个**可选**函数,重复调用以获取数据并将其排队到流中,直到达到其高水位线。如果pull()
返回一个 promise,则在 promise 解决之前不会再次调用它。传递给此方法的controller
参数是一个 ReadableStreamDefaultController 对象。cancel(reason)
: 一个**可选**函数,由用户定义,在流取消时调用。可用于释放对流源的访问并执行任何清理任务。传递给此方法的reason
参数是一个可选的、人类可读的值,表示取消流的原因。type
: 一个**可选**字符串,指定底层源的类型。当前只能接收值'default'
,这是其默认值。
queuingStrategy 参数 (可选)
queuingStrategy
参数是一个对象,定义了流要采用的排队策略。它可以是一个包含以下属性的对象
highWaterMark
: 一个**可选**数字,表示流在其内部队列中可以容纳的最大块数。默认值为1
。size(chunk)
: 一个**可选**函数,返回作为参数传递的块的大小。默认值是返回1
的函数。
尽管您可以定义自己的自定义排队策略,但使用 ReadableStream
的默认行为和推荐方式是使用 CountQueuingStrategy 对象。
方法
名称 | 描述 |
---|---|
cancel(reason) | 关闭流并指示关闭原因。 |
getReader() | 返回一个 ReadableStreamDefaultReader 对象。 |
示例
使用 ReadableStream
的最简单的说明性示例是创建一个数字流。
import { ReadableStream } from 'k6/experimental/streams';
import { setTimeout } from 'k6/timers';
function numbersStream() {
let currentNumber = 0;
return new ReadableStream({
start(controller) {
const fn = () => {
if (currentNumber < 10) {
controller.enqueue(++currentNumber);
setTimeout(fn, 1000);
return;
}
controller.close();
};
setTimeout(fn, 1000);
},
});
}
export default async function () {
const stream = numbersStream();
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(`received number ${value} from stream`);
}
console.log('we are done');
}
一个更有用的定义 ReadableStream
的说明是读取文件中的行。
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';
// Open a csv file containing the data to be read
let file;
(async function () {
file = await open('./data.csv');
})();
export default async function () {
let lineReaderState;
// Define a ReadableStream that reads lines from the file
// and parses them into objects with name and color properties.
const fileLinesStream = new ReadableStream({
// The start function is called when the readable stream is
// created. In here, you can connect to the data source
// and perform administrative tasks.
async start(controller) {
lineReaderState = {
buffer: new Uint8Array(1024),
remaining: '',
};
},
// The pull function is called repeatedly to get data, while the
// internal high water mark is not reached.
async pull(controller) {
const line = await getNextLine(file, lineReaderState);
if (line === null) {
controller.close();
return;
}
const [name, color] = line.split(',');
controller.enqueue({ name, color });
},
});
// Obtain and lock a reader to the stream
const reader = fileLinesStream.getReader();
try {
// Read and process each item from the stream
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
console.log(value);
}
} catch (error) {
console.error('Stream reading failed: ', error);
}
}
// getNextLine reads the next line from the file and returns it.
//
// It reads the file in chunks and buffers the remaining data
// to handle partial lines. It returns null when there are no
// more lines to read.
async function getNextLine(file, state) {
while (true) {
if (state.remaining.includes('\n')) {
const lineEndIndex = state.remaining.indexOf('\n');
const line = state.remaining.substring(0, lineEndIndex).trim();
state.remaining = state.remaining.substring(lineEndIndex + 1);
if (line) {
return line;
}
} else {
const bytesRead = await file.read(state.buffer);
if (bytesRead === null) {
// EOF
if (state.remaining) {
const finalLine = state.remaining.trim();
// Clear remaining to signal the end
state.remaining = '';
// Return the last non-empty line
return finalLine;
}
// Indicate that there are no more lines to read
return null;
}
state.remaining += String.fromCharCode.apply(
null,
new Uint8Array(state.buffer.slice(0, bytesRead))
);
}
}
}