该版本仍在开发中,尚未被视为稳定。请使用最新的稳定版本,使用 Spring AMQP 4.0.0!spring-doc.cadn.net.cn

韧性:从错误和经纪人失败中恢复

Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能包括协议错误或代理失败时的恢复和自动重新连接。 我们在本指南中已经看到了所有相关组件,但在这里将它们汇总起来,并逐一列出功能和恢复场景,应该会有帮助。spring-doc.cadn.net.cn

主要的重连功能由以下方式实现缓存连接工厂本身。 使用以下设备通常也很有益兔子管理员自动声明功能。 此外,如果你关心保证送达,你可能还需要使用频道交易旗帜兔子模板SimpleMessageListenerContainer以及确认模式.自动(或者如果你自己做了,也可以手动作)SimpleMessageListenerContainer.spring-doc.cadn.net.cn

自动声明交换、排队和绑定

兔子管理员组件可以在启动时声明交换、队列和绑定。 它懒散地通过连接听者. 因此,如果经纪人在启动时不在场,这也无关紧要。 第一次连接使用(例如, 通过发送消息)监听器被触发,管理员功能被应用。 在监听器中进行自动声明的另一个好处是,如果连接因任何原因被断开(例如, 经纪人死亡、网络故障等),当连接重新建立时,这些设置会再次应用。spring-doc.cadn.net.cn

以这种方式声明的队列必须具有固定名称——要么是显式声明的,要么由框架生成的匿名队列实例。 匿名队列不持久、排他且自动删除。
只有在缓存连接工厂缓存模式为渠道(默认)。 这一限制存在于独占和自动删除队列绑定到连接上。

从2.2.2版本开始,兔子管理员将检测到类型的豆子DeclarableCustomizer并在实际处理声明之前应用该函数。 例如,在框架内支持一类参数之前,设置一个新参数(属性)非常有用。spring-doc.cadn.net.cn

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

它在不提供直接访问可宣告Beans定义。spring-doc.cadn.net.cn

同步作的失败及重试选项

如果你在使用时同步失去与经纪人的连接,兔子模板(例如),春季AMQP抛出Amqp例外(通常如此,但并非总是如此,AmqpIOException). 我们不会掩盖问题的存在,所以你必须能够发现并应对异常。 如果你怀疑连接丢失(而且不是你的错),最简单的办法就是再试一次作。 你可以手动作,或者考虑用 Spring Retry 来处理重试(命令式或声明式)。spring-doc.cadn.net.cn

Spring Retry 提供了几个 AOP 拦截器,并且有很大的灵活性来指定重试参数(尝试次数、异常类型、退回算法等)。 Spring AMQP 还提供了一些方便的工厂资源,方便于 AMQP 使用场景中创建 Spring Retry 拦截器,带有强类型回调接口,可用于实现自定义恢复逻辑。 参见 Javadoc 及其属性状态重试拦截 FactoryBean无状态重试拦截FactoryBean更多细节。 如果没有交易或在重试回调内开始交易,则无状态重试是合适的。 注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或确定要回滚的持续事务,通常不适合无状态重试。 交易中途断线应与回滚产生相同效果。 因此,对于事务开始于栈上方的重连,状态重试通常是最佳选择。 有状态重试需要一种机制来唯一识别消息。 最简单的方法是让发送方在MessageId(信息ID)消息属性。 提供的消息转换器提供了实现这一功能的选项:你可以设置createMessageIdstrue. 否则,你可以注入一个消息键生成器在拦截机中实现。 密钥生成器必须为每个消息返回唯一密钥。 在2.0版本之前的版本中,aMissingMessageIdAdvice被提供了。 它支持了没有messageId(信息ID属性只会被重试一次(忽略重试设置)。 此建议已不再提供,因为春季重试版本1.2中,其功能内置于拦截器和消息监听器容器中。spring-doc.cadn.net.cn

为了向下兼容,消息ID为空消息,默认对消费者来说是致命的(一次重试后被停止)。 为了复制MissingMessageIdAdvice,你可以设置statefulRetryFatalWithNullMessageId属性到false在听众容器里。 在该设置下,消费者继续运行,消息在一次重试后被拒绝。 它会被丢弃或路由到死字母队列(如果已配置的话)。

从1.3版本开始,提供了一个构建API,帮助通过使用 Java 组装这些拦截器(在@Configuration课程)。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxRetries(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

只有部分重试功能可以用这种方式配置。 更高级的功能则需要配置为RetryPolicy. 参见RetryPolicyJavadoc有关可用政策及其配置的更多信息。spring-doc.cadn.net.cn

用批次监听器重试

不建议用批处理监听器配置重试,除非该批次是生产者在单条记录中创建的。 有关消费者和生产者创建批次的信息,请参见批量消息。 对于消费者创建的批处理,框架无法知道批次中哪条消息导致了失败,因此在重试次数耗尽后无法恢复。 而生产者创建的批次则只有一个消息实际失败,整个消息可以被恢复。 应用程序可能希望通知自定义恢复器故障发生在批次的哪个位置,比如通过设置抛出异常的索引属性。spring-doc.cadn.net.cn

批处理监听器的重试恢复器必须实现MessageBatchRecoveryer.spring-doc.cadn.net.cn

消息监听者与异步情况

如果消息监听器由于业务异常而失败,该异常由消息监听器容器处理,然后该容器返回监听另一条消息。 如果故障是由连接中断引起的(非业务例外),收集听取消息的消费者必须被取消并重启。 这SimpleMessageListenerContainer它能无缝处理,并且会留下日志,说明监听器正在被重启。 事实上,它不停地循环,试图重启消费者。 只有当消费者行为非常糟糕时,它才会放弃。 一个副作用是,如果中介在集装箱启动时宕机,它会一直尝试,直到建立起连接。spring-doc.cadn.net.cn

业务异常处理与协议错误和断开连接不同,可能需要更多思考和定制配置,尤其是在使用事务或容器确认时。 在2.8.x之前,RabbitMQ没有对死符行为的定义。 因此,默认情况下,因业务例外被拒绝或回滚的消息可以无限次重递。 为了限制客户的重投次数,有一个选择是状态重试拦截在听众的建议链中。 拦截器可以有恢复回调,实现自定义的死符动作——根据你的具体环境选择。spring-doc.cadn.net.cn

另一种选择是设置容器的defaultRequeueRejected属性到false. 这会导致所有失败的消息被丢弃。 使用 RabbitMQ 2.8.x 或更高版本时,这也便于将消息传递到死信交换。spring-doc.cadn.net.cn

或者,你也可以抛出AmqpRejectAndDontRequeueException. 这样做可以防止消息重新排队,无论设置如何defaultRequeueRejected财产。spring-doc.cadn.net.cn

从2.1版本开始,一个ImmediateRequeueAmqpException引入时,执行完全相反的逻辑:消息无论设置如何,都会被重新排队defaultRequeueRejected财产。spring-doc.cadn.net.cn

通常会结合使用这两种技术。 你可以用状态重试拦截在带有a消息恢复器那就AmqpRejectAndDontRequeueException. 这消息恢复当所有重试都用尽时,才会被叫出。 这RejectAndDontRequeueRecoveryer正是如此。 默认消息恢复器消耗了错误的信息并发射了a警告消息。spring-doc.cadn.net.cn

从1.3版本开始,新的RepublishMessageRecoveryer提供以允许在重试用尽后发布失败消息。spring-doc.cadn.net.cn

当恢复器消耗最终异常时,消息会被 ack'd,经纪人不会发送到死信交换(如果配置为前提)。spring-doc.cadn.net.cn

什么时候RepublishMessageRecoveryer在消费者端使用,接收到的消息具有传递模式接收交付模式消息属性。 在这种情况下传递模式. 这意味着NON_PERSISTENT经纪人的交付方式。 从2.0版本开始,你可以配置RepublishMessageRecoveryer对于传递模式如果是,则在消息中设置以重新发布. 默认情况下,它使用消息属性默认值 -MessageDeliveryMode.PERSISTENT.

以下示例展示了如何设置RepublishMessageRecoveryer作为回收者:spring-doc.cadn.net.cn

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxRetries(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoveryer通过消息头部发布消息,并附带额外信息,如异常消息、栈跟踪、原始交换和路由密钥。 通过创建子类并覆盖,可以添加额外的头部additionalHeaders(). 这传递模式(或其他任何性质)也可以在additionalHeaders(),如下示例所示:spring-doc.cadn.net.cn

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

从2.0.5版本开始,如果栈跟踪过大,可能会被截断;这是因为所有标题都必须集中在一个帧内。 默认情况下,如果栈迹导致其他头部可用的空间少于20,000字节(“余量”),则会被截断。 这可以通过设置恢复器的框架MaxHeadroom(框架最大头量)属性,如果你需要更多或更少空间放置其他头部。 从2.1.13、2.2.3版本开始,异常消息也包含在计算中,并通过以下算法最大化栈迹量:spring-doc.cadn.net.cn

  • 如果仅栈跟踪就超过了限制,异常消息头将被截断为97字节以上…​栈跟踪也被截断了。spring-doc.cadn.net.cn

  • 如果栈迹很小,消息会被截断(加…​)以适应可用字节(但栈跟踪内的消息被截断为97字节以上…​).spring-doc.cadn.net.cn

每当发生任何截断时,原始例外都会被记录以保留完整信息。 在增强头部后进行评估,以便表达式中使用异常类型等信息。spring-doc.cadn.net.cn

从2.4.8版本开始,错误交换和路由密钥可以作为SpEL表达式提供,其中消息是评估的根对象。spring-doc.cadn.net.cn

从2.3.3版本开始,新增一个子类转载消息恢复者确认提供;这支持两种类型的出版商确认,并且会等待确认后返回(如果未确认或消息返回,则抛出异常)。spring-doc.cadn.net.cn

如果确认类型为相关,子类还会检测消息是否返回,并抛出AmqpMessageReturnedException;如果发表内容被负面致谢,将会被抛出AmqpNack收到异常.spring-doc.cadn.net.cn

如果确认类型为简单,子类将调用等待确认或死亡频道上的方法。spring-doc.cadn.net.cn

有关确认和退货的更多信息,请参见出版商确认与退货spring-doc.cadn.net.cn

从2.1版本开始,一个ImmediateRequeueMessageRecoveryer加入后抛出ImmediateRequeueAmqpException,通知监听器容器重新排队当前失败的消息。spring-doc.cadn.net.cn

春季重试的例外分类

春季重试在确定哪些例外可以触发重试方面具有很大灵活性。 默认配置会对所有例外重试。 鉴于用户异常被包裹在ListenerExecutionFailedException我们需要确保分类能够审查例外原因。 默认分类器只关注顶层例外。spring-doc.cadn.net.cn

自 Spring Retry 1.0.3 起,二进制异常分类器有一个性质,称为穿越原因(默认:false). 什么时候true它会穿越异常原因,直到找到匹配或无原因。spring-doc.cadn.net.cn

要使用该分类器进行重试,可以使用SimpleRetryPolicy用取最大尝试次数的构造子创建,地图例外实例,以及布尔值(穿越原因)并将该策略注入重试模板.spring-doc.cadn.net.cn

重试中介

从队列中被死字母标记的消息,在从DLX重新路由后可以重新发布回该队列。 这种重试行为由经纪人端通过X-死亡页眉。 关于这种方法的更多信息,请参见官方RabbitMQ文档spring-doc.cadn.net.cn

另一种方法是从应用程序手动将失败消息重新发布回原始交换机。 从版本开始4.0,RabbitMQ经纪人不考虑X-死亡客户端发送的头部。 基本上,任何x-*客户端忽略了头部。spring-doc.cadn.net.cn

为了缓解RabbitMQ代理的这种新行为,Spring AMQP推出了retry_count从版本3.2开始。 当该头部缺失且服务器端DLX正在运行时,x-死亡.count属性映射到该头部。 当失败消息被手动重新发布以便重试时,retry_count头部值必须手动递增。 更多信息请参见 Javadocspring-doc.cadn.net.cn

以下示例总结了经纪人手动重试的算法:spring-doc.cadn.net.cn

@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
    try {
    // Process message
    }
    catch (Exception ex) {
        Long retryCount = message.getMessageProperties().getRetryCount();
        if (retryCount < 3) {
            message.getMessageProperties().incrementRetryCount();
            this.rabbitTemplate.send("", "some_queue", message);
        }
        else {
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
    }
}