该版本仍在开发中,尚未被视为稳定。请使用最新的稳定版本,使用 Spring AMQP 4.0.0!spring-doc.cadn.net.cn

使用 RabbitMQ 流插件

添加春兔溪对项目的依赖性:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>4.0.1-SNAPSHOT</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:4.0.1-SNAPSHOT'

你可以像平常一样配置队列,使用兔子管理员Bean,使用QueueBuilder.stream()用来指定队列类型的方法。 例如:spring-doc.cadn.net.cn

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

不过,这只有在你同时使用非流组件时才有效(例如SimpleMessageListenerContainerDirectMessageListenerContainer)因为管理员在打开AMQP连接时会被触发声明定义的豆子。 如果你的应用只使用流组件,或者你想使用高级流配置功能,你应该配置一个StreamAdmin相反:spring-doc.cadn.net.cn

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

有关StreamCreator.spring-doc.cadn.net.cn

发送消息

兔子流模板提供了兔子模板(AMQP)功能。spring-doc.cadn.net.cn

兔子流运营
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

兔子流模板实现具有以下构造函数和属性:spring-doc.cadn.net.cn

兔子流模板
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

消息转换器用于convertAndSend将对象转换为 Spring AMQP 的方法消息.spring-doc.cadn.net.cn

流消息转换器用于从Spring AMQP转换消息连接到本地溪流消息.spring-doc.cadn.net.cn

你也可以发送原生流消息直接 s 的;其中messageBuilder()提供访问制作人的消息生成器。spring-doc.cadn.net.cn

制作定制器提供了一种机制,可以在生产者建造前进行定制。spring-doc.cadn.net.cn

请参阅 Java 客户端文档中关于定制环境制作人.spring-doc.cadn.net.cn

接收消息

异步消息接收由StreamListenerContainer(以及StreamRabbitListenerContainerFactory使用@RabbitListener).spring-doc.cadn.net.cn

监听器容器需要环境以及一个流域名称。spring-doc.cadn.net.cn

你可以选择获得春季AMQP消息S使用经典车型消息监听器或者你也可以接收原生流消息S使用新界面:spring-doc.cadn.net.cn

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

有关支持属性的信息,请参见消息监听器容器配置spring-doc.cadn.net.cn

类似模板,容器有消费者定制器财产。spring-doc.cadn.net.cn

请参阅 Java 客户端文档中关于定制环境消费者.spring-doc.cadn.net.cn

使用@RabbitListener,配置一个StreamRabbitListenerContainerFactory;此时,大多数@RabbitListener性质(并发,等等)被忽略。只身份证,队列,自动启动集装箱工厂是支持的。 另外队列只能包含一个流名。spring-doc.cadn.net.cn

例子

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5 版本增加了建议链属性StreamListenerContainer(以及它的工厂)。 还提供了新的工厂豆,用于创建无状态重试拦截器,并可选地StreamMessageRecoveryer用于接收原始流消息。spring-doc.cadn.net.cn

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
该容器不支持有状态重试。

超级流

超级流是一种抽象的分区流概念,通过将若干流队列绑定到带有参数的交换节点实现X-超级流:真.spring-doc.cadn.net.cn

供应

为了方便,可以通过定义一个类型的 的单个豆来配置超级流超级流.spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

兔子管理员检测到该豆子并将声明交换(我的.超级.stream。)和3个队列(分区)——我的.超级流-n哪里n0,1,2,绑定路由键为n.spring-doc.cadn.net.cn

如果您也希望通过AMQP向交换机发布,可以提供自定义路由密钥:spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

密钥数量必须等于分区数。spring-doc.cadn.net.cn

制作至超级流

你必须加上一个superStreamRoutingFunction前往兔子流模板:spring-doc.cadn.net.cn

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

你也可以通过AMQP发布,使用以下内容兔子模板.spring-doc.cadn.net.cn

用单一活跃用户观看超级流

调用超级流在监听器容器上的方法,使超级流中启用单个活跃消费者。spring-doc.cadn.net.cn

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
此时,当并发大于1时,实际并发进一步由环境;为了实现完全并发,设置环境的maxConsumersByConnection(连接方式)1. 参见配置环境

微米观测

自3.0.5版本起,支持使用Micrometer进行观测,兔子流模板以及流的听众容器。 容器现在还支持微米计时器(当未启用观测时)。spring-doc.cadn.net.cn

设置观察启用每个组件以实现观察;这将使微米计时器失效,因为计时器将根据每次观测被管理。 使用带注释的监听器时,设置观察启用关于集装箱工厂。spring-doc.cadn.net.cn

更多信息请参见“微尺追踪”。spring-doc.cadn.net.cn

要给定时器/追踪添加标签,请自定义配置RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention分别是模板或监听器容器。spring-doc.cadn.net.cn

默认实现会添加名称模板观察和listener.id容器标签。spring-doc.cadn.net.cn

你可以选择任一子职业DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention或者提供全新的实现。spring-doc.cadn.net.cn

更多详情请参见“测距观测文档”。spring-doc.cadn.net.cn