最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

为什么当一个流有错误时flatMap没有输出?

运维笔记admin19浏览0评论

为什么当一个流有错误时flatMap没有输出?

为什么当一个流有错误时flatMap没有输出?

我试着用highland.js编写一个程序来下载几个文件,解压缩它们并解析成对象,然后通过flatMap将对象流合并为一个流并打印出来。

function download(url) {
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err) => console.log('Error in gunzip', err))
        .through(toObjParser)
        .errors((err) => console.log('Error in OsmToObj', err));
}  

const urlList = ['url_1', 'url_2', 'url_3'];

_(urlList)
    .flatMap(download)
    .each(console.log);

当所有URL都有效时,它可以正常工作。如果URL无效,则没有下载文件,则gunzip报告错误。我怀疑发生错误时流会关闭。我希望flatMap将继续其他流,但程序不下载其他文件,没有任何打印出来。

什么是处理流中的错误的正确方法以及如何在一个流出错后使flatMap不停止?

在命令式编程中,我可以添加调试日志来跟踪发生错误的位置。如何调试流代码?

PS。 toObjParser是一个节点转换流。它需要一个可读的OSM XML流,并输出与Overpass OSM JSON兼容的对象流。见

2017-12-19更新:

我试图在push打电话给errors,因为@amsross建议。为了验证push是否真的有效,我推送了一个XML文档,它由解析器解析,我从输出中看到它。但是,流仍然停止,并且未下载url_3。

function download(url) {
    console.log('download', url);
    return _(request(url))
        .through(zlib.createGunzip())
        .errors((err, push) => {
            console.log('Error in gunzip', err);
            push(null, Buffer.from(`<?xml version='1.0' encoding='UTF-8'?>
<osmChange version="0.6">
<delete>
<node id="1" version="2" timestamp="2008-10-15T10:06:55Z" uid="5553" user="foo" changeset="1" lat="30.2719406" lon="120.1663723"/>
</delete>
</osmChange>`));
        })
        .through(new OsmToObj())
        .errors((err) => console.log('Error in OsmToObj', err));
}

const urlList = ['url_1_correct', 'url_2_wrong', 'url_3_correct'];

_(urlList)
    .flatMap(download)
    .each(console.log);
回答如下:

更新12/19/2017:好的,所以我不能给你一个好的原因,但是我可以告诉你,从downloadsequence消耗的流转换到merge'ing它们可能会给你带来结果你在追求。不幸的是(或不是?),您将不再以任何规定的顺序获得结果。

const request = require('request')
const zlib = require('zlib')
const h = require('highland')

// just so you can see there isn't some sort of race
const rnd = (min, max) => Math.floor((Math.random() * (max - min))) + min
const delay = ms => x => h(push => setTimeout(() => {
  push(null, x)
  push(null, h.nil)
}, ms))

const download = url => h(request(url))
  .flatMap(delay(rnd(0, 2000)))
  .through(zlib.createGunzip())

h(['urlh1hcorrect', 'urlh2hwrong', 'urlh3hcorrect'])
  .map(download).merge()
  // vs .flatMap(download) or .map(download).sequence()
  .errors(err => h.log(err))
  .each(h.log)

更新12/03/2017:当在流上遇到错误时,它会结束该流。为避免这种情况,您需要处理错误。您目前正在使用errors报告错误,但未处理错误。你可以这样做,继续前进到流中的下一个值:

.errors((err, push) => {
  console.log(err)
  push(null) // push no error forward
})

原文:如果不知道toObjParser的输入和输出类型,很难回答。

因为through将值流传递给提供的函数并且期望返回一个值流,所以您的问题可能存在于toObjParser中,其签名类似于Stream -> ObjectStream -> Stream Object,其中错误发生在内部流上,不会发出任何错误,直到它被消耗。

.each(console.log)的输出是多少?如果它正在记录流,那很可能是您的问题。

发布评论

评论列表(0)

  1. 暂无评论