谷歌云PubSub的不是ACK消息
我们根据GCP PubSub的发布者和预订系统的系统。用户处理单个消息相当长,约1分钟。我们已经设定的用户确认截止时间为600秒(10分钟)(最大的一个),以确保,即发布订阅将不会启动再分发过于早,因为基本上我们已长期运行的操作这里。
我看到的PubSub的这种行为。而代码发送ACK和监视器确认PubSub的确认请求已被接受和确认本身与成功状态,UNACKED消息总数仍然是相同的完成。
上显示的金额相同的图表指标,统计和平均聚合对齐。在上述对准画面均值和启用没有减速。
我使用谷歌@云/发布订阅的Node.js库。不同的版本都已经试过(0.18.1,0.22.2,0.24.1),但我想在他们发行不。
下面的类可以用来检查。
打字稿3.1.1,节点8.x.x - 10.x.x
import { exponential, Backoff } from "backoff";
const pubsub = require("@google-cloud/pubsub");
export interface IMessageHandler {
handle (message): Promise<void>;
}
export class PubSubSyncListener {
private readonly client;
private listener: Backoff;
private runningOperations: Promise<unknown>[] = [];
constructor (
private readonly handler: IMessageHandler,
private readonly options: {
/**
* Maximal messages number to be processed simultaniosly.
* Listener will try to keep processing number as close to provided value
* as possible.
*/
maxMessages: number;
/**
* Formatted full subscrption name /projects/{projectName}/subscriptions/{subscriptionName}
*/
subscriptionName: string;
/**
* In milliseconds
*/
minimalListenTimeout?: number;
/**
* In milliseconds
*/
maximalListenTimeout?: number;
}
) {
this.client = new pubsub.v1.SubscriberClient();
this.options = Object.assign({
minimalListenTimeout: 300,
maximalListenTimeout: 30000
}, this.options);
}
public async listen () {
this.listener = exponential({
maxDelay: this.options.maximalListenTimeout,
initialDelay: this.options.minimalListenTimeout
});
this.listener.on("ready", async () => {
if (this.runningOperations.length < this.options.maxMessages) {
const [response] = await this.client.pull({
subscription: this.options.subscriptionName,
maxMessages: this.options.maxMessages - this.runningOperations.length
});
for (const m of response.receivedMessages) {
this.startMessageProcessing(m);
}
this.listener.reset();
this.listener.backoff();
} else {
this.listener.backoff();
}
});
this.listener.backoff();
}
private startMessageProcessing (message) {
const index = this.runningOperations.length;
const removeFromRunning = () => {
this.runningOperations.splice(index, 1);
};
this.runningOperations.push(
this.handler.handle(this.getHandlerMessage(message))
.then(removeFromRunning, removeFromRunning)
);
}
private getHandlerMessage (message) {
message.message.ack = async () => {
const ackRequest = {
subscription: this.options.subscriptionName,
ackIds: [message.ackId]
};
await this.client.acknowledge(ackRequest);
};
return message.message;
}
public async stop () {
this.listener.reset();
this.listener = null;
await Promise.all(
this.runningOperations
);
}
}
这基本上是部分实现的异步消息和即时确认的拉动。由于提出的解决方案之一是在同步牵引的使用。
我发现了类似报告的问题在Java库,如果我没有在这个问题的症状误。
这里的一个细节是,确认似乎在低数量的请求的工作。在情况下,如果我在发布订阅火单个消息,然后立即处理它,未传送的消息数量减少(下降到0,因为只有一个消息在那里之前)。
这个问题本身 - 正在发生什么,以及为什么UNACKED留言数,因为它应该在已接收到ACK没有减少?
回答如下:要the documentation,认购/ num_undelivered_messages指标,您使用的是引用“的订阅未确认的消息数(又名积压的消息)。采样每60秒。取样后,数据不是长达120秒可见。”
你不应该指望这个指标立即减少在ACKING的消息。此外,它的声音,如果你想使用发布 - 订阅一个正好一次交付的情况下,试图以确保不会再次发送邮件。 Cloud发布/订阅不提供这些语义。它提供了至少一次语义。换句话说,即使你已经收到了价值,ACKED它,收到的ACK响应,并看到从1公吨下降到0,它仍然是可能的,正确的为相同人员或其他接收该消息的精确副本。虽然在实践中这是不可能的,你应该专注于建立一个系统,是重复的宽容,而不是试图以确保您的ACK得手让你的邮件将不会被交还。