批量处理AWS LAMBDA消息
我想知道的东西,我真的无法找到关于它的信息。也许这是不是要走的路,但我只是想知道。
它是关于LAMBDA分批工作。我知道我可以设置LAMBDA消费批次的消息。在我的lambda函数我遍历每个消息,如果一个失败,LAMBDA退出。并再次开始循环。
我想知道关于略有不同的方法让我们假设我有三条消息:A,B和C.我也把他们分批。现在,如果消息B失败(例如API调用失败),I返回消息B至SQS和保持处理消息C.
可能吗?如果是,这是一个好的方法吗?因为我知道我需要实施一些LAMBDA额外的复杂性,什么不是。
谢谢
回答如下:这里有一个很好的文章here。对您的相关部分...
- 使用1 BATCHSIZE,这样的消息成功或失败的自己。
- 确保你的处理是幂等,所以重新处理的消息是无害的,额外的处理成本之外。
- 你的函数代码内的错误,也许是通过捕捉它们,将消息发送到死信队列进行进一步的处理。
- 成功处理消息后,手动在函数中调用DeleteMessage可以API。
最后一颗子弹的一点是如何我已经成功地处理了同样的问题。而是立即返回错误,存储它们或注意,发生了错误,但再继续处理消息的休息间歇。在处理结束后,返回或者使得SQS引发错误 - >拉姆达触发知道不删除失败的消息。所有成功的消息将已经被你的拉姆达处理程序删除。
sqs = boto3.client('sqs')
def handler(event, context):
failed = False
for msg in event['Records']:
try:
# Do something with the message.
handle_message(msg)
except Exception:
# Ok it failed, but allow the loop to finish.
logger.exception('Failed to handle message')
failed = True
else:
# The message was handled successfully. We can delete it now.
sqs.delete_message(
QueueUrl=<queue_url>,
ReceiptHandle=msg['receiptHandle'],
)
# It doesn't matter what the error is. You just want to raise here
# to ensure the trigger doesn't delete any of the failed messages.
if failed:
raise RuntimeError('Failed to process one or more messages')
def handle_msg(msg):
...