该版本仍在开发中,尚未被视为稳定。请使用最新的稳定版本,使用 Spring AMQP 4.0.0!spring-doc.cadn.net.cn

示例应用

春季AMQP样本项目包含两个示例应用。 第一个是一个简单的“Hello World”示例,展示了同步和异步消息接收。 它为理解关键组成部分提供了极好的起点。 第二个示例基于股票交易的用例,展示现实应用中常见的交互类型。 在本章中,我们将快速介绍每个样本,帮助您专注于最重要的组成部分。 这些样本都是基于 Maven 的,所以你应该可以直接导入任何支持 Maven 的集成开发环境(比如 SpringSource Tool Suite)。spring-doc.cadn.net.cn

《Hello World》采样

“Hello World”样本展示了同步和异步消息接收。 你可以导入春兔子世界先在IDE中试用,然后跟随下面的讨论。spring-doc.cadn.net.cn

同步示例

src/main/java目录,导航至该org.springframework.amqp.helloworld包。 打开HelloWorldConfiguration类并注意它包含@Configuration在类级层面做注释,注意一些@Bean方法层面的注释。 这是Spring基于Java配置的一个例子。 你可以在这里了解更多相关内容。spring-doc.cadn.net.cn

以下列表展示了连接工厂的创建过程:spring-doc.cadn.net.cn

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

该配置还包含一个兔子管理员默认情况下,它会寻找任何类型的 Exchange、队列或绑定的豆子,然后在经纪人上声明它们。 事实上,helloWorldQueue生成于HelloWorldConfiguration是一个例子,因为它是队列.spring-doc.cadn.net.cn

以下列表显示了helloWorldQueue豆子定义:spring-doc.cadn.net.cn

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顾兔子模板豆状配置,你可以看到它的名字为helloWorldQueue作为其队列属性(用于接收消息)和 其路由键属性(用于发送消息)。spring-doc.cadn.net.cn

现在我们已经探索了配置,可以看看实际使用这些组件的代码。 首先,打开制作人同一个包内的类。 它包含一个main()泉水的方法应用上下文被创造出来。spring-doc.cadn.net.cn

以下列表显示了主要方法:spring-doc.cadn.net.cn

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在前面的例子中,Amqp模板豆子被检索并用于发送消息. 由于客户端代码应尽可能依赖接口,类型为Amqp模板而不是兔子模板. 即使豆子在HelloWorldConfiguration是 的一个实例兔子模板依赖接口意味着该代码更具可移植性(你可以独立于代码更改配置)。 自从......convertAndSend()调用方法,模板委托给其消息转换器实例。 在这种情况下,它使用默认的简易消息转换器但可以为兔子模板豆子,定义如下HelloWorldConfiguration.spring-doc.cadn.net.cn

现在打开消费者类。 它实际上共享相同的配置基类,这意味着它共享兔子模板豆。 这就是为什么我们配置了那个模板,同时也用路由键(用于发送)以及一个队列(接收)。 正如我们在Amqp模板你可以将“routingKey”参数传递给发送方法,“queue”参数传递给接收方法。 这消费者代码基本上是生产者的镜像,调用receiveAndConvert()而不是convertAndSend().spring-doc.cadn.net.cn

以下列表展示了主要的消费者:spring-doc.cadn.net.cn

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果你运行制作人然后运行消费者你应该看看收到:Hello World在控制台输出中。spring-doc.cadn.net.cn

异步示例

同步示例解析了同步的Hello World采样。 本节介绍了一个稍先进但明显更强大的选项。 经过一些修改,Hello World 样本可以提供异步接收的一个例子,也称为消息驱动的 POJO。 事实上,有一个子包恰好提供了:org.springframework.amqp.samples.helloworld.async.spring-doc.cadn.net.cn

同样,我们从发送端开始。 打开ProducerConfiguration类并注意它生成了一个连接工厂以及一个兔子模板豆。 这次,由于配置是专门用于消息发送端的,我们甚至不需要任何队列定义,且兔子模板只有“routingKey”属性集合。 请记住,消息是发送到交换机,而不是直接发送到队列。 AMQP默认交易所是一个没有名称的直接交易所。 所有队列都绑定在该默认交换机上,其名称作为路由密钥。 这就是为什么我们只需要提供路由密钥。spring-doc.cadn.net.cn

以下列表显示了兔子模板定义:spring-doc.cadn.net.cn

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由于该示例展示了异步消息接收,生产端被设计成持续发送消息(如果它是像同步版本那样的每次执行消息模型,就不会那么明显地表明它实际上是一个消息驱动的消费者)。 负责持续发送消息的组件被定义为ProducerConfiguration. 它设置为每三秒运行一次。spring-doc.cadn.net.cn

以下列表展示了该组件:spring-doc.cadn.net.cn

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

你不需要理解所有细节,因为真正的重点应该放在接收方(我们接下来会讲)。 不过,如果你还不熟悉春季任务调度支持,可以在这里了解更多。 简而言之,后处理器豆子在ProducerConfiguration向调度器注册任务。spring-doc.cadn.net.cn

现在我们可以转向接收方。 为了强调消息驱动的POJO行为,我们从响应消息的组件开始。 该类称为你好世界处理员并见下表:spring-doc.cadn.net.cn

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

那门课是个POJO。 它不扩展任何基类,不实现任何接口,甚至不包含任何导入。 它正在“适应”以适应消息监听器Spring AMQP 接口MessageListener适配器. 然后你可以在SimpleMessageListenerContainer. 对于这个样本,容器是在消费者配置类。 你可以看到POJO包裹在适配器里。spring-doc.cadn.net.cn

以下列表展示了listenerContainer定义如下:spring-doc.cadn.net.cn

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer是 Spring 生命周期组件,默认情况下会自动启动。 如果你看消费者你可以看到main()方法仅仅包含一个单行引导工具来创建应用上下文. 制片人main()方法也是单行引导法,因为其方法被注释为@Scheduled而且会自动开始。 你可以开始制作人消费者顺序不管,你应该每三秒看到一次消息的发送和接收。spring-doc.cadn.net.cn

股票交易

股票交易样本展示了比Hello World样本更高级的消息传递场景。 不过,配置非常相似,只是更复杂一些。 既然我们详细讲解了 Hello World 的配置,这里重点讲述这个样本的不同之处。 有一个服务器会将市场数据(股票报价)推送到话题交易所。 然后,客户端可以通过绑定队列与路由模式来订阅市场数据流(例如,应用。股票。报价。纳斯达克。*). 演示的另一个主要特点是由客户端发起,服务器处理的请求-回复“股票交易”交互。 那是个列兵回复队列由客户端在订单请求消息中发送。spring-doc.cadn.net.cn

服务器的核心配置是RabbitServerConfigurationorg.springframework.amqp.rabbit.stocks.config.server包。 它扩展了摘要StockAppRabbitConfiguration. 这就是服务器和客户端共同资源的定义,包括市场数据主题交换(名称为“app.stock.marketdata”)以及服务器为股票交易暴露的队列(名称为“app.stock.request”)。 在那个常见的配置文件中,你还会看到Jackson2JsonMessageConverter配置在兔子模板.spring-doc.cadn.net.cn

服务器专用配置包含两部分。 首先,它配置市场数据交换兔子模板这样它就不需要在每次通话中提供该交换名称来发送消息. 它在基础配置类定义的抽象回调方法内实现这一点。 以下列表展示了该方法:spring-doc.cadn.net.cn

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明库存请求队列。 在这种情况下,它不需要任何显式绑定,因为它绑定在默认的无名交换中,路由密钥是其自身名称。 如前所述,AMQP规范定义了这种行为。 以下列表展示了stockRequestQueue豆:spring-doc.cadn.net.cn

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

现在你已经看到服务器AMQP资源的配置,请导航到以下内容。org.springframework.amqp.rabbit.stocks包裹在SRC/测试/Java目录。 在那里,你可以看到实际的服务器提供main()方法。 它创造了应用上下文基于server-bootstrap.xml配置文件。 在那里,你可以看到发布虚拟市场数据的计划任务。 这种配置依赖于斯普林的任务命名空间支持。 引导配置文件还会导入其他几个文件。 最有趣的是server-messaging.xml,直接在src/主/资源. 在那里,你可以看到messageListenerContainer负责处理股票交易请求的豆子。 最后,看看服务器处理程序定义为server-handlers.xml(这也在“src/main/resources”里)。 那颗豆子是ServerHandler类和 是一个很好的消息驱动 POJO 示例,同时也能发送回复消息。 注意它本身并未与框架或任何AMQP概念相关联。 它接受贸易请求并返回TradeResponse(交易响应). 以下列表展示了handleMessage方法:spring-doc.cadn.net.cn

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在我们已经看到了服务器最重要的配置和代码,可以转向客户端了。 最好的起点可能是RabbitClientConfigurationorg.springframework.amqp.rabbit.stocks.config.client包。 注意它声明两个队列但未提供明确名称。 以下列表展示了两个队列的豆子定义:spring-doc.cadn.net.cn

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

这些是私有队列,唯一名称会自动生成。 第一个生成的队列被客户端用来绑定服务器已公开的市场数据交换。 回想一下,在AMQP中,消费者与队列互动,而生产者与交易所互动。 队伍与交换机的“绑定”是指示经纪人将消息从某个交换机构传递(或路由)到队列的机制。 由于市场数据交换是主题交换,绑定可以用路由模式来表示。 这RabbitClientConfiguration通过捆绑对象,而该对象由装订构建器流利API。 以下列表显示了捆绑:spring-doc.cadn.net.cn

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

注意实际值已被外部化到一个属性文件中(client.propertiessrc/主/资源),并且我们使用 Spring 的@Value注释以注入该值。 这通常是个好主意。 否则,该值将被硬编码在类中,且无法修改,除非重新编译。 在这种情况下,运行多个客户端版本的同时修改绑定所用的路由模式会容易得多。 我们现在可以试试。spring-doc.cadn.net.cn

从跑步开始org.springframework.amqp.rabbit.stocks.Server然后org.springframework.amqp.rabbit.stocks.Client. 你应该看看纳斯达克股票,因为在client.properties中与“stocks.quote.pattern”键关联的当前价值是“app.stock.quotes.nasdaq”。'. 现在,在保留现有的同时服务器客户端运行中,将该房产价值改为“App.Stock.Quotes.NYSE”。然后开始第二个客户端实例。 你应该看到第一个客户仍然收到纳斯达克报价,而第二个客户则收到纽约证券交易所报价。 你可以改动模式,获取所有股票甚至单个股票代码。spring-doc.cadn.net.cn

我们最后一个探索的功能是从客户视角进行请求-回复交互。 回想一下,我们已经看到了ServerHandler接受贸易请求对象与返回TradeResponse(交易响应)对象。 对应的代码客户端边是RabbitStockServiceGatewayorg.springframework.amqp.rabbit.stocks.gateway包。 它委托给兔子模板为了传递信息。 以下列表显示了发送方法:spring-doc.cadn.net.cn

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

注意,在发送消息之前,它设置了回复地址。 它提供了由traderJoeQueueBeans定义(如前所示)。 以下列表显示了@Bean定义StockServiceGateway职业本身:spring-doc.cadn.net.cn

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果你已经不再运行服务器和客户端,现在就启动它们。 试着发送一个格式为“100 TCKR”的请求。 经过一段模拟“处理”请求的短暂人为延迟后,你应该会看到客户端出现确认消息。spring-doc.cadn.net.cn

从非Spring应用接收JSON文件

Spring 应用程序在发送 JSON 时,设置TypeID将 header 映射到完全限定的类名中,以帮助接收应用程序将 JSON 转换回 Java 对象。spring-doc.cadn.net.cn

Spring-rabbit-json示例介绍了将JSON从非Spring应用转换的几种技术。spring-doc.cadn.net.cn