|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
请求/回复消息
这Amqp模板还提供多种发送与接收接受之前描述的单向发送作相同参数选项的方法(交换,路由键和消息).
这些方法在请求-回复场景中非常有用,因为它们处理了必要的配置回复发送前的属性,并且可以在内部为此目的创建的排他队列中监听回复消息。
类似的请求-回复方法也存在于消息转换器同时应用于请求和回复。
这些方法命名为转发与接收. 参见JavadocAmqp模板更多细节。
从版本 1.5.0 开始,每个发送与接收方法变体有一个超载版本,其相关数据. 结合正确配置的连接工厂,这使得发送端能够接收发布确认。参见相关发布确认与返回,以及Javadoc for兔子行动更多信息请见。
从2.0版本开始,这些方法有多种变体(convertSendAndReceiveAsType) 并需要额外的参数化类型引用参数用于转换复返回类型。模板必须配置为智能消息转换器.
看从消息跟兔子模板更多信息请见。
从2.1版本开始,你可以配置兔子模板其中noLocalReplyConsumer控制 a 的选项无本地标记回复消费者。 这是false默认。
回复暂停
默认情况下,发送和接收方法在五秒后超时并返回空。您可以通过设置回复Timeout财产。 从1.5版本开始,如果你设置命令的属性到true(或者说强制表达式评估为true对于特定消息),如果消息无法送达队列,则AmqpMessageReturnedException被抛弃。该例外有返回消息,回复代码和回复正文性质,以及交换和路由键用于发送。
此功能使用发布商退货。您可以通过设置来启用出版商回归自true在缓存连接工厂(参见出版商确认和退货)此外,你必须没有注册过自己的回调其中兔子模板. |
从版本 2.1.2 开始,a回复TimedOut新增了方法,允许子类被告知超时,以便清理任何保留状态。
从2.0.11和2.1.3版本开始,使用默认版本时DirectReplyToMessageListenerContainer,你可以通过设置模板的回复错误处理财产。 该错误处理程序用于任何失败的送达,如延迟回复和未包含关联头的消息。传递的异常是ListenerExecutionFailedException,其失败消息财产。
RabbitMQ 直接回复
从3.4.0版本开始,RabbitMQ服务器支持直接回复。这消除了固定回复队列的主要原因(以避免为每个请求创建临时队列)。从Spring AMQP 1.4.1版本开始,默认使用直接回复(如果服务器支持),而不是创建临时回复队列。当不成立时回复队列被提供(或用一个名称amq.rabbitmq.reply-to),兔子模板自动检测是否支持直接回复,并要么使用,要么退回使用临时回复队列。使用直接回复时,一个回复监听器不要求也不应配置。 |
回复监听器仍然支持指定队列(除amq.rabbitmq.reply-to),允许控制回复并发等。
从1.6版本开始,如果你希望为每个回复使用临时的、独占的自动删除队列useTemporaryReplyQueues属性到true. 如果你设置 a回复地址.
你可以通过子类来改变是否使用直接回复的标准兔子模板以及覆盖useDirectReplyTo()以检查不同的条件。该方法仅在发送第一个请求时调用一次。
在2.0版本之前,兔子模板为每个请求创建一个新的消费者,并在收到回复(或超时)后取消该消费者。现在模板使用DirectReplyToMessageListenerContainer相反,让消费者被重复使用。模板仍然负责关联回复,因此不会有迟到回复发送到其他发件人的风险。如果你想恢复到之前的行为,可以设置useDirectReplyToContainer (直接回复容器当使用 XML 配置时)属性设置为 false。
这异步兔子模板没有这样的选项。它总是使用直接回复容器用于直接回复时的回复。
从版本 2.3.7 开始,模板新增了一个属性useChannelForCorrelation. 当这时true服务器无需将请求消息头中的相关ID复制到回复消息中。相反,发送请求的信道用于将回复与请求相关联。
消息与回复队列的相关性
使用固定的回复队列时(除amq.rabbitmq.reply-to),你必须提供相关数据,以便回复能与请求进行关联。
参见RabbitMQ远程过程调用(RPC)。
默认情况下,标准关联Id属性用于保存相关性数据。
然而,如果你想使用自定义属性来存储相关性数据,可以设置相关密钥<rabbit-template/> 上的属性。
显式地将属性设置为关联Id等同于省略属性。
客户端和服务器必须使用相同的相关数据头部。
Spring AMQP 1.1 版本使用了一个名为spring_reply_correlation为了这些数据。
如果你想在当前版本中恢复到这种行为(可能是为了保持与使用1.1的其他应用程序的兼容性),你必须将属性设置为spring_reply_correlation. |
默认情况下,模板会生成自己的相关ID(忽略用户提供的值)。
如果你想使用自己的相关ID,请设置兔子模板实例用户关联ID属性到true.
| 相关ID必须是唯一的,以避免请求返回错误的回复。 |
回复监听器容器
在使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用新的临时队列。
不过,可以在模板上配置一个单一的回复队列,这样效率更高,也能在该队列上设置参数。
但在这种情况下,你还必须提供<回复监听者/>子元素。
该元素为回复队列提供了一个监听器容器,模板即为监听器。
<监听器容器/>允许的所有消息监听器容器配置属性在该元素上都被允许,除了连接工厂和消息转换器这些都继承自模板的配置。
如果你运行多个应用程序实例或使用多个实例兔子模板你必须为每个实例使用独特的回复队列。
RabbitMQ无法从队列中选择消息,因此如果它们都使用同一个队列,每个实例都会竞争回复,而不一定会收到自己的回复。 |
以下示例定义了一个带有连接工厂的兔子模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享通道。 因此,请求和回复不会在同一事务内执行(如果是事务易)。
在1.5.0版本之前,回复地址属性不可用。
回复总是通过默认交换和回复队列命名为路由密钥。
这仍然是默认设置,但你现在可以指定新的回复地址属性。
这回复地址可以包含一个形式为<exchange>/<routingKey>回复路由到指定的交换机,并路由到与路由密钥绑定的队列。
这回复地址具有优先级回复队列.
只有回复地址正在使用,<回复听众>必须配置为独立的<监听器容器>元件。
这回复地址和回复队列(或队列属性<监听器容器>) 必须逻辑上指向同一个队列。 |
在这种配置下,一个SimpleListenerContainer用于接收回复,其中兔子模板即消息监听器.
在定义模板时<兔子:模板/>命名空间元素,如前例所示,解析器将模板中的容器和线定义为监听器。
当模板不使用固定回复队列(或使用直接回复——参见RabbitMQ直接回复),不需要监听器容器。
直接回复是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。 |
如果你定义了你的兔子模板作为<豆/>或者使用一个@Configuration类定义为@Bean或者当你用程序创建模板时,你需要自己定义并接线回复监听器容器。
如果未能做到这一点,模板永远无法接收回复,最终超时,返回 null 作为调用发送与接收方法。
从1.5版本开始,兔子模板检测是否已经
配置为消息监听器等待回复。
如果没有,则尝试发送和接收带有回复地址的消息
失败非法州例外(因为回复从未收到过)。
此外,如果是简单的回复地址(队列名称)被使用时,回复监听器容器会验证它正在监听
进入同名队列。
如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法进行此检查。
在自己制作回复监听器和模板时,确保模板是回复地址还有集装箱的队列(或队列名称属性指向相同的队列。
模板将回复地址插入到外发消息中回复财产。 |
以下列表展示了如何手动接线豆子的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的示例兔子模板本测试用例展示了带有固定回复队列的有线结构,以及一个“远程”监听器容器处理请求并返回回复。
当回复超时时 (回复Timeout),发送与接收()方法返回空。 |
在1.3.6版本之前,超时消息的迟到回复只会被记录。
如果收到迟到回复,则被拒绝(模板抛出AmqpRejectAndDontRequeueException).
如果回复队列配置为将被拒绝的消息发送到死信交换,则可以检索回复以便后续分析。
为此,将队列绑定到配置的死信交换,路由密钥等于响应队列的名称。
有关配置死信的更多信息,请参阅RabbitMQ死信文档。
你也可以看看固定回复队列死信测试举个例子。
异步兔子模板
1.6 版本引入了异步兔子模板.
这与此类似发送与接收(和转发与接收)方法,适用于Amqp模板.
然而,他们没有阻挡,而是返回完成未来.
这发送与接收方法返回兔子信息未来.
这转发与接收方法返回兔子转换器未来.
你可以同步检索结果,之后通过调用get()在未来,或者你可以对结果进行异步调用。
以下列表展示了两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果命令的当 设置且消息无法传递时,未来会抛出执行异常原因为AmqpMessageReturnedException,封装了返回的消息及其相关信息。
如果enableConfirms是设定的,未来有一个性质,称为确认,而该 本身是CompletableFuture<Boolean>跟true这表明出版成功。
如果确认未来是false这兔子未来还有一个性质称为nackCause(无名之因),如果有,包含了失败的原因。
| 如果在回复之后收到,发布者确认会被丢弃,因为回复意味着成功发布。 |
你可以设置收到超时模板上的属性用于超时回复(默认为30000- 30秒)。
如果发生超时,未来将完成Amqp回复TimeoutException.
模板实现SmartLifecycle.
在有待处理回复时停止模板会导致待处理前途实例将被取消。
从2.0版本开始,异步模板支持直接回复,取代配置后的回复队列。 要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参见RabbitMQ直接回复,以使用同步的直接回复兔子模板.
2.0版本引入了这些方法的变体(convertSendAndReceiveAsType) 并需要额外的参数化类型引用参数用于转换复返回类型。
你必须配置底层兔子模板其中智能消息转换器.
看从消息跟兔子模板更多信息请见。
从3.0版本开始,异步兔子模板方法现已返回完成未来s 代替可听未来s. |