该版本仍在开发中,尚未被视为稳定。请使用最新的稳定版本,使用 Spring AMQP 4.0.0!spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 支持

4.0版本引入Spring-rabbitmq-client用于 RabbitMQ 上 AMQP 1.0 协议支持的模块。spring-doc.cadn.net.cn

该产物基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持配合使用。 它不能用于任何任意的AMQP 1.0经纪商。 为此,目前推荐使用 JMS 桥接器及其相应的 Spring JMS 集成。spring-doc.cadn.net.cn

必须将该依赖添加到项目中,才能与 RabbitMQ AMQP 1.0 支持交互:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.0.1-SNAPSHOT</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.1-SNAPSHOT'

春兔(针对AMQP 0.9.1协议)作为传递依赖,用于在这个新客户端中重用某些常见API,例如例外、@RabbitListener支持。 目标项目中不必同时使用这两个功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 同时存在。spring-doc.cadn.net.cn

有关RabbitMQ AMQP 1.0 Java客户端的更多信息,请参阅其文档spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 环境

com.rabbitmq.client.amqp.Environment是项目中连接管理和其他常见设置必须添加的第一个内容。 它是节点或节点集群的入口点。 环境允许建立联系。 它可以包含连接间共享的基础设施相关配置设置,例如线程池、度量和/或观察:spring-doc.cadn.net.cn

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

一样环境实例 可用于连接不同的 RabbitMQ 代理,因此必须在特定连接上提供连接设置。 见下文。spring-doc.cadn.net.cn

AMQP连接工厂

org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory引入抽象以管理com.rabbitmq.client.amqp.Connection. 不要把它和org.springframework.amqp.rabbit.connection.Connection.ConnectionFactory仅适用于AMQP 0.9.1协议。 这SingleAmqpConnectionFactory实现方式是管理一个连接及其设置。 一样连接可以由许多生产者、消费者和管理层共享。 复用由AMQP客户端库中AMQP 1.0协议链路抽象实现的内部处理。 这连接具有恢复能力,同时也处理拓扑。spring-doc.cadn.net.cn

大多数情况下,这些材料刚好足够把这颗豆子加入到项目中:spring-doc.cadn.net.cn

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

SingleAmqpConnectionFactory所有连接特定设置的设置者。spring-doc.cadn.net.cn

RabbitMQ 拓扑管理

对于拓扑管理(交换、队列和绑定)从应用角度来看,兔子安克帕管理员存在,是现有的实现AmqpAdmin接口:spring-doc.cadn.net.cn

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

豆子定义相同交换,队列,捆绑可申报者必须使用配置代理中描述的实例来管理拓扑。 这兔子管理员春兔也可以这样做,但这在 AMQP 0.9.1 连接时会发生,因为兔子安克帕管理员基于AMQP 1.0连接,拓扑恢复工作由此顺利完成,同时发布者和消费者也得以恢复。spring-doc.cadn.net.cn

兔子安克帕管理员在其中执行相应的豆子扫描开始()生命周期回溯。 这initialize()以及所有其他RabbitMQ实体管理方法都可以在运行时手动调用。 内部兔子安克帕管理员使用com.rabbitmq.client.amqp.Connection.management()API用于执行相应的拓扑作。spring-doc.cadn.net.cn

兔子Amqp模板

兔子Amqp模板AsyncAmqpTemplate并采用AMQP 1.0协议执行各种发送/接收作。 需要Amqp连接工厂并且可以通过一些默认配置进行配置。 便com.rabbitmq.client:AMQP-client图书馆自带一个com.rabbitmq.client.amqp.Message兔子Amqp模板仍然会暴露基于已知 的 APIorg.springframework.amqp.core.Message所有辅助职业像消息属性消息转换器抽象化。 转换/转换com.rabbitmq.client.amqp.Message是在内部完成的兔子Amqp模板. 所有方法都返回完成未来最终获得作结果。 使用纯对象的作需要消息正体转换和简易消息转换器默认使用。 有关转换的更多信息,请参见消息转换器spring-doc.cadn.net.cn

通常,只需一颗这样的豆子,就足以完成所有可能的模板模式作:spring-doc.cadn.net.cn

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

它可以配置为某个默认的交换和路由密钥,或者仅仅排队。 这兔子Amqp模板设置一个默认队列用于接收作,另一个默认队列用于请求-回复作,客户端若不存在,则为请求创建临时队列。spring-doc.cadn.net.cn

以下是一些示例兔子Amqp模板操作:spring-doc.cadn.net.cn

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

这里我们宣告了一个E1交换问1并用K1路由密钥。 然后我们使用一个默认设置兔子Amqp模板将消息发布到上述交换机,使用相应路由密钥并使用问1作为接收作的默认队列。 这些方法有超载变体,可以发送到特定的交换或队列(发送和接收)。 这receiveAndConvert()运算ParameterizedTypeReference<T>需要一个智能消息转换器注射到......兔子Amqp模板.spring-doc.cadn.net.cn

下一个示例演示和RPC实现兔子Amqp模板(假设与前述示例相同的RabbitMQ对象):spring-doc.cadn.net.cn

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

相关性和回复队列由内部管理。 服务器端可以通过以下方式实现@RabbitListener下面将介绍POJO方法。spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 消费者版

与许多面向消费者端的消息实现一样,Spring-rabbitmq-client模块附带RabbitAmqpListenerContainer这本质上是 well-known 的实现MessageListenerContainer. 它的作用和DirectMessageListenerContainer但支持 RabbitMQ AMQP 1.0。 需要Amqp连接工厂并且至少有一个队列可供消费。 另外,还有消息监听器(或针对AMQP 1.0兔子Amqp消息听者必须提供。 可以配置为autoSettle = false,其含义为确认模式。手册. 在这种情况下,消息提供给消息监听器在其消息属性Amqp致谢回调以考虑目标逻辑。spring-doc.cadn.net.cn

兔子Amqp消息听者有合同com.rabbitmq.client:AMQP-client抽象:spring-doc.cadn.net.cn

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

其中第一个论元是本来的接收com.rabbitmq.client.amqp.Message上下文是消息结算的本地回调,类似于上述Amqp致谢抽象化。spring-doc.cadn.net.cn

兔子Amqp消息听者能够批量处理和结算消息,满足条件批量大小提供选项。 为此MessageListener.onMessageBatch()合同必须被执行。 这batchReceiveDuration用于为非完整批次安排强制释放,以避免内存和消费信用的耗尽。spring-doc.cadn.net.cn

通常,兔子Amqp消息听者类不直接用于目标项目,POJO 方法注释配置通过以下方式进行@RabbitListener被选用于声明式消费者配置。 这RabbitAmqpListenerContainerFactory必须注册在RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME@RabbitListener注释过程将被注册兔子Amqp消息听者实例进入RabbitListenerEndpointRegistry. 目标 POJO 方法调用由特定RabbitAmqpMessageListener适配器实现,它扩展了MessagingMessageListenerAdapter并且重用了大量功能,包括请求-回复场景(无论是否异步)。 因此,注释驱动监听端点中描述的所有概念都被应用到了兔子Amqp消息听者也。spring-doc.cadn.net.cn

除了传统的消息传递有效载荷@RabbitListenerPOJO方法契约可以采用以下参数:spring-doc.cadn.net.cn

  • com.rabbitmq.client.amqp.Message- 原生AMQP 1.0消息,无需任何转换;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.Message- Spring AMQP 消息抽象,作为本地 AMQP 1.0 消息的转换结果;spring-doc.cadn.net.cn

  • org.springframework.messaging.Message- Spring消息的抽象,作为Spring AMQP消息的转换结果;spring-doc.cadn.net.cn

  • 消费者。背景- RabbitMQ AMQP 客户端消费者结算 API;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.AmqpAcknowledgegment- 春季AMQP确认摘要:代表们消费者。背景.spring-doc.cadn.net.cn

以下示例展示了一个简单的@RabbitListener对于RabbitMQ AMQP 1.0与人工结算的交互:spring-doc.cadn.net.cn

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}