最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

谷歌云PubSub的不是ACK消息

运维笔记admin10浏览0评论

谷歌云PubSub的不是ACK消息

谷歌云PubSub的不是ACK消息

我们根据GCP PubSub的发布者和预订系统的系统。用户处理单个消息相当长,约1分钟。我们已经设定的用户确认截止时间为600秒(10分钟)(最大的一个),以确保,即发布订阅将不会启动再分发过于早,因为基本上我们已长期运行的操作这里。

我看到的PubSub的这种行为。而代码发送ACK和监视器确认PubSub的确认请求已被接受和确认本身与成功状态,UNACKED消息总数仍然是相同的完成。

上显示的金额相同的图表指标,统计和平均聚合对齐。在上述对准画面均值和启用没有减速。

我使用谷歌@云/发布订阅的Node.js库。不同的版本都已经试过(0.18.1,0.22.2,0.24.1),但我想在他们发行不。

下面的类可以用来检查。

打字稿3.1.1,节点8.x.x - 10.x.x

import { exponential, Backoff } from "backoff";

const pubsub = require("@google-cloud/pubsub");

export interface IMessageHandler {
    handle (message): Promise<void>;
}

export class PubSubSyncListener {
    private readonly client;

    private listener: Backoff;

    private runningOperations: Promise<unknown>[] = [];

    constructor (
        private readonly handler: IMessageHandler,
        private readonly options: {
            /**
             * Maximal messages number to be processed simultaniosly.
             * Listener will try to keep processing number as close to provided value
             * as possible.
             */
            maxMessages: number;
            /**
             * Formatted full subscrption name /projects/{projectName}/subscriptions/{subscriptionName}
             */
            subscriptionName: string;
            /**
             * In milliseconds
             */
            minimalListenTimeout?: number;
            /**
             * In milliseconds
             */
            maximalListenTimeout?: number;
        }
    ) {
        this.client = new pubsub.v1.SubscriberClient();

        this.options = Object.assign({
            minimalListenTimeout: 300,
            maximalListenTimeout: 30000
        }, this.options);
    }

    public async listen () {
        this.listener = exponential({
            maxDelay: this.options.maximalListenTimeout,
            initialDelay: this.options.minimalListenTimeout
        });

        this.listener.on("ready", async () => {
            if (this.runningOperations.length < this.options.maxMessages) {
                const [response] = await this.client.pull({
                    subscription: this.options.subscriptionName,
                    maxMessages: this.options.maxMessages - this.runningOperations.length
                });

                for (const m of response.receivedMessages) {
                    this.startMessageProcessing(m);
                }
                this.listener.reset();
                this.listener.backoff();
            } else {
                this.listener.backoff();
            }
        });

        this.listener.backoff();
    }

    private startMessageProcessing (message) {
        const index = this.runningOperations.length;

        const removeFromRunning = () => {
            this.runningOperations.splice(index, 1);
        };

        this.runningOperations.push(
            this.handler.handle(this.getHandlerMessage(message))
                .then(removeFromRunning, removeFromRunning)
        );
    }

    private getHandlerMessage (message) {
        message.message.ack = async () => {
            const ackRequest = {
                subscription: this.options.subscriptionName,
                ackIds: [message.ackId]
            };

            await this.client.acknowledge(ackRequest);
        };

        return message.message;
    }

    public async stop () {
        this.listener.reset();
        this.listener = null;
        await Promise.all(
            this.runningOperations
        );
    }
}

这基本上是部分实现的异步消息和即时确认的拉动。由于提出的解决方案之一是在同步牵引的使用。

我发现了类似报告的问题在Java库,如果我没有在这个问题的症状误。

这里的一个细节是,确认似乎在低数量的请求的工作。在情况下,如果我在发布订阅火单个消息,然后立即处理它,未传送的消息数量减少(下降到0,因为只有一个消息在那里之前)。

这个问题本身 - 正在发生什么,以及为什么UNACKED留言数,因为它应该在已接收到ACK没有减少?

回答如下:

要the documentation,认购/ num_undelivered_messages指标,您使用的是引用“的订阅未确认的消息数(又名积压的消息)。采样每60秒。取样后,数据不是长达120秒可见。”

你不应该指望这个指标立即减少在ACKING的消息。此外,它的声音,如果你想使用发布 - 订阅一个正好一次交付的情况下,试图以确保不会再次发送邮件。 Cloud发布/订阅不提供这些语义。它提供了至少一次语义。换句话说,即使你已经收到了价值,ACKED它,收到的ACK响应,并看到从1公吨下降到0,它仍然是可能的,正确的为相同人员或其他接收该消息的精确副本。虽然在实践中这是不可能的,你应该专注于建立一个系统,是重复的宽容,而不是试图以确保您的ACK得手让你的邮件将不会被交还。

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论