Amqp模板
与 Spring Framework 及相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个“模板”,发挥核心作用。
定义主要作的接口称为Amqp模板.
这些作涵盖了发送和接收消息的一般行为。
换句话说,它们并非任何实现独有——因此名称中带有“AMQP”。
另一方面,该接口的实现与AMQP协议的实现相关联。
与JMS本身是一个接口级API,AMQP是线级协议。
该协议的实现各自提供客户端库,因此模板接口的每个实现都依赖于特定的客户端库。
目前,只有一个实现版本:兔子模板.
在接下来的例子中,我们通常会使用Amqp模板.
然而,当你查看配置示例或任何实例化模板或调用 setter 的代码摘录时,你可以看到实现类型(例如,兔子模板).
另见异步兔子模板。
增加重试功能
从1.3版本开始,你现在可以配置兔子模板使用重试模板帮助处理经纪人连接问题。
更多信息请参见 Spring Framework 中的核心重试支持。
以下只是使用指数退避策略和默认情况下的一个例子SimpleRetryPolicy该系统尝试三次后将异常抛给呼叫者。
以下示例使用了@ConfigurationJava 注释:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryPolicy retryPolicy = RetryPolicy.builder()
.delay(Duration.ofMillis(500))
.multiplier(2.0)
.maxDelay(Duration.ofSeconds(10))
.build();
template.setRetryTemplate(new RetryTemplate(retryPolicy));
return template;
}
从1.4版本开始,除了retryTemplate财产,该恢复回调选项支持于兔子模板.
它被用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback).
这恢复回调有一定限制,因为重试上下文仅包含最后一次投掷田。
对于更复杂的应用场景,你应该用外部设备重试模板这样你才能向恢复回调通过上下文的属性。
以下示例展示了如何实现: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,你不会注入重试模板进入兔子模板.
出版是异步的——如何检测成功与失败
发布消息是一种异步机制,默认情况下,无法路由的消息会被RabbitMQ丢弃。 成功出版时,您可以收到异步确认,详见相关出版商确认与返回。 考虑两种失败情景:
-
发布到交易所,但没有匹配的目的地队列。
-
发布到一个不存在的交易所。
第一种情况由出版商退报覆盖,详见相关出版商确认与退货。
对于第二种情况,消息被丢弃且不生成返回。
标的通道是封闭的,只有一个例外。
默认情况下,这个异常会被记录,但你可以注册频道听众其中缓存连接工厂获取此类事件的通知。
以下示例展示了如何添加连接听者:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
你可以检查信号的原因财产以确定发生的问题。
要检测发送线程上的异常,你可以setChannelTransacted(true)在兔子模板异常检测到时txCommit().
然而,事务会显著影响性能,因此在仅为这一用例启用事务前,请仔细考虑这一点。
相关出版商确认并返回
这兔子模板实现Amqp模板支持出版社确认并回归。
对于回复的消息,模板的命令的属性必须设置为true或者强制表达式必须评估为true针对特定信息。
该功能需要缓存连接工厂其出版商回归属性设置为true(参见出版商确认与回归)
退货由注册兔子模板。回电回拨通过呼叫setReturnsCallback(返回Callback callback).
回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
这返回消息具有以下性质:
-
消息- 返回消息本身 -
回复代码- 表示返回原因的代码 -
回复正文- 返回的文本原因 - 例如:NO_ROUTE -
交换- 消息发送到的交换局 -
路由键- 所使用的路由密钥
只有一个回归回唱由兔子模板.
另见回复超时。
对于出版商确认(也称为出版商确认),模板要求缓存连接工厂其出版商确认属性设置为确认类型。相关性.
确认邮件由注册RabbitTemplate.确认回电通过呼叫setConfirmCallback(确认Callback callback).
回调必须实现以下方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
这相关数据是客户端在发送原始消息时提供的对象。
这啊对于啊且为假纳克.
为纳克实例中,原因可能包含纳克,如果 在纳克生成。
例如,向不存在的交换机发送消息。
在这种情况下,经纪人会关闭该渠道。
关闭的原因包含在原因. 这原因在1.4版本中加入。
只有一个确认回电由兔子模板.
当兔子模板发送作完成时,该通道关闭。
这会阻止当连接出厂缓存满时接收确认或返回(当缓存中有空间时,通道未物理关闭,返回和确认正常进行)。
当缓存满时,框架会延迟关闭最多五秒,以便接收确认和返回。
使用确认时,当最后一次确认收到时通道会关闭。
仅使用回车时,通道保持开启完整五秒。
我们通常建议将连接设置为出厂设置channelCacheSize将值调整到足够大的值,使得发布消息的通道能够返回缓存,而不是被关闭。
你可以通过使用 RabbitMQ 管理插件来监控频道使用情况。
如果你发现通道频繁开闭,建议考虑增加缓存大小以减少服务器开销。 |
在2.1版本之前,启用的发布商确认通道会在收到确认信息之前返回缓存。
其他进程可能会检查信道并执行某些作导致信道关闭——例如向不存在的交换机发布消息。
这可能导致确认丢失。
2.1版及以后版本不再将信道返回缓存,而确认值为未完成。
这兔子模板执行逻辑接近()每次作后在频道播出。
一般来说,这意味着同一频道同时只有一个确认未完成。 |
从版本 2.2 开始,回调会在连接工厂中的某个执行者线程。
这是为了避免在回调内执行兔子作时可能出现的死锁。
在之前的版本中,回调是直接在AMQP-client连接输入输出线程;如果你执行某些RPC作(如开启新通道),会导致死锁,因为I/O线程会阻塞等待结果,但结果需要由I/O线程本身处理。
在这些版本中,需要将工作(如发送消息)交接给回调内的另一个线程。
这已不再必要,因为框架现在将回调调用交给执行者。 |
| 只要返回回调在60秒或更短内执行,确认在确认前收到返回消息的保证依然有效。 确认信计划在回拨结束后或60秒后(以先到者为准)送达。 |
这相关数据对象有一个完成未来你可以用它来获得结果,而不是用确认回电在模板上。
以下示例展示了如何配置相关数据实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
因为它是CompletableFuture<确认吗>,你可以选择get()结果是准备好或使用的状态whenComplete()对于异步回拨。
这确认对象是一个具有两个性质的简单豆子:啊和原因(当纳克实例)。
该原因不包含经纪人生成的纳克实例。
其人口为纳克框架生成的实例(例如,在啊实例非常出色)。
此外,当确认和返回都被启用时,相关数据 返回如果无法路由到任何队列,则该属性被填充返回的消息。
可以保证返回消息的性质在未来属性被设置之前,用啊.CorrelationData.getReturn()返回 a返回消息其性质为:
-
消息(返回消息)
-
回复代码
-
回复正文
-
交换
-
路由键
另见范围作,提供一种更简单的等待出版商确认的机制。
范围作
通常,使用模板时,a渠道被从缓存中借出(或创建),用于作,然后返回缓存中进行重复使用。
在多线程环境中,无法保证下一个作会使用相同的通道。
不过,有时你可能希望对信道的使用有更多控制,并确保多个作都在同一信道上进行。
从2.0版本开始,出现了一个名为调用提供 ,且运营回调.
在回调范围内执行的任何作,并且在兔子行动论元使用相同的专用渠道该缓存最终会被关闭(不会返回缓存)。
如果信道是出版商回应频道,在收到所有确认后,它会被归还到缓存中(参见相关出版商确认与返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
一个你可能需要这样做的例子是,如果你想使用waitForConfirms()基础上的方法渠道.
此前 Spring API 未曾公开该方法,因为通道通常被缓存和共享,如前所述。
这兔子模板现在提供waitForConfirms(长时间暂停)和waitForConfirmsOrDie(长时间暂停),该通道被委托给在运营回调.
出于显而易见的原因,这些方法不能超出这个范围使用。
请注意,其他地方提供了更高层次的抽象,可以将确认与请求关联起来(参见相关出版商确认与返回)。 如果你只想等经纪人确认交货,可以使用以下示例所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果你愿意兔子管理员作将在运营回调,行政节点必须是用相同的兔子模板该调用操作。
如果模板作已经在现有事务的范围内执行——例如,在交易监听器容器线程上运行并对交易模板执行作时,上述讨论就无关紧要。
在这种情况下,作在该通道上执行,线程返回容器时提交。
不必使用调用在这种情况下。 |
以这种方式使用确认时,许多用于将确认与请求关联的基础设施其实并不需要(除非同时启用返回)。
从版本 2.2 开始,连接工厂支持一个新的属性,称为publisherConfirmType(出版者确认类型).
当此设定为确认类型。简单,避免了基础设施,确认处理可以更高效。
此外,兔子模板设置出版商序列号发送消息中的属性消息属性.
如果你想检查(或记录或以其他方式使用)特定确认,可以用超载的调用如下例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些确认回电对象(对于啊和纳克实例)是兔子客户端的回调,而不是模板的回调。 |
以下示例日志啊和纳克实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
| 有作用域的作绑定在一个线程上。 关于多线程环境中严格排序的讨论,请参见《多线程环境中的严格消息排序》。 |
多线程环境中的严格消息顺序
Scoped Operations 中的讨论仅适用于作在同一线程上执行时。
请考虑以下情况:
-
线程1向队列发送消息并交接工作线程-2 -
线程-2向同一队列发送消息
由于RabbitMQ的异步特性和缓存通道的使用;不确定是否会使用同一信道,因此消息到达队列的顺序无法保证。
(大多数情况下,包裹会按顺序到达,但乱序交付的概率并非零。)
为了解决这种情况,你可以使用大小为 的有界通道缓存1(与频道 Checkout超时确保消息始终发布在同一频道,保证秩序。
为此,如果你对连接工厂有其他用途,比如消费者,你应该使用专门的连接工厂来存放模板,或者配置模板使用嵌入主连接工厂中的发布者连接工厂(参见使用单独连接)。
这点可以用一个简单的Spring靴应用来最好说明:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布在两个不同的线程上,它们仍会使用同一个通道,因为缓存被限制在一个通道。
从2.3.7版本开始,ThreadChannelConnectionFactory支持通过prepareContextSwitch和切换上下文方法。
第一种方法返回上下文,传递给第二个线程,后者调用第二个方法。
线程可以绑定非事务通道或事务通道(或两者之一);你不能单独转移它们,除非你用两个连接工厂。
举个例子:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦准备SwitchContext如果当前线程执行更多作,则将在新通道上执行。
当线缆绑定通道不再需要时,关闭它非常重要。 |
消息集成
从1.4版本开始,兔子消息模板(建在兔子模板)提供了与Spring Framework消息抽象的集成——即,org.springframework.messaging.Message.
这使你可以通过以下方式发送和接收消息春季消息 留言<?>抽象化。
这种抽象被其他 Spring 项目使用,如 Spring Integration 和 Spring 的 STOMP 支持。
涉及两个消息转换器:一个用于在Spring消息之间转换留言<?>以及春季AMQP的消息一个用于在Spring AMQP之间转换的抽象消息抽象以及底层RabbitMQ客户端库所需的格式。
默认情况下,消息有效载荷由兔子模板实例的消息转换器。
或者,你也可以注入自定义消息信息转换器与其他有效载荷转换器合作,如下示例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
验证用户ID
从1.6版本开始,模板现在支持一个用户ID-表达式 (用户ID表达当使用 Java 配置时)。
如果发送了消息,用户ID属性会在评估该表达式后设置(如果尚未设置)。
评估的根对象是要发送的消息。
以下示例展示了如何使用用户ID-表达式属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个例子是一个字面表达。
第二种获得用户名在应用上下文中,来自连接工厂豆的属性。
使用独立连接
从2.0.2版本开始,你可以设置使用出版商连接属性到true尽可能使用与监听器容器不同的连接方式。
这是为了避免当生产者因任何原因被封锁时,消费者也会被封锁。
连接工厂为此设有第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果你想使用不同的工厂类型来发布,可以明确设置。
如果兔子模板运行在监听器容器发起的事务中,则无论该设置如何,都会使用容器的通道。
一般来说,你不应该使用兔子管理员使用一个将此设置为 的模板true.
使用该兔子管理员建造者会利用连接工厂。
如果你使用另一个取模板的构造函数,确保模板的属性为false.
这是因为管理员通常用于声明监听器容器的队列。
使用一个将属性设置为 的模板true这意味着独占队列(例如匿名队列)会在与监听器容器使用的连接不同的情况下声明。
在这种情况下,容器无法使用队列。 |