|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
RabbitMQ AMQP 1.0 支持
4.0版本引入Spring-rabbitmq-client用于 RabbitMQ 上 AMQP 1.0 协议支持的模块。
该产物基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持配合使用。 它不能用于任何任意的AMQP 1.0经纪商。 为此,目前推荐使用 JMS 桥接器及其相应的 Spring JMS 集成。
必须将该依赖添加到项目中,才能与 RabbitMQ AMQP 1.0 支持交互:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbitmq-client</artifactId>
<version>3.2.8</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:3.2.8'
这春兔(针对AMQP 0.9.1协议)作为传递依赖,用于在这个新客户端中重用某些常见API,例如例外、@RabbitListener支持。
目标项目中不必同时使用这两个功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 同时存在。
有关RabbitMQ AMQP 1.0 Java客户端的更多信息,请参阅其文档。
RabbitMQ AMQP 1.0 环境
这com.rabbitmq.client.amqp.Environment是项目中连接管理和其他常见设置必须添加的第一个内容。
它是节点或节点集群的入口点。
环境允许建立联系。
它可以包含连接间共享的基础设施相关配置设置,例如线程池、度量和/或观察:
@Bean
Environment environment() {
return new AmqpEnvironmentBuilder()
.connectionSettings()
.port(5672)
.environmentBuilder()
.build();
}
一样环境实例 可用于连接不同的 RabbitMQ 代理,因此必须在特定连接上提供连接设置。
见下文。
AMQP连接工厂
这org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory引入抽象以管理com.rabbitmq.client.amqp.Connection.
不要把它和org.springframework.amqp.rabbit.connection.Connection.ConnectionFactory仅适用于AMQP 0.9.1协议。
这SingleAmqpConnectionFactory实现方式是管理一个连接及其设置。
一样连接可以由许多生产者、消费者和管理层共享。
复用由AMQP客户端库中AMQP 1.0协议链路抽象实现的内部处理。
这连接具有恢复能力,同时也处理拓扑。
大多数情况下,这些材料刚好足够把这颗豆子加入到项目中:
@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
return new SingleAmqpConnectionFactory(environment);
}
看SingleAmqpConnectionFactory所有连接特定设置的设置者。
RabbitMQ 拓扑管理
对于拓扑管理(交换、队列和绑定)从应用角度来看,兔子安克帕管理员存在,是现有的实现AmqpAdmin接口:
@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpAdmin(connectionFactory);
}
豆子定义相同交换,队列,捆绑和可申报者必须使用配置代理中描述的实例来管理拓扑。
这兔子管理员从春兔也可以这样做,但这在 AMQP 0.9.1 连接时会发生,因为兔子安克帕管理员基于AMQP 1.0连接,拓扑恢复工作由此顺利完成,同时发布者和消费者也得以恢复。
这兔子安克帕管理员在其中执行相应的豆子扫描开始()生命周期回溯。
这initialize()以及所有其他RabbitMQ实体管理方法都可以在运行时手动调用。
内部兔子安克帕管理员使用com.rabbitmq.client.amqp.Connection.management()API用于执行相应的拓扑作。
兔子Amqp模板
这兔子Amqp模板是AsyncAmqpTemplate并采用AMQP 1.0协议执行各种发送/接收作。
需要Amqp连接工厂并且可以通过一些默认配置进行配置。
便com.rabbitmq.client:AMQP-client图书馆自带一个com.rabbitmq.client.amqp.Message这兔子Amqp模板仍然会暴露基于已知 的 APIorg.springframework.amqp.core.Message所有辅助职业像消息属性和消息转换器抽象化。
转换/转换com.rabbitmq.client.amqp.Message是在内部完成的兔子Amqp模板.
所有方法都返回完成未来最终获得作结果。
使用纯对象的作需要消息正体转换和简易消息转换器默认使用。
有关转换的更多信息,请参见消息转换器。
通常,只需一颗这样的豆子,就足以完成所有可能的模板模式作:
@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpTemplate(connectionFactory);
}
它可以配置为某个默认的交换和路由密钥,或者仅仅排队。
这兔子Amqp模板设置一个默认队列用于接收作,另一个默认队列用于请求-回复作,客户端若不存在,则为请求创建临时队列。
以下是一些示例兔子Amqp模板操作:
@Bean
DirectExchange e1() {
return new DirectExchange("e1");
}
@Bean
Queue q1() {
return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}
@Bean
Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
...
@Test
void defaultExchangeAndRoutingKey() {
this.rabbitAmqpTemplate.setExchange("e1");
this.rabbitAmqpTemplate.setRoutingKey("k1");
this.rabbitAmqpTemplate.setReceiveQueue("q1");
assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
.succeedsWithin(Duration.ofSeconds(10));
assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo("test1");
}
这里我们宣告了一个E1交换问1并用K1路由密钥。
然后我们使用一个默认设置兔子Amqp模板将消息发布到上述交换机,使用相应路由密钥并使用问1作为接收作的默认队列。
这些方法有超载变体,可以发送到特定的交换或队列(发送和接收)。
这receiveAndConvert()运算ParameterizedTypeReference<T>需要一个智能消息转换器注射到......兔子Amqp模板.
下一个示例演示和RPC实现兔子Amqp模板(假设与前述示例相同的RabbitMQ对象):
@Test
void verifyRpc() {
String testRequest = "rpc-request";
String testReply = "rpc-reply";
CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);
AtomicReference<String> receivedRequest = new AtomicReference<>();
CompletableFuture<Boolean> rpcServerResult =
this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
payload -> {
receivedRequest.set(payload);
return testReply;
});
assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
assertThat(receivedRequest.get()).isEqualTo(testRequest);
}
相关性和回复队列由内部管理。
服务器端可以通过以下方式实现@RabbitListener下面将介绍POJO方法。
RabbitMQ AMQP 1.0 消费者版
与许多面向消费者端的消息实现一样,Spring-rabbitmq-client模块附带RabbitAmqpListenerContainer这本质上是 well-known 的实现MessageListenerContainer.
它的作用和DirectMessageListenerContainer但支持 RabbitMQ AMQP 1.0。
需要Amqp连接工厂并且至少有一个队列可供消费。
另外,还有消息监听器(或针对AMQP 1.0兔子Amqp消息听者必须提供。
可以配置为autoSettle = false,其含义为确认模式。手册.
在这种情况下,消息提供给消息监听器在其消息属性一Amqp致谢回调以考虑目标逻辑。
这兔子Amqp消息听者有合同com.rabbitmq.client:AMQP-client抽象:
/**
* Process an AMQP message.
* @param message the message to process.
* @param context the consumer context to settle message.
* Null if container is configured for {@code autoSettle}.
*/
void onAmqpMessage(Message message, Consumer.Context context);
其中第一个论元是本来的接收com.rabbitmq.client.amqp.Message和上下文是消息结算的本地回调,类似于上述Amqp致谢抽象化。
这兔子Amqp消息听者能够批量处理和结算消息,满足条件批量大小提供选项。
为此MessageListener.onMessageBatch()合同必须被执行。
这batchReceiveDuration用于为非完整批次安排强制释放,以避免内存和消费信用的耗尽。
通常,兔子Amqp消息听者类不直接用于目标项目,POJO 方法注释配置通过以下方式进行@RabbitListener被选用于声明式消费者配置。
这RabbitAmqpListenerContainerFactory必须注册在RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME和@RabbitListener注释过程将被注册兔子Amqp消息听者实例进入RabbitListenerEndpointRegistry.
目标 POJO 方法调用由特定RabbitAmqpMessageListener适配器实现,它扩展了MessagingMessageListenerAdapter并且重用了大量功能,包括请求-回复场景(无论是否异步)。
因此,注释驱动监听端点中描述的所有概念都被应用到了兔子Amqp消息听者也。
除了传统的消息传递有效载荷和头这@RabbitListenerPOJO方法契约可以采用以下参数:
-
com.rabbitmq.client.amqp.Message- 原生AMQP 1.0消息,无需任何转换; -
org.springframework.amqp.core.Message- Spring AMQP 消息抽象,作为本地 AMQP 1.0 消息的转换结果; -
org.springframework.messaging.Message- Spring消息的抽象,作为Spring AMQP消息的转换结果; -
消费者。背景- RabbitMQ AMQP 客户端消费者结算 API; -
org.springframework.amqp.core.AmqpAcknowledgegment- 春季AMQP确认摘要:代表们消费者。背景.
以下示例展示了一个简单的@RabbitListener对于RabbitMQ AMQP 1.0与人工结算的交互:
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpListenerContainerFactory(connectionFactory);
}
final List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch consumeIsDone = new CountDownLatch(11);
@RabbitListener(queues = {"q1", "q2"},
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
concurrency = "2",
id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
try {
if ("discard".equals(data)) {
if (!this.received.contains(data)) {
context.discard();
}
else {
throw new MessageConversionException("Test message is rejected");
}
}
else if ("requeue".equals(data) && !this.received.contains(data)) {
acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
}
else {
acknowledgment.acknowledge();
}
this.received.add(data);
}
finally {
this.consumeIsDone.countDown();
}
}