|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
@RabbitListener批处理
接收一批消息时,容器通常执行解批处理,监听器一次调用一条消息。
从2.2版本开始,你可以配置监听器容器工厂和监听器,让整个批次一次性接收,只需设置工厂的批处理听器性质,并将方法有效载荷参数设为列表或收集:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
设置批处理听器属性 变为 true 会自动关闭deBatchingEnabled工厂创建的容器属性(除非)consumerBatchEnabled是true- 见下文)。实际上,清批处理从容器转移到监听器适配器,适配器生成传递给监听器的列表。
批处理启用的工厂不能与多方法监听器一起使用。
我也是从2.2版本开始的。当一次只收到一个批次消息时,最后一条消息包含一个布尔头,设置为true.
该头部可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)你的监听方法的布尔 last' 参数。
头部映射于MessageProperties.isLastInBatch().
另外AmqpHeaders.BATCH_SIZE在每个消息片段中填充了批次大小。
此外,还有一处新房产consumerBatchEnabled已被添加到SimpleMessageListenerContainer.
当满足此条件时,容器将生成一批消息,最多批量大小;部分批次交付 如果收到超时时间过了,没有新消息到达。
如果收到生产者创建的批次,会被淘汰并加入消费者端批次;因此,实际送达的消息数量可能超过。批量大小,表示从中介处收到的消息数量。deBatchingEnabled当 必须为真consumerBatchEnabled是真的;集装箱工厂将执行这一要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
使用consumerBatchEnabled跟@RabbitListener:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个被叫做原始、未转换的
org.springframework.amqp.core.Message收到。 -
第二种称为
org.springframework.messaging.Message<?>s,包含已转换的有效载荷和映射的头部/属性。 -
第三个是用已转换的有效载荷调用的,无法访问头部或属性。
你也可以添加一个渠道参数,通常用于使用手动ACK模式。
这对第三个例子帮助不大,因为你无法访问delivery_tag财产。
Spring Boot 提供了consumerBatchEnabled和批量大小,但不包括批处理听器.
从3.0版本开始,设置consumerBatchEnabled自true集装箱工厂还设有布景批处理听器自true.
什么时候consumerBatchEnabled是true,监听者必须是批次监听者。
从3.0版本开始,监听器方法可以消耗收藏<?>或名单<?>.
| 批处理模式下的监听器不支持回复,因为批处理中的消息与单次回复之间可能没有相关性。 批处理监听器仍然支持异步返回类型。 |