Contents
  1. 1. Java客户端

RabbitMQ是一个常用的消息队列中间件,它能接收并转发消息。在消息通信模型中,系统可分为三部分:生产者,消息服务器和消费者。简单地说,生产者程序产生消息,发布到消息服务器;消费者程序连接到消息服务器,订阅到队列中。每当消息到达特定等队列时,RabbitMQ会将其发送给其中一个订阅/监听的消费者。
在RabbitMQ中,生产者并不是直接将消息交给某个消息队列的,而是将消息发送给交换器,发送时会指定投递的规则,这些规则称为路由键。交换器中最简单等一种就是direct交换器。它是一个以空白字符串为名称等默认交换器。当声明一个队列时,默认绑定到direct交换器。direct交换器的规则非常简单:如果路由键(队列名)匹配的话,消息就被投递到对应等队列。如下图所示

Java客户端

Java是当今流行的服务器应用程序开发语言。RabbitMQ也提供了Java的客户端SDK。用Java实现direct交换器中的生产者示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.wts.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Created by weitaosheng on 2017/5/1.
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}

创建一个消息生产者等步骤:

  1. 新建连接工厂;
  2. 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
  3. 获取连接;
  4. 建立频道(channel);
  5. 声明队列;
  6. 发布消息。
  7. 关闭频道;
  8. 关闭连接。

客户端示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Created by weitaosheng on 2017/5/1.
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

创建客户端并订阅队列的步骤:

  1. 新建连接工厂;
  2. 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
  3. 获取连接;
  4. 建立频道(channel);
  5. 声明队列;
  6. 声明一个消息消费者,本示例中是用一个匿名内部类来扩展了DefaltConsumer类,并重写了handleDelivery方法,该方法定义了接收到消息时进行等处理。

以下是客户端SDK关键的类和方法:
ConnectionFactory:连接工厂关键类,是客户端访问RabbitMQ服务器必须要先构造的类。从Connection的源代码可以看出,ConnectionFactory没有显示定义构造函数,因此使用的是编译器自动生成的默认构造函数。
获取连接的函数

1
2
3
4
public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}

查看源码,sharedExecutor是一个ExecutorService的实例。此处为null,也就是没有使用线程池。连接时还需要地址和端口。

1
Channel createChannel() throws IOException;

createChannel:生成一个Channel类的实例。Connection是一条真实的TCP连接,Channel是Connection内等一条虚拟连接,它在RabbitMQ中会自动生成唯一的ID。通过一条TCP连接内多个虚拟连接的方式,可以提高性能,节约系统资源,因为TCP连接的创建和销毁是非常昂贵的。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map arguments) throws IOException;
queueDeclare:声明队列。它的几个参数的含义是:队列名,是否持久化,是否是限制性等队列(仅限于此次连接),是否自动删除消息,其他参数

1
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

basicPublish:发布消息,几个参数是:交换器名,路由键,是否设置了mandatory参数,消息参数,消息体(用byte数组装载)

Consumer是代表消费者等接口,DefaultConsumer是Consumer接口的一个默认实现。一般定义消费者只需扩展这个类即可。

1
2
3
4
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)

handleDelivery:消费者接收到消息时的回调方法。它的几个参数是:消费者标签,『信封』信息,消息头内容,消息体

Contents
  1. 1. Java客户端