|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
发送消息
发送消息时,您可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中的最后一种方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供AMQP交换名称(以及路由密钥)。
最后一个参数是回调,负责实际创建消息实例。
使用这种方法发送消息的一个例子可能如下:
以下示例展示了如何使用发送发送消息的方法:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
你可以设置交换如果你打算大部分时间或全部用模板实例发送到同一个交易所,模板本身的属性。
在这种情况下,你可以使用前面列表中的第二种方法。
以下示例在功能上等价于前一个示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果两者都交换和路由键属性设置在模板上,你可以使用只接受消息.
以下示例展示了如何实现:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
更好的理解方式是,显式方法参数总是覆盖模板的默认值。
事实上,即使你没有在模板上明确设置这些属性,默认值也总是存在。
在这两种情况下,默认都是空字符串但这实际上是一个合理的默认选择。
就路由密钥而言,它并不总是必要的(例如,对于
一个扇形交换)。
此外,队列可能绑定到具有空的交换字符串.
这两种情形都合理地依赖默认空的字符串模板的路由键属性的值。
就交易所名称而言,空的字符串常用,因为AMQP规范中定义“默认交换”没有名称。
由于所有队列都自动绑定到该默认交换(即直接交换),并以其名称作为绑定值,前述列表中的第二种方法可以通过默认交换机实现对任意队列的简单点对点消息传递。
你可以提供队列名称路由键,要么在运行时提供方法参数。
以下示例展示了如何实现:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,你也可以创建一个模板,主要或仅用于发布到单个队列。 以下示例展示了如何实现:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息构建器 API
从1.3版本开始,消息构建器API由消息构建器和消息属性构建器.
这些方法提供了一种方便的“流畅”方式来创建消息或消息属性。
以下示例展示了流 API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
定义在消息属性可以设置。
其他方法包括setHeader(字符串键,字符串值),removeHeader(字符串键),removeHeaders()和copyProperties(MessageProperties properties).
每个属性设置方法都有set*IfAbsent()变体。
在存在默认初始值的情况下,方法被命名为set*IfAbsentOrDefault().
提供五种静态方法用于创建初始消息构建器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
| 1 | 构建者创建的信息有一个正体,直接指向论点。 |
| 2 | 构建者创建的消息有一个新数组,包含参数中字节的副本。 |
| 3 | 构建者创建的消息有一个新数组,包含参数的字节范围。
看Arrays.copyOfRange()更多细节请阅读。 |
| 4 | 构建者创造的信息有一个直接指向论证主体的正体。
该参数的属性被复制到新的消息属性对象。 |
| 5 | 构建者创建的消息有一个新数组,包含参数正文的副本。
该参数的属性被复制到新的消息属性对象。 |
提供了三种静态方法来创建消息属性构建器实例:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
| 1 | 一个新的消息属性对象会以默认值初始化。 |
| 2 | 构建器初始化为,build()将返回,所提供的属性对象。 |
| 3 | 该参数的属性被复制到新的消息属性对象。 |
与兔子模板实现Amqp模板,每个发送()方法有一个超载版本,需要额外的相关数据对象。
当发布者确认启用时,该对象会在描述的回调中返回。Amqp模板.
这使发送方能够关联确认(啊或纳克)并发送了发送信息。
从1.6.7版本开始,关联觉知消息后处理器引入了接口,允许在消息转换后修改相关数据。
以下示例展示了如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在2.0版本中,该接口被弃用。
该方法已被迁移到消息后处理器默认实现委派为postProcessMessage(消息消息).
从1.6.7版本开始,新增了一个回调接口,称为相关数据PostProcessor提供。
毕竟这是被调用的消息后处理器实例(提供于发送()方法以及提供的setBeforePublishPostProcessors()).
实现可以更新或替换提供在发送()方法(如果有的话)。
这消息以及原创相关数据(如果有的话)作为论据提供。
以下示例展示了如何使用后处理方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
配料
1.4.2 版本引入了批量处理兔子模板.
这是一个子类兔子模板带有覆盖发送根据批次处理策略.
只有当一批次完成后,消息才会发送给RabbitMQ。
以下列表显示了批次处理策略界面定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
| 批量处理的数据存储在内存中。 未发送的消息在系统故障时可能会丢失。 |
一个简单批处理策略提供。
它支持向单一交换或路由密钥发送消息。
它具有以下特性:
-
批量大小:发送前批次中的消息数量。 -
缓冲限制:批处理消息的最大大小。 这会优先于批量大小如果超过,则会发送部分批次。 -
超时:当没有新活动添加消息到批次时,发送部分批次的时间。
这简单批处理策略通过在每个嵌入消息前加上四字节的二进制长度来格式化批次。
通过设置springBatch格式消息属性 到长度标题4.
批处理消息默认由监听器容器自动解批(通过使用springBatch格式消息头部)。
拒绝批处理中的任何消息会导致整个批次被拒绝。 |
不过,更多信息请参见批处理@RabbitListener。