|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
交易
Spring Rabbit框架支持同步和异步用例中的自动事务管理,具有多种不同的语义,这些语义可以通过声明式选择,这对现有Spring事务用户来说是熟悉的。 这使得许多(如果不是大多数)常见的消息模式变得易于实施。
向框架传递所需的交易语义有两种方式。
在这两者中兔子模板和SimpleMessageListenerContainer,有一面旗帜频道交易如果true,告诉框架使用事务通道,并通过提交或回滚结束所有作(发送或接收)(取决于结果),例外表示回滚。
另一个信号是用Spring的外部交易提供一个PlatformTransactionManager实现作为当前运营的背景。
如果在框架发送或接收消息时已有交易正在进行中,且频道交易旗帜是true,消息事务的提交或回滚会被推迟到当前事务结束。
如果频道交易旗帜是false,消息作不涉及交易语义(自动自动确认)。
这频道交易flag 是一个配置时间设置。
当创建AMQP组件时,通常在应用启动时,声明并处理一次。
外部事务原则上更具动态性,因为系统在运行时响应当前线程状态。
然而,在实际作中,当事务以声明式方式叠加到应用时,这通常也是一种配置设置。
对于同步使用场景兔子模板,外部事务由调用者根据喜好以声明式或命令式方式提供(通常的Spring事务模型)。
以下示例展示了声明式方法(通常因非侵入性而被推荐),模板配置为channelTransacted=真:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在上述例子中,a字符串payload 被接收、转换并发送为@Transactional.
如果数据库处理失败且有异常,接收消息将返回给代理,发送消息则不会发送。
这适用于任何具有兔子模板在一系列事务方法中(除非,例如,渠道直接作以提前提交事务)。
对于异步应用场景SimpleMessageListenerContainer如果需要外部事务,容器在设置监听器时必须请求该事务。
为了表示需要外部事务,用户提供一个实现PlatformTransactionManager容器配置好后再发送。
以下示例展示了如何实现:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前述示例中,事务管理器作为依赖从另一个豆定义注入(未显示)添加,且频道交易flag 也设置为true.
其效果是,如果监听者失败且有异常,交易会被回滚,消息也会返回给经纪人。
值得注意的是,如果事务未能提交(例如,因为
数据库约束错误或连接性问题),AMQP事务也会被回滚,消息返回给代理。
这有时被称为“最佳努力1阶段提交”,是可靠消息传递的强大模式。
如果频道交易旗帜被设置为false(默认)在前述示例中,外部事务仍会为监听者提供,但所有消息作都会自动被锁定,因此即使在业务作回滚时,消息作也会被提交。
条件回滚
在1.6.6版本之前,向容器的transactionAttribute而使用外部事务管理器(如JDBC)则无影响。
例外总是会回滚交易。
此外,在容器的建议链中使用交易建议时,条件回滚并不太有用,因为所有监听者例外都被包裹在ListenerExecutionFailedException.
第一个问题已经得到纠正,规则现在被正确应用。
此外,ListenerFailedRuleBasedTransactionAttribute现已提供。
它是 的子类规则基础交易属性,唯一的区别是它知道ListenerExecutionFailedException并以此类例外的原因为规则。
该事务属性可以直接在容器中使用,也可以通过交易建议使用。
以下示例使用了该规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于回滚接收消息的说明
AMQP交易仅适用于发送给经纪人的消息和自动确认。
因此,当Spring事务回滚且收到消息时,Spring AMQP不仅要回滚事务,还要手动拒绝消息(有点像nack,但规范中并非如此称呼)。
消息拒绝的作独立于事务,且依赖于defaultRequeueRejected性质(默认:true).
有关拒绝失败消息的更多信息,请参见“消息监听器与异步案例”。
有关RabbitMQ交易及其限制的更多信息,请参见RabbitMQ经纪人语义。
| 在RabbitMQ 2.7.0之前,这类消息(以及在通道关闭或中止时未锁定的消息)会被放在Rabbit代理队列的后方。 自2.7.0版本起,拒绝消息会被排到队列前端,方式类似于JMS的回滚消息。 |
此前,事务回滚时的消息重新排队在本地事务之间存在不一致,当事务管理器被提供了。
在前者情况下,正常的重新排队逻辑(AmqpRejectAndDontRequeueException或defaultRequeueRejected=false)应用(参见消息监听器与异步情况)。
使用事务管理器时,消息在回滚时无条件重新排队。
从2.0版本开始,行为保持一致,且在两种情况下都应用了正常的重新排队逻辑。
要恢复到之前的行为,可以设置容器的alwaysRequeueWithTxManagerRollback属性到true.
参见消息监听器容器配置。 |
用RabbitTransactionManager
这RabbitTransactionManager是在外部事务内部执行兔子作并同步的替代方案。
该事务管理器是PlatformTransactionManager接口,应与单一Rabbit一起使用连接工厂.
| 该策略无法提供 XA 事务——例如,为了在消息传递和数据库访问之间共享事务。 |
应用程序代码需要通过以下方式获取事务性Rabbit资源ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)而不是标准Connection.createChannel()随后频道创建时呼叫。
使用 Spring AMQP 时兔子模板它会自动检测线程绑定通道并自动参与其交易。
通过 Java 配置,您可以使用以下 bean 来设置新的 RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您偏好 XML 配置,可以在您的 XML 应用上下文文件中声明以下 bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将RabbitMQ事务与其他事务(例如DBMS)同步,提供了“尽力而为一阶段提交”的语义。
RabbitMQ 事务在事务同步完成后阶段可能未能提交。
这会被春季-德克萨斯基础设施作为错误,但调用代码不投出例外。
从2.3.10版本开始,你可以调用ConnectionUtils.checkAfterCompletion()在交易提交到处理该事务的同一线程后。
如果没有例外,它会直接返回;否则它会抛出完成失败后例外该系统将具有表示完成同步状态的属性。
通过呼叫来启用此功能ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);这是一个全局标志,适用于所有线程。