RabbitMQ是一个常用的消息队列中间件,它能接收并转发消息。在消息通信模型中,系统可分为三部分:生产者,消息服务器和消费者。简单地说,生产者程序产生消息,发布到消息服务器;消费者程序连接到消息服务器,订阅到队列中。每当消息到达特定等队列时,RabbitMQ会将其发送给其中一个订阅/监听的消费者。
在RabbitMQ中,生产者并不是直接将消息交给某个消息队列的,而是将消息发送给交换器,发送时会指定投递的规则,这些规则称为路由键。交换器中最简单等一种就是direct交换器。它是一个以空白字符串为名称等默认交换器。当声明一个队列时,默认绑定到direct交换器。direct交换器的规则非常简单:如果路由键(队列名)匹配的话,消息就被投递到对应等队列。如下图所示
Java客户端
Java是当今流行的服务器应用程序开发语言。RabbitMQ也提供了Java的客户端SDK。用Java实现direct交换器中的生产者示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.wts.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; * Created by weitaosheng on 2017/5/1. */ public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
|
创建一个消息生产者等步骤:
- 新建连接工厂;
- 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
- 获取连接;
- 建立频道(channel);
- 声明队列;
- 发布消息。
- 关闭频道;
- 关闭连接。
客户端示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import com.rabbitmq.client.*; import java.io.IOException; * Created by weitaosheng on 2017/5/1. */ public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for message. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
创建客户端并订阅队列的步骤:
- 新建连接工厂;
- 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
- 获取连接;
- 建立频道(channel);
- 声明队列;
- 声明一个消息消费者,本示例中是用一个匿名内部类来扩展了DefaltConsumer类,并重写了handleDelivery方法,该方法定义了接收到消息时进行等处理。
以下是客户端SDK关键的类和方法:
ConnectionFactory:连接工厂关键类,是客户端访问RabbitMQ服务器必须要先构造的类。从Connection的源代码可以看出,ConnectionFactory没有显示定义构造函数,因此使用的是编译器自动生成的默认构造函数。
获取连接的函数
1 2 3 4
| public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); }
|
查看源码,sharedExecutor是一个ExecutorService的实例。此处为null,也就是没有使用线程池。连接时还需要地址和端口。
1
| Channel createChannel() throws IOException;
|
createChannel:生成一个Channel类的实例。Connection是一条真实的TCP连接,Channel是Connection内等一条虚拟连接,它在RabbitMQ中会自动生成唯一的ID。通过一条TCP连接内多个虚拟连接的方式,可以提高性能,节约系统资源,因为TCP连接的创建和销毁是非常昂贵的。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map arguments) throws IOException;
queueDeclare:声明队列。它的几个参数的含义是:队列名,是否持久化,是否是限制性等队列(仅限于此次连接),是否自动删除消息,其他参数
1
| void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
|
basicPublish:发布消息,几个参数是:交换器名,路由键,是否设置了mandatory参数,消息参数,消息体(用byte数组装载)
Consumer是代表消费者等接口,DefaultConsumer是Consumer接口的一个默认实现。一般定义消费者只需扩展这个类即可。
1 2 3 4
| public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
|
handleDelivery:消费者接收到消息时的回调方法。它的几个参数是:消费者标签,『信封』信息,消息头内容,消息体