|
对于最新稳定版本,请使用Spring AMQP 4.0.0! |
连接与资源管理
虽然我们在前一节描述的AMQP模型是通用且适用于所有实现的,但当我们进入资源管理时,细节则是针对经纪人实现的具体情况。 因此,在本节中,我们重点介绍仅存在于“Spring兔”模块中的代码,因为目前 RabbitMQ 是唯一支持的实现。
管理与RabbitMQ代理连接的核心组件是连接工厂接口。
责任连接工厂实现是提供一个实例org.springframework.amqp.rabbit.connection.Connection,是 的包装器com.rabbitmq.client.Connection.
选择连接工厂
共有三家连接工厂可供选择
-
池化通道连接工厂 -
ThreadChannelConnectionFactory -
缓存连接工厂
前两个是在2.3版本中添加的。
对于大多数用例,缓存连接工厂应该使用。
这ThreadChannelConnectionFactory如果你想确保严格的消息顺序,而无需使用作用域作,可以使用。
这池化通道连接工厂与缓存连接工厂它使用单一连接和多个通道池。
它的实现更简单,但不支持相关的发布商确认。
三家工厂都支持简单的出版商确认。
在配置兔子模板要使用独立连接,从版本 2.3.2 开始,你可以将发布连接工厂配置为不同类型。
默认情况下,出版工厂是同一类型,主工厂上设置的任何属性也会传播到出版工厂。
从3.1版本开始,摘要连接工厂包括连接正在创建后退该属性支持连接模块中的退避策略。
目前,行为方面有支持。createChannel()处理当channelMax达到极限后,基于尝试次数和间隔实施退避策略。
池化通道连接工厂
该工厂管理单一连接和两个基于Apache Pool2的通道池。
一个池用于事务通道,另一个用于非事务通道。
这些池塘是通用对象池s,默认配置;提供回调功能以配置池;更多信息请参阅Apache文档。
阿帕奇Commons-pool2贾尔必须在职业路径上才能使用这家工厂。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个连接和两个连接ThreadLocals,一个用于事务通道,另一个用于非事务通道。
该工厂确保同一线程上的所有作使用同一条通道(只要通道保持开放)。
这便于严格的消息排序,无需使用范围作。
为避免内存泄漏,如果您的应用程序使用了许多短寿命线程,您必须调用工厂的关闭线程通道()释放通道资源。
从2.3.7版本开始,线程可以将其通道转移到另一个线程。
更多信息请参见多线程环境中的严格消息排序。
缓存连接工厂
第三种实现是缓存连接工厂默认情况下,该代理会建立一个应用程序可以共享的单一连接代理。
连接共享是可能的,因为与AMQP消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于JMS中连接与会话之间的关系)。
连接实例提供createChannel(创建频道)方法。
这缓存连接工厂实现支持这些通道的缓存,并根据信道是否为事务性维护独立缓存。
在创建 的实例缓存连接工厂你可以通过构造函数提供“主机名”。
你还应该提供“用户名”和“密码”属性。
要配置通道缓存大小(默认为25),你可以调用setChannelCacheSize()方法。
从1.3版本开始,你可以配置缓存连接工厂既缓存连接,也只缓存通道。
在这种情况下,每次调用createConnection()创建一个新的连接(或从缓存中获取一个空闲连接)。
关闭连接会将其返回缓存(如果缓存大小尚未达到)。
在此类连接上创建的信道也会被缓存。
在某些环境中,使用独立连接可能有用,例如从HA集群中获取数据,在
与负载均衡器结合,连接不同的集群成员及其他成员。
要缓存连接,设置缓存模式自缓存模式.连接.
| 这并不限制连接数量。相反,它规定了允许的空闲开路连接数量。 |
从1.5.5版本开始,新增了一个名为连接限制是提供的。当该性质被设置时,它限制了允许的连接总数。当设置时,如果达到该限制,则频道 借阅时间限制用于等待连接进入空闲状态。如果时间超过,则AmqpTimeoutException被抛出。
|
当缓存模式为 此外,在撰写本文时, |
需要理解的是,缓存大小(默认情况下)不是限制,而仅仅是可缓存的通道数量。当缓存大小为10时,实际上可以使用任意数量的通道。如果使用超过10个通道且全部返回缓存,则有10个进入缓存。其余通道物理关闭。
从版本 1.6 开始,默认通道缓存大小从 1 增加到 25。在高流量多线程环境中,小缓存意味着通道的创建和关闭速度较快。增加默认缓存大小可以避免这些开销。你应通过 RabbitMQ 管理界面监控正在使用的通道,并考虑进一步增加缓存大小如果你看到许多通道被创建和关闭。缓存仅按需增长(以满足应用程序的并发需求),因此这一变化不会影响现有的低容量应用程序。
从版本 1.4.2 开始,缓存连接工厂有一个性质,称为频道 Checkout超时. 当该性质大于零时,则channelCacheSize成为连接上可创建通道数量的限制。如果达到限制,调用线程会阻塞,直到通道可用或达到超时,此时AmqpTimeoutException被抛出。
框架内使用的通道(例如,兔子模板)能够可靠地返回到缓存中。如果你在框架外创建通道,(例如,通过直接访问连接并调用createChannel()),你必须可靠地(通过关闭)归还它们,可能在最后屏蔽,以避免通道耗尽。 |
以下示例展示了如何创建新的连接:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能如下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个单一连接工厂仅在框架的单元测试代码中实现。它比缓存连接工厂,因为它不缓存通道,但由于性能和韧性不足,它不适合实际使用,除了简单测试。如果你需要自己实现连接工厂不知为何,摘要连接工厂基础职业可能是一个不错的起点。 |
一个连接工厂可以通过使用兔子命名空间快速且方便地创建,具体如下:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法更为可取,因为框架可以为你选择最佳默认值。创建的实例是缓存连接工厂. 请记住,通道的默认缓存大小是25。如果你想缓存更多通道,可以通过设置“channelCacheSize”属性来设置更大的值。在XML中,它看起来如下:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,利用命名空间,你可以添加“channel-cache-size”属性,具体如下:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为渠道但你可以配置它来缓存连接。
在下面的例子中,我们使用连接缓存大小:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以通过命名空间提供主机和端口属性,具体如下:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果运行在集群环境中,你可以使用地址属性,具体如下:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关相关信息,请参见“连接到集群”地址洗牌模式.
以下示例是一个自定义线程工厂,线程名称前缀为兔子MQ-:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名关联
从版本 1.7 开始,a连接名称策略为注入抽象连接工厂.
生成的名称用于针对特定应用的 RabbitMQ 连接识别。
如果RabbitMQ服务器支持,连接名称会显示在管理界面中。
该值不必是唯一的,也不能用作连接标识符——例如,在HTTP API请求中。
这个值应该是人类可读的,并且是客户属性在connection_name钥匙。
你可以使用一个简单的Lambda,具体如下:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
这连接工厂参数可以通过某些逻辑来区分目标连接名称。
默认情况下,豆名关于摘要连接工厂,一个表示对象的十六进制字符串和一个内部计数器被用来生成connection_name.
这<兔子:连接工厂>命名空间组件还包含连接-命名-策略属性。
一个实现简单属性价值连接名称策略将连接名称设置为应用属性。
你可以声明为@Bean并注入连接工厂,如下示例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用上下文中环境.
使用Spring Boot及其自动配置的连接工厂时,只需声明连接名称策略 @Bean.
Spring靴会自动检测咖啡豆并将其接入工厂。 |
阻塞连接与资源约束
连接可能会被与内存警报对应的代理进行交互而被阻断。
从2.0版本开始,org.springframework.amqp.rabbit.connection.Connection可以提供com.rabbitmq.client.BlockedListener实例将被通知连接被阻断和解除阻挡事件。
此外,摘要连接工厂发射 a连接阻塞事件和ConnectionUnblockedEvent分别通过其内部被封锁的听众实现。
这些功能让你能够提供应用逻辑,以适当应对代理存在的问题,并(例如)采取一些纠正措施。
当应用程序配置为单一缓存连接工厂,作为 Spring Boot 自动配置默认设置,当连接被代理阻断时,应用程序停止工作。
当它被经纪人屏蔽时,任何客户都会停止工作。
如果生产者和消费者在同一应用中,可能导致生产者阻断连接(因为代理服务器上没有资源),而消费者无法释放连接(因为连接被阻断)。
为了缓解这个问题,我们建议再设一个独立的缓存连接工厂示例中选项相同——一个面向生产者,一个面向消费者。
一个独立的缓存连接工厂对于在消费者线程上执行的交易生产者来说是不可能的,因为他们应该重用渠道与消费者交易相关。 |
从版本 2.0.2 开始,兔子模板有配置选项自动使用第二个连接工厂,除非正在使用事务。
更多信息请参见“使用单独连接”。
这连接名称策略对于出版商来说,连接与主要策略相同,且。发行人附加在调用方法的结果后。
从1.7.7版本开始,一个AmqpResourceNotAvailableException是 ,当 掷出SimpleConnection.createChannel()无法创建渠道(例如,因为channelMax当达到限制时,缓存中没有可用的通道)。
你可以在RetryPolicy在撤退后恢复行动。
配置底层客户端连接工厂
这缓存连接工厂使用Rabbit客户端的一个实例连接工厂.
通过(主机,端口,用户名,密码,请求心跳和连接超时例如)当将等价性质设置为缓存连接工厂.
要设置其他性质(clientProperties(客户属性)例如,你可以定义兔子工厂的一个实例,并通过使用相应的构造函数来引用缓存连接工厂.
使用命名空间(如前所述)时,你需要在连接工厂属性。
为方便起见,提供了工厂豆以协助在 Spring 应用上下文中配置连接工厂,具体将在下一节讨论。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。
虽然兼容此功能,但 Spring AMQP 有自己的恢复机制,客户端恢复功能通常不需要。
我们建议关闭AMQP-client自动恢复,以避免获得自动恢复连接非当前开放异常代理可用但连接尚未恢复的情况。
你可能会注意到这个例外,例如当重试模板配置为兔子模板即使切换到集群中的另一个经纪人。
由于自动恢复连接是定时恢复的,使用Spring AMQP的恢复机制可以更快地恢复连接。
从1.7.1版本开始,Spring AMQP会被禁用AMQP-client除非你明确创建自己的RabbitMQ连接工厂并提供给缓存连接工厂.
兔子MQ连接工厂由兔子连接工厂豆另外,默认关闭这个选项。 |
兔子连接工厂豆以及配置SSL
从1.4版本开始,方便兔子连接工厂豆通过依赖注入,方便配置底层客户端连接工厂上的 SSL 属性。
其他二传则委托给底层工厂。
以前,你必须通过程序自动配置SSL选项。
以下示例展示了如何配置兔子连接工厂豆:
-
Java
-
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot 应用文件(.yaml或。性能)
-
Properties
-
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: true
有关配置SSL的信息,请参见RabbitMQ文档。
省略keyStore和trustStore配置为无需证书验证即可通过SSL连接。
下一个例子展示了如何提供密钥和信任存储配置。
这sslProperties位置地产是泉水资源指向包含以下键的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
这keyStore和Truststore是Spring资源指向商店。
通常,该属性文件由作系统保护,应用程序拥有读取权限。
从 Spring AMQP 1.5 版本开始,你可以直接在出厂豆子上设置这些属性。
如果离散性质和sslProperties位置提供后者的属性覆盖
离散的价值观。
从2.0版本开始,服务器证书默认被验证,因为它更安全。
如果你想跳过这个验证,可以设置出厂豆skipServerCertificateValidation属性到true.
从2.1版本开始,兔子连接工厂豆现在来电enableHostnameVerification()默认。
要恢复到之前的行为,请设置enableHostnameVerification属性到false. |
从2.2.5版本开始,出厂豆默认始终使用TLS v1.2;此前,它在某些情况下使用v1.1,在其他情况下使用v1.2(具体取决于其他属性)。
如果你因为某些原因需要使用v1.1,可以设置ssl算法财产:setSslAlgorithm(“TLSv1.1”). |
连接集群
要连接集群,请配置地址属性缓存连接工厂:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从3.0版本开始,底层连接工厂会在新连接建立时尝试通过随机地址连接到主机。
要恢复之前尝试从第一个连接到最后的行为,设置addressShuffleMode属性到地址洗牌模式.NONE.
从2.3版本开始,顺序新增了随机模式,意味着连接建立后,第一个地址会被移到末尾。
你可以考虑将此模式与RabbitMQ分片插件搭配使用。缓存模式.连接以及如果你希望从所有节点的所有分片中消费,则需要合适的并发。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从1.3版本开始,摘要:RoutingConnectionFactory已经被引入。
该工厂提供了配置多个映射的机制连接工厂并确定目标连接工厂被一些人lookupKey在运行时。
通常,实现会检查线程绑定上下文。
为了方便起见,Spring AMQP 提供了SimpleRoutingConnectionFactory,该 得到当前线程绑定lookupKey来自简易资源持有者.
以下示例展示了如何配置SimpleRoutingConnectionFactoryXML和Java两种版本:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后解绑资源非常重要。
更多信息请参见 JavaDoc摘要:RoutingConnectionFactory.
从1.4版本开始,兔子模板支持SpELsendConnectionFactorySelectorExpression和接收连接工厂选择器表达式属性,这些属性在每个AMQP协议交互作中被评估(发送,发送与接收,收到或接收并回复),解析为lookupKey所提供的值摘要:RoutingConnectionFactory.
你可以使用豆子引用,比如@vHostResolver.getVHost(#root)在表达中。
为发送作中,要发送的消息是根评估对象。
为收到作,该队列名称是根评估对象。
路由算法如下:如果选择函数表达式为零或被评估为零或提供的连接工厂不是摘要:RoutingConnectionFactory,一切按原有方式运作,依赖于所提供的连接工厂实现。
如果评估结果不是,情况也相同零,但没有目标连接工厂为此lookupKey以及摘要:RoutingConnectionFactory配置为宽容后备 = 真.
在摘要:RoutingConnectionFactory,它确实会退回到其路由基于detemineCurrentLookupKey().
然而,如果宽容后备 = false一非法州例外被抛出。
命名空间支持还提供发送连接工厂选择式表达式和接收连接-工厂-选择器表达式属性<兔子:模板>元件。
另外,从1.4版本开始,你可以在监听器容器中配置路由连接工厂。
在这种情况下,队列名称列表作为查找键使用。
例如,如果你配置容器为setQueueNames(“thing1”, “thing2”),查找键为[东西1,东西]”(注意,调中没有空格。)
从1.6.9版本开始,你可以通过使用以下方式在查找键中添加限定符setLookupKeyQualifier在听众容器里。
这样做可以允许,比如监听同名队列,但存在不同的虚拟主机(每个队列都有连接工厂)。
例如,查找键修饰符东西1以及一个监听队列的容器东西2,你可以用来注册目标连接工厂的查找键可以是东西1[thing2].
| 目标连接工厂(如果提供默认)必须有相同的发布确认和返回设置。 详见出版商确认与退货。 |
从2.4.4版本开始,可以禁用该验证功能。
如果你遇到确认和返回之间的值需要不相等的情况,你可以用摘要:RoutingConnectionFactory#setConsistentConfirmsReturns(返回)关闭验证。
注意,第一个连接工厂新增了摘要:RoutingConnectionFactory将决定一般的值证实和返回.
如果你有案件,某些消息需要确认/退货,而其他则不需要,这可能会很有用。 例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有头部的消息x-use-publisher-confirms:真会通过缓存连接发送消息,你可以确保消息的送达。
有关确保消息传递的更多信息,请参见发布确认与退货。
队列亲和力与本地化队列连接工厂
在集群中使用HA队列时,为了获得最佳性能,你可能想连接到物理代理
领先队列所在的位置。
这缓存连接工厂可以配置多个经纪地址。
这是为了切换故障,客户端根据配置尝试连接地址洗牌模式次序。
这本地化队列连接工厂使用管理插件提供的 REST API 来确定队列的主节点。
然后它创建(或从缓存中检索)一个缓存连接工厂它只连接到那个节点。
如果连接失败,新的主节点会被确定,消费者连接到它。
这本地化队列连接工厂配置为默认连接工厂,以防无法确定队列的物理位置,则正常连接到集群。
这本地化队列连接工厂是路由连接工厂以及SimpleMessageListenerContainer如上文《路由连接工厂》中讨论的,使用队列名称作为查找键。
因此(查询时使用队列名称),本地化队列连接工厂只有在容器配置为监听单个队列时才能使用。 |
| 每个节点都必须启用RabbitMQ管理插件。 |
该连接工厂旨在实现长寿命连接,例如用于SimpleMessageListenerContainer.
它不适用于短连接,例如与兔子模板因为在连接前调用 REST API 会带来开销。
此外,发布作的队列是未知的,消息无论如何都会发布给所有集群成员,因此查找节点的逻辑价值不大。 |
以下示例配置展示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
注意前三个参数是地址,adminUris和节点.
它们是位置性的,当容器尝试连接队列时,它会使用管理员 API 来确定队列的主节点,并连接到与该节点在同一数组位置的地址。
从3.0版本开始,RabbitMQhttp-client不再用于访问 Rest API。
相反,默认情况下,Web客户端如果 Spring Webflux 的春季网流位于类路径上;否则Rest模板被使用。 |
补充一点网络流到职业路径:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
你也可以使用其他 REST 技术,实现 LocalizedQueueConnectionFactory#setNodeLocator 并覆盖其createClient,休息呼叫,并且可选地,关闭方法。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
});
该框架提供了WebFluxNodeLocator和RestTemplateNodeLocator,默认值如上所述。
出版社确认并回归
确认(带有相关性)和返回消息的支持方式是设置缓存连接工厂属性publisherConfirmType(出版者确认类型)自确认类型。相关性以及出版商回归属性变为“真”。
当这些选项被设置时,渠道工厂创建的实例被包裹在出版商回应频道,用于促进回调。
当获得这样的信道时,客户端可以注册出版商回拨频道。听众其中渠道.
这出版商回应频道实现包含将确认或返回路由到相应监听器的逻辑。
这些特征将在后续章节中进一步解释。
另见相关出版商确认与回报simplePublisher确认在《范围行动》中。
| 更多背景信息,请参阅RabbitMQ团队的博客文章《介绍出版商确认》。 |
连接与频道听众
连接工厂支持注册连接听者和频道听众实现。
这样你就能接收连接和频道相关事件的通知。
(A连接听者被兔子管理员在连接建立时执行声明——更多信息请参见“交换、队列和绑定自动声明”。
以下列表显示了连接听者界面定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从2.0版本开始,org.springframework.amqp.rabbit.connection.Connection对象可以由 提供com.rabbitmq.client.BlockedListener实例将被通知连接被阻断和解除阻挡事件。
以下示例展示了ChannelListener接口的定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
参见《发布异步——如何检测成功与失败》一文,说明你可能想注册频道听众.
Logging Channel 关闭事件
1.5版本引入了一种机制,使用户能够控制日志水平。
这摘要连接工厂使用默认的通道闭合记录策略如下:
-
正常频道关闭(200 OK)不会被记录。
-
如果因被动队列声明失败而关闭通道,则会在调试层级记录该通道。
-
如果通道是闭合的,因为
basic.consume因独家消费者条件被拒绝,记录为 调试级别(自3.1版本起,之前为INFO)。 -
其他所有记录都处于错误级别。
要修改这种行为,你可以注入一个自定义条件异常记录器进入缓存连接工厂在其closeExceptionLogger财产。
另外,还有抽象连接工厂.DefaultChannel关闭日志现在是公开的,允许它被子分类。
另见消费者活动。
运行时缓存属性
从1.6版本开始,缓存连接工厂现在通过getCacheProperties()方法。
这些统计数据可用于优化缓存,使其在生产环境中得到优化。
例如,高水位线可以用来决定缓存大小是否应增加。
如果缓存大小和缓存大小相等,你可能需要考虑进一步增加。
下表描述了缓存模式.CHANNEL性能:
| 属性 | 意义 |
|---|---|
connectionName |
由 |
channelCacheSize |
当前配置的最大允许闲置频道。 |
localPort |
连接的本地端口(如果有的话)。 这可以用来与RabbitMQ管理界面上的连接和通道进行关联。 |
idleChannelsTx |
当前处于空闲(缓存)的事务通道数量。 |
idleChannelsNotTx |
当前处于空闲(缓存)的非事务通道数量。 |
idleChannelsTxHighWater |
同时处于闲置(缓存)事务通道的最大数量。 |
idleChannelsNotTxHighWater |
最大数量的非事务通道同时处于闲置状态(缓存状态)。 |
下表描述了缓存模式.连接性能:
| 属性 | 意义 |
|---|---|
connectionName:<localPort> |
由 |
openConnections |
表示连接到经纪人的连接对象数量。 |
channelCacheSize |
当前配置的最大允许闲置频道。 |
connectionCacheSize |
目前配置的最大允许闲置连接。 |
idleConnections |
目前处于空闲状态的连接数量。 |
idleConnectionsHighWater |
同时处于闲置状态的最大连接数。 |
idleChannelsTx:<localPort> |
当前该连接处于空闲(缓存)事务通道的数量。
你可以使用 |
idleChannelsNotTx:<localPort> |
当前该连接中处于空闲(缓存)的非事务通道数量。
这 |
idleChannelsTxHighWater:<localPort> |
同时处于闲置(缓存)事务通道的最大数量。 属性名称中的localPort部分可用于与RabbitMQ管理界面中的连接和通道对应。 |
idleChannelsNotTxHighWater:<localPort> |
最大数量的非事务通道同时处于闲置状态(缓存状态)。
你可以使用 |
这缓存模式性质 (渠道或连接)也被包含在内。
RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 第一版以来,该框架就提供了自身的连接和通道恢复功能,以应对代理故障。
此外,正如《配置经纪人》中讨论的,兔子管理员连接重新建立时,重新声明所有基础设施豆(队列等)。
因此,它不依赖于现在由AMQP-client图书馆。
这AMQP-client默认启用了自动恢复功能。
两种恢复机制之间存在一些不兼容之处,因此默认情况下,Spring 会设置自动恢复启用基础上的性质RabbitMQ 连接工厂自false.
即使该财产是trueSpring 通过立即关闭所有恢复的连接,有效地禁用了它。
| 默认情况下,只有被定义为 Beans 的元素(队列、交换、绑定)在连接失败后才会被重新声明。 请参阅恢复自动删除声明,了解如何更改该行为。 |