streams
注意
这是一个实验性模块。
虽然我们致力于保持实验性模块的稳定,但可能需要引入突破性更改。这可能发生在未来的 k6 版本中,直到模块完全稳定并成为 k6 核心模块。欲了解更多信息,请参阅扩展毕业流程。
实验性模块保持高度稳定,并遵循常规维护和安全措施。欢迎您提交问题,如果您有任何反馈或建议。
k6 streams 实验性模块提供了 Streams API 规范的实现,提供了一种在测试脚本中定义和使用数据流的方式。它目前实现了完整规范的子集,并支持定义和使用可读流。
概念和用法
流式处理涉及将要处理或使用的资源分解成较小的块,这些块可以递增地处理或使用。这在处理大文件或数据源时特别有用,因为它允许您以较小的、更易管理的方式处理数据。
借助 k6 对 Streams API 的支持,您可以立即使用 Javascript 一点一点地开始处理原始数据,而无需在内存中生成完整的数据表示。这有助于减少内存使用并提高性能,尤其是在处理大型数据集时。
API 概述
类 | 描述 |
---|---|
ReadableStream | 表示一个可读的数据流。 |
示例
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';
// Open a csv file containing the data to be read
const file = await open('./data.csv');
export default async function () {
let lineReaderState;
// Define a ReadableStream that reads lines from the file
// and parses them into objects with name and color properties.
const fileLinesStream = new ReadableStream({
// The start function is called when the readable stream is
// created. In here, you can connect to the data source
// and perform administrative tasks.
async start(controller) {
lineReaderState = {
buffer: new Uint8Array(1024),
remaining: '',
};
},
// The pull function is called repeatedly to get data, while the
// internal high water mark is not reached.
async pull(controller) {
const line = await getNextLine(file, lineReaderState);
if (line === null) {
controller.close();
return;
}
const [name, color] = line.split(',');
controller.enqueue({ name, color });
},
});
// Obtain and lock a reader to the stream
const reader = fileLinesStream.getReader();
try {
// Read and process each item from the stream
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
console.log(value);
}
} catch (error) {
console.error('Stream reading failed: ', error);
}
}
// getNextLine reads the next line from the file and returns it.
//
// It reads the file in chunks and buffers the remaining data
// to handle partial lines. It returns null when there are no
// more lines to read.
async function getNextLine(file, state) {
while (true) {
if (state.remaining.includes('\n')) {
const lineEndIndex = state.remaining.indexOf('\n');
const line = state.remaining.substring(0, lineEndIndex).trim();
state.remaining = state.remaining.substring(lineEndIndex + 1);
if (line) {
return line;
}
} else {
const bytesRead = await file.read(state.buffer);
if (bytesRead === null) {
// EOF
if (state.remaining) {
const finalLine = state.remaining.trim();
// Clear remaining to signal the end
state.remaining = '';
// Return the last non-empty line
return finalLine;
}
// Indicate that there are no more lines to read
return null;
}
state.remaining += String.fromCharCode.apply(
null,
new Uint8Array(state.buffer.slice(0, bytesRead))
);
}
}
}