最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

带有数组的Node js流

网站源码admin23浏览0评论

带有数组的Node js流

带有数组的Node js流

第一次使用节点流,我正在尝试将数组流式传输到Algolia。 Algolia提供的示例使用json文件。 /?language=javascript#example

我尝试对数组进行字符串化,然后像Algolia示例所描述的那样使用它。我不确定什么是最好的方法。我应该对数组进行字符串化,还是需要遍历数组并将其推入流中?后一种方法是否仍使用fs?这将在firebase函数上运行,因此存在资源限制。

const algoliasearch = require('algoliasearch')
const fs = require('fs');
const StreamArray = require('stream-json/streamers/StreamArray');

const client = algoliasearch('999999', '999999');
//const index = client.initIndex('d_DASH');
const index = client.initIndex('t_DASH');



exports.dashStream = async function (listings) {
    let jsdoc = JSON.stringify(listings);
    const stream = fs.createReadStream(jsdoc).pipe(StreamArray.withParser());


    let chunks = [];
    stream
        .on('data', ({ value }) => {
            console.log("on data...")
            chunks.push(value);
            if (chunks.length === 10000) {
                stream.pause();
                index
                    .saveObjects(chunks)
                    .then(res => {
                        chunks = [];
                        stream.resume();
                    })
                    .catch(err => console.error(err));
            }
        })
        .on('end', () => {
            console.log("on end...")
            if (chunks.length) {
                console.log(`stream over?`)
                index.saveObjects(chunks,function (err, content){
                    return content.taskID.toString();
                })
                .catch(err => console.error(err));
            }
        })
        .on('error', err => console.error(err));
}

代码需要完成对Algolia的写入并从Algolia响应中返回taskID。

回答如下:

这为时已晚,但也许会对其他人有所帮助。我建议使用其他流和管道

buffer.stream.ts

将数据收集成大块并将其推送到写入流的流

import { Transform } from "stream";

// Using transform stream to collect your data into chunks
export class BufferStream extends Transform {
  private readonly buffer: object[] = [];

  constructor() {
    super({ objectMode: true });
  }

  _transform(data: object, _encoding: string, callback: () => void) {
    this.buffer.push(data);
    // You chunk size goes here. I find myself usually using a 1000
    if (this.buffer.length >= 3) {
      this.push(this.buffer.splice(0));
    }
    callback();
  }

  _final(callback: () => void) {
    // Pushing leftovers
    this.push(this.buffer);
    callback();
  }
}

write.stream.ts

将处理写入目的地的流

在对流使用异步回调时必须格外小心

import { Writable } from 'stream';

export class WriteStream extends Writable {
  constructor() {
    super({ objectMode: true });
  }

  async _write(chunk: object[], _encoding: string, callback: () => void) {
    // You have to handle your errors yourself in an asynchronous callback
    try {
      // await save(chunk);
      console.log('Chunk:', chunk)
      callback();
    } catch (error) {
      // nextTick to escape current stack
      process.nextTick(() => this.emit('error', error));
    }
  }
}

全部粘在一起

import stream, { PassThrough } from "stream";
import { promisify } from "util";
import { BufferStream } from "./buffer.stream";
import { WriteStream } from "./write.stream";

const pipelineAsync = promisify(stream.pipeline);

(async () => {
  // Imitating a stream with some data
  const stream = new PassThrough({ objectMode: true });
  for (let i = 0; i < 10; i++) {
    stream.push(i);
  }
  stream.end();

  await pipelineAsync(stream, new BufferStream(), new WriteStream());
})();

输出

Chunk: [ 0, 1, 2 ]
Chunk: [ 3, 4, 5 ]
Chunk: [ 6, 7, 8 ]
Chunk: [ 9 ] <-- Last chunk

根据我的经验,它可以自行处理背压。您不需要调用stream.pause()和.resume()。从总体上看,它看起来更干净

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论