最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

使用 Spring AMQP 进行错误处理

网站源码admin1浏览0评论

使用 Spring AMQP 进行错误处理

1. 简介

异步消息传递是一种松散耦合的分布式通信,在实现事件驱动体系结构方面越来越流行。幸运的是,Spring框架提供了 SpringAMQP 项目,允许我们构建基于AMQP的消息传递解决方案。

另一方面,在此类环境中处理错误可能是一项艰巨的任务。因此,在本教程中,我们将介绍处理错误的不同策略。

2. 环境设置

在本教程中,我们将使用实现 AMQP 标准的RabbitMQ。此外,Spring AMQP提供了spring-rabbit模块,这使得集成变得非常容易。

让我们将 RabbitMQ 作为独立服务器运行。我们将通过执行以下命令在Docker 容器中运行它:

代码语言:javascript代码运行次数:0运行复制
docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-managementCopy

有关详细的配置和项目依赖项设置,请参阅我们的Spring AMQP文章。

3. 故障场景

通常,与其分布式性质,与单体或单打包应用程序相比,基于消息传递的系统可能发生更多类型的错误。

我们可以指出一些类型的异常:

  • 网络I/O 相关 – 网络连接和 I/O 操作的常规故障
  • 与协议或基础结构相关的错误,通常表示消息传递基础结构的配置错误
  • 代理相关 – 警告客户端与 AMQP 代理之间配置不正确的故障。例如,达到定义的限制或阈值、身份验证或无效的策略配置
  • 应用程序和消息相关 – 通常表示违反某些业务或应用程序规则的异常

当然,此故障列表并不详尽,但包含最常见的错误类型。

我们应该注意,Spring AMQP开箱即用地处理与连接相关的低级问题,例如通过应用重试或重新排队策略。此外,大多数故障和错误都转换为AmqpException或其子类之一。

在接下来的部分中,我们将主要关注特定于应用程序的错误和高级错误,然后介绍全局错误处理策略。

4. 项目设置

现在,让我们定义一个简单的队列和交换配置来开始:

代码语言:javascript代码运行次数:0运行复制
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}Copy

接下来,让我们创建一个简单的生产者:

代码语言:javascript代码运行次数:0运行复制
public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}Copy

最后,引发异常的使用者:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}Copy

默认情况下,所有失败的消息将立即在目标队列的头部一遍又一遍地重新排队。

让我们通过执行下一个 Maven 命令来运行我们的示例应用程序:

代码语言:javascript代码运行次数:0运行复制
mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingAppCopy

现在我们应该看到类似的结果输出:

代码语言:javascript代码运行次数:0运行复制
WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: nullCopy

因此,默认情况下,我们将在输出中看到无限数量的此类消息。

要更改此行为,我们有两个选项:

  • 在侦听器端将默认重新排队拒绝选项设置为falsespring.rabbitmq.listener.simple.default-requeue-rejected=false
  • 抛出一个AmqpRejectAndDontRequeueException –对于将来没有意义的消息,他可能很有用,因此可以丢弃它们。

现在,让我们了解如何以更智能的方式处理失败的消息。

5. 死信队列

死信队列 (DLQ) 是保存未送达或失败邮件的队列。DLQ允许我们处理错误或错误消息,监控故障模式并从系统中的异常中恢复。

更重要的是,这有助于防止队列中的无限循环,这些循环不断处理错误消息并降低系统性能。

总的来说,有两个主要概念:死信交换(DLX)和死信队列(DLQ)本身。实际上,DLX是一种正常的交换,我们可以将其定义为常见类型之一:直接主题扇出

了解生产者对队列一无所知非常重要。它只知道交换,所有生成的消息都根据交换配置和消息路由密钥进行路由。

现在让我们看看如何通过应用死信队列方法来处理异常。

5.1. 基本配置

为了配置 DLQ,我们需要在定义队列时指定其他参数:

代码语言:javascript代码运行次数:0运行复制
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}Copy

在上面的示例中,我们使用了两个额外的参数:x-dead-letter-exchangex-dead-letter-routeing-keyx-dead-letter-exchange选项的空字符串值告诉代理使用默认交换。

第二个参数与为简单消息设置路由密钥同样重要。此选项更改消息的初始路由密钥,以便 DLX 进一步路由。

5.2. 失败的消息路由

因此,当消息无法传递时,它将被路由到死信交换。但正如我们已经指出的,DLX是一种正常的交换。因此,如果失败的邮件路由密钥与交换不匹配,则不会将其传递到 DLQ。

代码语言:javascript代码运行次数:0运行复制
Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlqCopy

因此,如果我们在示例中省略x-dead-letter-routeing-key参数,失败的消息将卡在无限重试循环中。

此外,消息的原始元信息可在x-death标头中找到:

代码语言:javascript代码运行次数:0运行复制
x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954
Copy

上述信息在 RabbitMQ 管理控制台中提供,该控制台通常在端口 15672 上本地运行。

除了这个配置,如果我们使用Spring Cloud Stream,我们甚至可以通过利用配置属性republishToDlq和autoBindDlq来简化配置过程。

5.3. 死信交换

在上一节中,我们已经看到当消息路由到死信交换时,路由密钥会发生变化。但这种行为并不总是可取的。我们可以通过自己配置 DLX 并使用扇出类型定义它来更改它:

代码语言:javascript代码运行次数:0运行复制
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}Copy

这次我们定义了扇出类型的自定义交换,因此消息将发送到所有有界队列。此外,我们将x-dead-letter-exchange参数的值设置为我们的 DLX 的名称。同时,我们删除了x-dead-letter-routeing-key参数。

现在,如果我们运行示例,失败的消息应该传递到 DLQ,但不更改初始路由密钥:

代码语言:javascript代码运行次数:0运行复制
Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue
Copy

5.4. 处理死信队列消息

当然,我们之所以将它们移动到死信队列,是为了让它们可以在其他时间重新处理。

让我们为死信队列定义一个侦听器:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}Copy

如果我们现在运行我们的代码示例,我们应该看到日志输出:

代码语言:javascript代码运行次数:0运行复制
WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:Copy

我们收到了一条失败的消息,但下一步应该怎么做?答案取决于特定的系统要求、异常类型或消息类型。

例如,我们可以将消息重新排队到原始目的地:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}Copy

但此类异常逻辑与默认重试策略并无不同:

代码语言:javascript代码运行次数:0运行复制
INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:Copy

常见的策略可能需要重试处理邮件n次,然后拒绝它。让我们通过利用消息标头来实现此策略:

代码语言:javascript代码运行次数:0运行复制
public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}Copy

首先,我们获取x 重试计数标头的值,然后将此值与允许的最大值进行比较。随后,如果计数器达到尝试限制次数,则消息将被丢弃:

代码语言:javascript代码运行次数:0运行复制
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding messageCopy

我们应该补充一点,我们还可以利用x-message-ttl标头来设置消息应该被丢弃的时间。这可能有助于防止队列无限增长。

5.5. 停车场排队

另一方面,考虑一种情况,即我们不能只是丢弃一条消息,例如,它可能是银行领域的交易。或者,有时一条消息可能需要手动处理,或者我们只需要记录失败超过n 次的消息。

对于这种情况,有一个停车场队列的概念。我们可以将来自 DLQ 的所有消息(失败次数超过允许的次数)转发到停车场队列进行进一步处理。

现在让我们实现这个想法:

代码语言:javascript代码运行次数:0运行复制
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}Copy

其次,让我们重构侦听器逻辑以向停车场队列发送消息:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}Copy

最终,我们还需要处理到达停车场队列的消息:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}Copy

现在我们可以将失败的消息保存到数据库,或者发送电子邮件通知。

让我们通过运行应用程序来测试此逻辑:

代码语言:javascript代码运行次数:0运行复制
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queueCopy

从输出中我们可以看到,在几次失败的尝试之后,消息被发送到停车场队列。

6. 自定义错误处理

在上一节中,我们已经了解了如何使用专用队列和交换处理故障。但是,有时我们可能需要捕获所有错误,例如记录或将它们持久保存到数据库中。

6.1. 全局错误处理程序

到目前为止,我们一直使用默认的SimpleRabbitListenerContainerFactory,而这个工厂默认使用ConditionalRejectingErrorHandler。此处理程序捕获不同的异常,并将它们转换为AmqpException层次结构中的异常之一。

值得一提的是,如果我们需要处理连接错误,那么我们需要实现ApplicationListener接口。

简单地说,ConditionalRejectingErrorHandler决定是否拒绝特定消息。当导致异常的邮件被拒绝时,它不会重新排队。

让我们定义一个自定义的错误处理程序,它只会简单地重新排队BusinessException

代码语言:javascript代码运行次数:0运行复制
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}Copy

此外,当我们在侦听器方法中抛出异常时,它被包装在ListenerExecutionFailedException 中。因此,我们需要调用getCause方法来获取源异常。

6.2.致命异常策略

在后台,此处理程序使用FatalExceptionStrategy来检查异常是否应被视为致命异常。如果是这样,失败的邮件将被拒绝。

默认情况下,这些异常是致命的:

  • 消息转换异常
  • 消息转换异常
  • MethodArgumentNotValidException
  • 方法参数类型不匹配异常
  • NoSuchMethodException
  • 类投射异常

与其实现ErrorHandler接口,我们可以只提供我们的FatalExceptionStrategy

代码语言:javascript代码运行次数:0运行复制
public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}Copy

最后,我们需要将自定义策略传递给ConditionalRejectingErrorHandler构造函数:

代码语言:javascript代码运行次数:0运行复制
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}Copy
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-22,如有侵权请联系 cloudcommunity@tencent 删除amqp队列异常javaspring
发布评论

评论列表(0)

  1. 暂无评论