@RabbitListener批处理

接收一批消息时,容器通常执行解批处理,监听器一次调用一条消息。 从2.2版本开始,你可以配置监听器容器工厂和监听器,让整个批次一次性接收,只需设置工厂的批处理听器性质,并将方法有效载荷参数设为列表收集:spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

设置批处理听器属性 变为 true 会自动关闭deBatchingEnabled工厂创建的容器属性(除非)consumerBatchEnabledtrue- 见下文)。实际上,清批处理从容器转移到监听器适配器,适配器生成传递给监听器的列表。spring-doc.cadn.net.cn

批处理启用的工厂不能与多方法监听器一起使用。spring-doc.cadn.net.cn

我也是从2.2版本开始的。当一次只收到一个批次消息时,最后一条消息包含一个布尔头,设置为true. 该头部可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)你的监听方法的布尔 last' 参数。 头部映射于MessageProperties.isLastInBatch(). 另外AmqpHeaders.BATCH_SIZE在每个消息片段中填充了批次大小。spring-doc.cadn.net.cn

此外,还有一处新房产consumerBatchEnabled已被添加到SimpleMessageListenerContainer. 当满足此条件时,容器将生成一批消息,最多批量大小;部分批次交付 如果收到超时时间过了,没有新消息到达。 如果收到生产者创建的批次,会被淘汰并加入消费者端批次;因此,实际送达的消息数量可能超过。批量大小,表示从中介处收到的消息数量。deBatchingEnabled当 必须为真consumerBatchEnabled是真的;集装箱工厂将执行这一要求。spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

使用consumerBatchEnabled@RabbitListener:spring-doc.cadn.net.cn

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个被叫做原始、未转换的org.springframework.amqp.core.Message收到。spring-doc.cadn.net.cn

  • 第二种称为org.springframework.messaging.Message<?>s,包含已转换的有效载荷和映射的头部/属性。spring-doc.cadn.net.cn

  • 第三个是用已转换的有效载荷调用的,无法访问头部或属性。spring-doc.cadn.net.cn

你也可以添加一个渠道参数,通常用于使用手动ACK模式。 这对第三个例子帮助不大,因为你无法访问delivery_tag财产。spring-doc.cadn.net.cn

Spring Boot 提供了consumerBatchEnabled批量大小,但不包括批处理听器. 从3.0版本开始,设置consumerBatchEnabledtrue集装箱工厂还设有布景批处理听器true. 什么时候consumerBatchEnabledtrue,监听者必须是批次监听者。spring-doc.cadn.net.cn

从3.0版本开始,监听器方法可以消耗收藏<?>名单<?>.spring-doc.cadn.net.cn

批处理模式下的监听器不支持回复,因为批处理中的消息与单次回复之间可能没有相关性。 批处理监听器仍然支持异步返回类型