最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Bull队列:当作业失败时,如何停止队列处理剩余的作业?

网站源码admin19浏览0评论

Bull队列:当作业失败时,如何停止队列处理剩余的作业?

Bull队列:当作业失败时,如何停止队列处理剩余的作业?

我正在使用bull队列来处理一些作业。在当前情况下,每个作业都是某种操作。因此,只要队列中一系列操作中的一个操作(作业)失败,队列就必须停止处理其余作业(操作)。

到目前为止我尝试了什么?

因此,我尝试在特定作业失败时暂停队列。接下来,队列耗尽时将继续。现在,当它恢复时,队列不是从失败的作业开始,而是接下一个作业。

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

当前输出:

预期输出:如果Type-1 two在第三次尝试中成功完成。

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

预期的输出:如果Type-1 two在第三次尝试中也失败。

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

我想要的是队列必须停止处理新作业,直到当前作业完成而没有任何失败。如果发生任何故障,则失败的作业必须运行一些x时间。在x+1尝试中,它必须清除(删除所有作业)队列。那么如何在队列中实现这种线性行为。

回答如下:
var Queue = require('bull');

let redisOptions = {
  redis: {
    port: 6379,
    host: '127.0.0.1'
  },
  // Maximum one job is processed every 5 seconds.
  limiter: {
    max: 1,
    duration: 5000
  },
  settings: {
    backoffStrategies: {
      // Custom backoff strategy.
      myStrategy: function (attemptsMade, error) {
        return error.delay || 60 * 1000;
      }
    }
  }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    // compare attemptsMade with 3, to test 'on-all-attempts-fail' scenario.
    if (job.attemptsMade == 2) {
      done(false);
    } else {
      done(job.data.error);
    }
  }, job.data.time);
});

let options = {
  attempts: 3,
  backoff: {
    type: 'myStrategy'
  },
  removeOnComplete: false, // Set to true if job has to be removed on success.
  removeOnFail: false // Set to true if job has to be removed on failure.
}

for (let i = 1; i <= 10; i++) {
  let error = false;

  if (i == 2) {
    error = true
  }

  myQueue.add('Type-1', { time: i * 1000, description: "Type-1 Job-" + i, error: error }, options);

  // You can also add job with some time gap.
  // setTimeout(i => {
  //   myQueue.add('Type-1', { time: i * 1000, description: "Type-1 Job-" + i, error: error }, options);
  // }, 1000, i);
}

myQueue.on('completed', async function (job, result) {
  console.log("Completed: Job " + job.id);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: Job " + job.id + " on attempt " + job.attemptsMade);
  handelFailure(job);
});

myQueue.on('error', function (error) {
  console.log("Queue: on error");
  console.log(error);
});

async function handelFailure(currentJob) {
  if (currentJob.opts.attempts == currentJob.attemptsMade) {
    // Remove all jobs in queue and clan the redis.
    await myQueue.clean(70 * 1000, 'wait');
    await myQueue.clean(70 * 1000, 'active');
    await myQueue.clean(70 * 1000, 'failed');
    await myQueue.clean(70 * 1000, 'paused');
    await myQueue.clean(70 * 1000, 'delayed');
    await myQueue.empty();
    return;
  }

  let pendingJobs = await myQueue.getJobs(['waiting', 'active', 'failed', 'paused', 'delayed'], 0, -1, true);
  console.log("Failing all remaining " + pendingJobs.length + " jobs...");

  for (let i = 0; i < pendingJobs.length; i++) {
    if (pendingJobs[i].id == currentJob.id) {
      continue;
    }
    let errorInfo = {
      delay: (70 * 1000) + (i * 5 * 1000),
      message: "Moving " + pendingJobs[i].id + " to failed queue."
    }
    await pendingJobs[i].moveToFailed(errorInfo, true);
  }
}

输出1:当作业2在第三次尝试中成功完成时。

输出2:当作业2的所有3次尝试均失败时(队列按预期停止处理剩余的作业)。

发布评论

评论列表(0)

  1. 暂无评论