|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
异步消费者
Spring AMQP 还通过使用@RabbitListener注释并提供了开放的基础设施,用于程序化注册端点。
这是设置异步消费者最方便的方法。
详情请参见注释驱动监听器端点。 |
|
预取默认值以前是1,这可能导致高效用户的利用率不足。 从2.0版本开始,默认预取值为250,这在大多数常见场景下应该能让消费者忙碌起来, 从而提升吞吐量。 然而,也存在预取值应较低的情形:
此外,在低流量消息和多消费者(包括单个监听器容器实例内的并发)情况下,你可能希望减少预取,以实现消息在消费者间更均匀分布。 参见消息监听器容器配置。 关于预取的更多背景,请参见这篇关于RabbitMQ中消费者利用率的文章和这篇关于排队理论的文章。 |
消息监听器
对于异步消息接收,一个专用组件(不是Amqp模板)参与其中。
该组件是消息——让人心旷神怡。
我们在本节后面会考虑容器及其属性。
不过,首先我们应该看看回调功能,因为你的应用代码就是在回调中与消息系统集成的。
回调有几种选项,首先是实现消息监听器以下列表展示了界面:
public interface MessageListener {
void onMessage(Message message);
}
如果你的回调逻辑因任何原因依赖于AMQP通道实例,你可以改用频道意识信息听众.
它看起来很像,但多了一个参数。
以下列表显示了频道意识信息听众界面定义:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在2.1版本中,该接口从包中迁移O.S.AMQP.RABBIT.CORE自O.S.AMQP.RABBIT.LISTENER.API. |
MessageListener适配器
如果你更希望在应用逻辑和消息 API 之间保持更严格的分离,可以依赖框架提供的适配器实现。 这通常被称为“消息驱动的POJO”支持。
1.5 版本引入了更灵活的 POJO 消息机制,称为@RabbitListener注解。
更多信息请参见注释驱动监听器端点。 |
使用适配器时,只需提供适配器本身应调用的实例参考。 以下示例展示了如何实现:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
你可以对适配器进行子类化,并提供一个实现getListenerMethodName()根据消息动态选择不同的方法。
该方法有两个参数,原始信息和extractedMessage后者是任何转换的结果。
默认情况下,简易消息转换器已配置。
看简易消息转换器如需更多信息及其他可用的转换器信息。
从1.4.2版本开始,原始消息为消费者队列和consumerTag属性,可用于确定消息接收的队列。
从1.5版本开始,你可以配置一个将消费者队列或标签映射到方法名称,动态选择要调用的方法。
如果地图中没有条目,我们会退回到默认的监听器方法。
默认监听器方法(如果未设置)为handleMessage.
从2.0版本开始,方便功能接口已提供。
以下列表展示了 的定义功能接口:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
该接口通过使用 Java 8 lambda 便于方便配置适配器,如下示例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从2.2版本开始,buildListenerArguments(对象)已弃用并新增buildListenerArguments(对象、通道、消息)后来引入了其中一个。
新方法帮助听众获得渠道和消息争论以做更多事情,比如呼叫channel.basicReject(长,布尔)在手动确认模式下。
以下列表展示了最基本的例子:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
现在你可以配置扩展听筒适配器同理MessageListener适配器如果你需要接收“频道”和“消息”。
监听器的参数应设为buildListenerArguments(对象、通道、消息)如以下听众示例所示,已回归:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
现在你已经看到了各种选项消息——听回电,我们可以把注意力转向集装箱。
基本上,容器负责“主动”的责任,使监听者回调保持被动。
容器是“生命周期”组件的一个例子。
它提供了启动和停止的方法。
在配置容器时,你本质上是在AMQP队列和消息监听器实例。
您必须提供参考文献连接工厂以及该监听者应从中接收消息的队列名称或队列实例。
在2.0版本之前,只有一个监听器容器,称为SimpleMessageListenerContainer.
现在还有第二个容器,称为DirectMessageListenerContainer.
容器之间的差异以及你在选择容器时可能应用的标准,详见《选择容器》一文。
以下列表展示了最基本的例子,其工作方法如下:SimpleMessageListenerContainer:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“主动”组件,最常见的做法是创建带有 bean 定义的监听器容器,以便其能在后台运行。 以下示例展示了 XML 的一种实现方式:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下列表展示了另一种使用 XML 实现的方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
前面两个例子都构成了DirectMessageListenerContainer(注意类型属性 —— 默认为简单).
或者,你也可以更喜欢使用 Java 配置,它看起来与前面的代码片段相似:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消费者优先
从RabbitMQ 3.2版本开始,代理现在支持消费者优先级(参见“使用RabbitMQ的消费者优先级”)。
通过设置x优先级关于消费者的争论。
这SimpleMessageListenerContainer现在支持设置消费者参数,如下示例所示:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为了方便起见,命名空间提供了优先权属性听者元素,如下例所示:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从1.3版本开始,你可以修改容器在运行时监听的队列。 参见监听器容器队列。
自动删除队列
当容器配置为监听自动删除队列中,队列具有X-过期选项,或在代理上配置的存活时间策略,当容器停止时(即最后一个消费者被取消),代理会移除队列。
在1.3版本之前,容器无法重启,因为缺少队列。
这兔子管理员只有在连接关闭或打开时才会自动重新声明队列等,而容器停止和启动时则不会发生这种情况。
从版本 1.3 开始,容器使用兔子管理员用于重新声明启动时缺失的队列。
你也可以将条件声明(参见条件声明)与auto-startup=“false”管理员请求推迟队列声明直到容器启动。
以下示例展示了如何实现:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换的声明为containerAdmin,其auto-startup=“false”这样元素在上下文初始化时不会被声明。
另外,容器也不会启动,原因也是一样。
当容器随后启动时,它使用其引用containerAdmin宣告元素。