菜单
开源 RSS

ReadableStream

ReadableStream 类型表示一个可读的数据流。

构造 ReadableStream

构造函数创建一个新的 ReadableStream 对象。

它接受两个**可选**参数

  • underlyingsource: 定义数据的底层源。
  • queuingStrategy: 采用的排队策略。
JavaScript
import { ReadableStream } from 'k6/experimental/streams';

new ReadableStream(
  {
    start(controller) {
      // Perform any setup tasks
    },

    pull(controller) {
      // Fetch and queue data into the stream
    },

    cancel(reason) {
      // Perform any cleanup tasks
    },

    type: 'default',
  },
  {
    highWaterMark: 1,
    size(chunk) {
      return 1;
    },
  }
);

构造函数参数

underlyingSource (可选)

underlyingSource 参数是一个对象,定义了流的数据源。它可以是一个包含以下属性的对象

  • start(controller): 一个**可选**函数,在流创建时调用。可用于执行任何设置任务。此方法的内容由用户定义。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController 对象。
  • pull(controller): 一个**可选**函数,重复调用以获取数据并将其排队到流中,直到达到其高水位线。如果 pull() 返回一个 promise,则在 promise 解决之前不会再次调用它。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController 对象。
  • cancel(reason): 一个**可选**函数,由用户定义,在流取消时调用。可用于释放对流源的访问并执行任何清理任务。传递给此方法的 reason 参数是一个可选的、人类可读的值,表示取消流的原因。
  • type: 一个**可选**字符串,指定底层源的类型。当前只能接收值 'default',这是其默认值。

queuingStrategy 参数 (可选)

queuingStrategy 参数是一个对象,定义了流要采用的排队策略。它可以是一个包含以下属性的对象

  • highWaterMark: 一个**可选**数字,表示流在其内部队列中可以容纳的最大块数。默认值为 1
  • size(chunk): 一个**可选**函数,返回作为参数传递的块的大小。默认值是返回 1 的函数。

尽管您可以定义自己的自定义排队策略,但使用 ReadableStream 的默认行为和推荐方式是使用 CountQueuingStrategy 对象。

方法

名称描述
cancel(reason)关闭流并指示关闭原因。
getReader()返回一个 ReadableStreamDefaultReader 对象。

示例

使用 ReadableStream 的最简单的说明性示例是创建一个数字流。

JavaScript
import { ReadableStream } from 'k6/experimental/streams';
import { setTimeout } from 'k6/timers';

function numbersStream() {
  let currentNumber = 0;

  return new ReadableStream({
    start(controller) {
      const fn = () => {
        if (currentNumber < 10) {
          controller.enqueue(++currentNumber);
          setTimeout(fn, 1000);
          return;
        }

        controller.close();
      };
      setTimeout(fn, 1000);
    },
  });
}

export default async function () {
  const stream = numbersStream();
  const reader = stream.getReader();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log(`received number ${value} from stream`);
  }

  console.log('we are done');
}

一个更有用的定义 ReadableStream 的说明是读取文件中的行。

JavaScript
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';

// Open a csv file containing the data to be read
let file;
(async function () {
  file = await open('./data.csv');
})();

export default async function () {
  let lineReaderState;

  // Define a ReadableStream that reads lines from the file
  // and parses them into objects with name and color properties.
  const fileLinesStream = new ReadableStream({
    // The start function is called when the readable stream is
    // created. In here, you can connect to the data source
    // and perform administrative tasks.
    async start(controller) {
      lineReaderState = {
        buffer: new Uint8Array(1024),
        remaining: '',
      };
    },

    // The pull function is called repeatedly to get data, while the
    // internal high water mark is not reached.
    async pull(controller) {
      const line = await getNextLine(file, lineReaderState);
      if (line === null) {
        controller.close();
        return;
      }

      const [name, color] = line.split(',');
      controller.enqueue({ name, color });
    },
  });

  // Obtain and lock a reader to the stream
  const reader = fileLinesStream.getReader();

  try {
    // Read and process each item from the stream
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }

      console.log(value);
    }
  } catch (error) {
    console.error('Stream reading failed: ', error);
  }
}

// getNextLine reads the next line from the file and returns it.
//
// It reads the file in chunks and buffers the remaining data
// to handle partial lines. It returns null when there are no
// more lines to read.
async function getNextLine(file, state) {
  while (true) {
    if (state.remaining.includes('\n')) {
      const lineEndIndex = state.remaining.indexOf('\n');
      const line = state.remaining.substring(0, lineEndIndex).trim();

      state.remaining = state.remaining.substring(lineEndIndex + 1);

      if (line) {
        return line;
      }
    } else {
      const bytesRead = await file.read(state.buffer);
      if (bytesRead === null) {
        // EOF

        if (state.remaining) {
          const finalLine = state.remaining.trim();

          // Clear remaining to signal the end
          state.remaining = '';

          // Return the last non-empty line
          return finalLine;
        }

        // Indicate that there are no more lines to read
        return null;
      }

      state.remaining += String.fromCharCode.apply(
        null,
        new Uint8Array(state.buffer.slice(0, bytesRead))
      );
    }
  }
}