Dubbo的同步调用

使用Dubbo很长时间了,基本使用的都是Dubbo的同步调用方式。Dubbo默认使用的是协议格式Dubbo协议,序列化方式是Hessian,传输层使用的是Tcp协议,默认使用netty框架。

Dubbo的同步调用是指,调用一个远程服务时,发起调用这个线程会一直阻塞(至少从使用者的感觉来说是这样的)到远程服务返回结果,或是超时为止。

那么Dubbo是如何实现这种同步调用机制的呢?我们知道RPC的本质是用TCP协议向另一个进程(可能在同一台机器上,也可能不在)发送请求,服务端进程接收到请求后,将请求解析为对本地方法的一个调用,调用之后将结果返回给客户端。客户端接收到返回之后,再将结果反序列化为需要的结果,返回给上层的业务。

Dubbo同步调用在客户端的实现

Dubbo的整个调用流程是:
1.服务提供者导出服务到本机的IP和端口上,即是建立一个TCP服务器;
2.服务提供者发布服务到注册中心;
3.服务消费者向注册中心订阅服务提供者的信息;
4.服务消费者建立远程服务的代理,并建立一个TCP客户端。

以上几步分别是服务提供者和服务消费者启动时做的事情,这之后消费者和提供者之间就建立了TCP长连接,实际调用服务时是无需经过注册中心的,直接通过TCP连接完成。

下面就来看一下消费者是如何发起到提供者的请求,以及如何实现对上层业务而言的同步调用,以及超时机制等。

Dubbo消费者在发起调用之后,经过重重阻隔,经历了各种包装的代理类之后(暂时忽略集群容错和负载均衡等),最终会到达下面这个类:

1
2
3
4
5
6
public class DubboInvoker<T> extends AbstractInvoker<T> {
// 省略大量代码
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
//省略大量代码
return (Result) currentClient.request(inv, timeout).get();

最后这个return语句返回的就是最终调用返回的结果。再进入这个request方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final class HeaderExchangeChannel implements ExchangeChannel {
// 省略大量代码
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

我们看到,在这个方法中,先是创建了发送给提供者服务的请求request,之后创建了一个DefaultFuture的类。最好就调用channel将请求发送出去(此处的channel是dubbo定义的一个类的实例,但内部包装了netty的channel)。

我们看到,调用channel的send方法发送请求,但这个方法并没有返回值。而是在下方直接返回了这个DefaultFuture类的实例。

再回到上面DubboInvoke中,在request返回了DefaultFuture实例之后,就调用get方法去获取调用结果。因此再看get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}

到这个方法中就清除了,首先判断一下调用是否有返回结果,如果还没有返回,就先记录一下当前时间,之后用一个可重入锁将代码块加锁。之后在一个while循环条件中判断调用是否有返回。若没有返回就直接将线程进入等待状态,只有另一个线程向它发信号,才能唤醒它继续执行。唤醒之后,再次判断是否有返回,或是已经超时,则跳出循环。之后判断若没有返回,则已经超时。若有返回结果,则正常返回结果。

那么哪里有一个线程会来唤醒这个调用线程呢?

答案就是netty提供的机制,netty客户端在发送请求给服务端,当有结果返回时,会触发事先注册的处理器,将请求通知上层调用。最终会调用到DefaultFuture中的received方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}

首先是获取当前调用的DefaultFuture实例,之后调用它的doReceived方法。这个方法先加了锁,之后将返回结果赋给DefaultFuture的response(这个response是DefaultFuture的一个实例属性,也就是上层业务要获取的结果)。之后就是上面说的向线程发信号,唤醒等待中的调用线程。

以上就是Dubbo将底层的netty的异步调用转为同步调用。还有一个问题是dubbo是如何实现接口调用超时的。

答案在下面:

1
2
3
4
5
6
7
8
9
10
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
static {
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}

DefaultFuture中用了一个FUTURES的map来保存每个具体调用的DefaultFuture实例。它的key是每次调用的id,这个id就是用来区分不同的调用的,每个请求都不一样。之后在初始化这个类时,启动一个守护线程,它就是用来实现超时的。它的run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static class RemotingInvocationTimeoutScan implements Runnable {
@Override
public void run() {
while (true) {
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
logger.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}

逻辑很简单,就是每30毫秒轮询一次所有的DefaultFuture,若future已经有返回结果,则跳过;若没有返回,则计算一下当前时间是否超时,若超时,则构造一个超时的返回结果,之后调用上面提到的DefaultFuture的received方法。

总结

以上就是Dubbo同步调用在消费者端的大体实现。主要就是用了一个锁来将调用线程阻塞起来,之后netty接收到返回结果之后,会发信号唤醒调用线程。或是超时,也会返回一个超时的结果,并唤醒调用线程。这样就实现了对上层业务来说的同步调用。
因为Dubbo的流程非常复杂,因此本篇文章限于篇幅,无法将它的上下游流程勾勒得很清楚。只是提纲挈领地写出关键之处。对于整体流程的把握,还是需要自己去研究代码才行。

从一个问题说起

假设有4个方法,可以获取四个值,这四个值可以用来构造一个用户的实例。如果要写一个方法,并发获取这4个值。

这个题目应该用Future来做。下面是我模拟的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 服务类,有4个方法
*/
class Service {
public int get1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
public int get2() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}
public int get3() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
}
public int get4() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 4;
}
}
public class FutureTaskTest {
public static void main(String[] args) {
Service service = new Service();
/** 记录开始执行时间 */
long start = System.currentTimeMillis();
/**
* 用4个FutureTask来并行执行service的4个方法, 执行完之后一并
* 获取执行结果
*/
FutureTask<Integer> futureTask1 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get1();
}
});
Thread thread1 = new Thread(futureTask1);
thread1.start();
FutureTask<Integer> futureTask2 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get2();
}
});
Thread thread2 = new Thread(futureTask2);
thread2.start();
FutureTask<Integer> futureTask3 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get3();
}
});
Thread thread3 = new Thread(futureTask3);
thread3.start();
FutureTask<Integer> futureTask4 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get4();
}
});
Thread thread4 = new Thread(futureTask4);
thread4.start();
try {
int a = futureTask1.get();
int b = futureTask2.get();
int c = futureTask3.get();
int d = futureTask4.get();
long end = System.currentTimeMillis();
System.out.println(end - start);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

执行结果,打印执行时间为4007毫秒,与预期相同,也就是并行执行4个任务中耗时最长的一个。

下面就来分析FutureTask的原理。

FutureTask的使用

Java中,使用线程执行任务一般实现Runnable接口。它只有一个run方法,没有返回值。因此当主线程需要用一个子线程去执行某个任务,并返回某个结果时,必须采用其他的方法。比如在Thread中添加一个实例,然后运行run方法时去修改这个值。但这有一个问题,就是主线程并不知道子线程何时执行结束。因此又引出其他的问题。显然仅靠Runnable并不是解决这类问题的好方法。

因此,Java提供了Callable接口,它代表一个能返回执行结果的任务。它的定义如下:

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

这个接口有一个参数类型V,同时只有一个方法call。当call方法执行完之后,会返回一个V类型的实例。

但是我们遍览jdk的类库,发现并没有一个public的类是实现了Callable接口的。因此它是一个完全由用户实现的接口,必须搭配其他的接口和类才能使用。

于是java又提供了Future接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Future<V> {
/**
* 取消任务执行,参数代表如果任务已经开始执行,是否要中断
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否已经取消
*/
boolean isCancelled();
/**
* 判断任务是否执行完成
*/
boolean isDone();
/**
* 获取线程执行的结果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取线程执行结果,并设置一个超时时间,若超过了则抛出异常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

这个接口的注释说:这个接口代表了一次异步计算的结果。所谓异步计算,也就是代表一个线程。

FutureTask源码解析

通过上面的例子,我们可以通过推理和验证的方式来研究FutureTask的源码实现。首先FutureTask能作为参数传递给Thread类来实例化一个线程,说明它实现了Runnable接口。同时它也实现了Future接口,否则我们这篇文章也不会写它了。

于是我们看FutureTask的定义头:

1
public class FutureTask<V> implements RunnableFuture<V> {

而RunnableFuture的定义头:

1
public interface RunnableFuture<V> extends Runnable, Future<V>

果如所料。

再从上面的例子中可以看出,FutureTask的构造方法的参数是一个Callable接口。那说明它内部就是通过一个Callable实例来执行具体的逻辑的,并且Callable的实现类是由用户提供的。FutureTask的构造方法如下:

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

从构造方法可以看出,FutureTask还有一个实例属性是state,并且在构造方法中被初始化为NEW。这个属性和可能的值定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 任务运行状态,初始化为NEW,这个状态变量只会在set方法、setException和cancel方法中被转化为终态
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

详细的状态转化在下面方法中介绍。

构造了FutureTask之后,就需要用一个Thread去执行它。Thread执行它持有的Runnable实例的run方法。那么我们看一下FutureTask的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run() {
// 若state不为NEW,或者不能将执行此FutureTask的线程赋值给FutureTask的Thread属性,则放弃执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 再次判断state必须为NEW才执行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行Callable方法,获取结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 捕获异常时处理异常
result = null;
ran = false;
setException(ex);
}
// 执行正常结束时设置结果
if (ran)
set(result);
}
} finally {
// runner在将state设置为能阻止对run的并发调用前必须保持非null的值
runner = null;
// 再次读取state,若为正在中断中,或已中断,则进行相应处理
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

进入run方法时,用了Oracle的私有api Unsafe的方法compareAndSwapObject来线程安全地将FutureTask持有的Thread实例初始化为当前执行的线程(也就是被主线程开启的子线程本身)。这个方法采用了CAS(Compare and Swap),也就是要修改的参数必须符合预期的值才能给它赋新值。这里是为了确保只有一个子线程能执行FutureTask,以避免并发执行带来的问题。

当执行完之后,无论是正常还是捕获到异常,都需要将runner置为null。而再次读state,则是若runner被中断了,需要进行相应处理。具体处理在后面介绍。

下面我们看get方法。get方法的作用是获取执行的结果。在看代码之前,我们可以设想,虽然FutureTask的执行只能由一个线程来完成。但是应该是允许多个线程来并发获取结果的。那它必然有某种数据结构来保存读取结果的线程。说这么多,其实这种数据结构就是WaitNode:

1
2
3
4
5
6
7
8
9
10
11
12
13
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* 记录等待线程(即是读取执行结果的线程)的简单链表
*
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

get方法:

1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

首先判断state,若处于完成中,或者NEW。则调用awaitDone。否则直接调用report返回结果。

在我们的例子中看到,当在主线程调用FutureTask的get的时候,会一直等待到子线程将任务执行完成,再将结果返回给主线程。这个等待过程就是通过awaitDone方法来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 等待完成,若已中断或超时则终止
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 若线程已中断,则从等待链表中移除当前线程节点
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 若非NEW或完成中状态,则将节点的线程置为null,并返回当前状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 若为完成中状态,则当前线程让出CPU执行时间,待下次竞争到CPU时再判断是否完成
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 若state为NEW,则新建等待队列节点
else if (q == null)
q = new WaitNode();
// 若当前线程所属的等待节点还没有插入链表,则插入到链表头
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果考虑超时时间,则计算时间并进行处理
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

从上述代码看出,awaitDone的逻辑也不是很复杂,主要就是根据state的各种状态,进行新建等待节点,节点入等待列表等操作。

report方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 任务完成时返回结果或抛出异常
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

outcome就是要返回的执行结果。那么它的值是在哪里赋的呢?就是我们前面提到过的set方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
*
* 若执行结果还没有赋值,或被取消时,设置执行结果
*
* @param v the value
*/
protected void set(V v) {
// 先将state从NEW转化为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 将state从NEW转化为正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

当执行Callable的call方法发生异常时,会调用setException方法。

1
2
3
4
5
6
7
8
9
protected void setException(Throwable t) {
// 先将state从NEW转化为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 将state从NEW转化为发生异常
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

finishCompletion主要完成一些扫尾工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}

此方法的逻辑也很简单,就是遍历等待列表,将每一个节点都从列表删除,同时将每个等待线程解除阻塞。

以上分析了任务执行完成的情况,还有一种情况是执行被取消。它通过cancel方法来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean cancel(boolean mayInterruptIfRunning) {
// 此条件等价于: state != NEW || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

当state为NEW时,返回fasle。若为NEW,则根据传入的参数,若传入true,则将state转化为正在中断,否则转化为已取消。

若传入参数为true,取消时还需中断执行任务的线程。之后将state设置为已中断。

总结

以上就是FutureTask的简单使用和源码解析。除了直接使用新建线程来执行FutureTask,还可以使用线程池,本文没有对此进行介绍。

分析FutureTask的源码,发现还是比较简单的。主要就是通过一个内部持有的Thread来执行任务,同时用一个等待列表来存放等待获取结果的线程。当等待线程尝试用get获取结果时,会先判断是否为完成中以后的状态,若是则直接返回结果。否则就一直轮询,等待状态更新。最后执行完成之后,则遍历等待列表,将所有等待线程都解除阻塞。当然还有处理异常等情况,此处不再分析。

线程池的概念

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。

Dubbo中的线程池

Dubbo是一款高性能的分布式服务框架,它实现了透明化的远程调用,只需要通过配置,就能像调用当前jvm内的方法一样调用另一个JVM中的方法,而无需关心底层通信细节。

我们在使用Dubbo时,如果只是发布一个服务供其他java进程调用,或是调用另一个JVM进程中的方法,一般不需要显式地关注多线程的使用。但是我们的一个程序,可以发布多个接口,可以并发处理多个接口的调用。或是并发地调用多个远程接口。这显然是底层框架帮我们处理了多线程的工作。

Dubbo中对Netty线程的使用

Dubbo在传输层默认使用的是Netty作为通信框架。Dubbo作为服务提供者时使用Netty建立Tcp服务端,作为服务使用者时也使用Netty建立Tcp客户端。

Netty是基于java的NIO技术并结合线程池的通信框架。以下是Dubbo建立服务端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class NettyServer extends AbstractServer implements Server {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
// 省略若干代码
}

ServerBootstrap是Netty的一个类,用于建立服务端的功能。它使用了两组线程池,一个是父线程池,用于处理接受客户端的连接请求,因为Dubbo协议只需要监听一个端口,所以这个线程池只需要一个线程即可。另一个是子线程池,每当和一个客户端建立连接之后,就会从此线程池中选择一个线程进行IO操作。

Dubbo中建立tcp客户端也使用了Netty,下面是实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class NettyClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}

Netty客户端只需要一个线程池就可以。

以上是Dubbo中对于IO线程池的使用。此外,Dubbo发送请求和处理请求也使用了线程池。

Dubbo业务线程池的使用

Dubbo中线程池的种类

Dubbo中使用的线程池都是基于Java提供的ThreadPoolExecutor类。它基于不同的策略提供了创建不同的ThreadPoolExecutor的线程池工厂。包括以下几种:

CachedThreadPool : 缓存线程池,它创建的是一个可以扩展的线程池,可以配置核心线程数量,最大线程数量,任务队列长度,空闲线程保活时间。这个线程池默认的队列长度是java中int类型的最大值。

EagerThreadPool : 这个线程池工厂提供的是一个Dubbo提供自定义的线程池类EagerThreadPoolExecutor,这个线程池扩展了java中的ThreadPoolExecutor。当所有的核心线程都处于忙碌状态时,若有新任务到来,将会直接创建新线程,而不是放到任务队列中。

FixedThreadPool : 固定线程池,它创建的是一个拥有固定线程数量的线程池。用户可配置相关参数。这个线程池是默认选项。

LimitedThreadPool : 可伸缩线程池。它提供的线程池的特色是,线程池设定的线程保活时间是Long类型的最大值。因此它的线程不会减少,只会增加。

Dubbo中对业务线程池的配置是在下面这个类中实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

这个类持有了一个线程池接口ExecutorService executor。当实例化这个类时,会根据URL的参数获取对应的线程池,也就是上面说的四种线程池中的一种。

以上是对Dubbo中线程池使用的简要介绍,还有很多地方没有深入。后续有机会再分析。

Spring是企业级Java开发中应用最广泛的编程框架。在EJB日渐式微的情况下,Spring已成为企业级Java开发的事实标准。经过十多年的发展,不仅传统的配置方式仍然在广泛使用,同时也诞生了Spring Boot,Spring Cloud等基于Spring的新的框架,在方兴未艾的微服务领域继续引领着潮流。

在使用Spring的过程中,相信我们都很容易感受到Spring给我们带来的便利,这不禁就会引发我们对Spring本身的设计和实现的兴趣。通过阅读文档和代码,我们会发现Spring堪称Java项目设计与实现的典范,它的设计非常地优雅,是我们学习Java开发,乃至面向对象设计与开放的很好的教材。

在Spring的体系中,依赖注入(Dependency Injection,简称DI)是一个基础设施级的功能。一般说使用Spring,默认都会认为必然会使用DI的功能。所以学习Spring的源代码,一般也会从DI入手。

Spring的依赖注入主要是靠应用上下文(ApplicationContext)来实现的。顾名思义,应用上下文就是持有了应用启动必须的各种信息的对象。在多种ApplicationContext中,ClassPathXmlApplicationContext是比较简单的一个,从它的名字可以看出,它是基于Java的Classpath的,同时是基于Xml配置的。

下面先分析一下ClassPathXmlApplicationContext的类关系

我们首先关注一下ClassPathXmlApplicationContext到ApplicationContext的继承和实现关系。从顶向下各个接口和实现类的功能如下:

ApplicationContext:这个接口是提供了应用程序配置的核心接口,当程序运行时它是只读的,不能修改状态,但是可以重新加载(reload),只要具体的实现类支持。

ConfigurableApplicationContext:这是一个支持SPI加载的接口,绝大多数应用上下文都实现了它。它内部定义了一些默认的基础常量,同时提供了ApplicationContext之外的配置应用程序的方法。

AbstractApplicationContext:应用上下文的抽象实现类。这个类可以说是应用上下文的骨架实现类。它不关心用到的配置的存储方式;实现了通用的应用上下文功能;采用了模板设计模式,具体的实现类需要实现它定义的抽象方法。

Base class for {@link org.springframework.context.ApplicationContext}

  • implementations which are supposed to support multiple calls to {@link #refresh()},
  • creating a new internal bean factory instance every time.
  • Typically (but not necessarily), such a context will be driven by
  • a set of config locations to load bean definitions from.

AbstractRefreshableApplicationContext:这个类支持多次刷新上下文。每次刷新时,它会创建一个新的内部Bean Factory(Bean工厂,通过它实际持有创建的bean)。

AbstractRefreshableConfigApplicationContext:提供了对某种形式的存储配置文件路径的支持,包括类路径(ClassPath),文件系统等。

AbstractXmlApplicationContext:这个类提供了从XML文件中提取bean定义的功能(通过XmlBeanDefinitionReader实现)

ClassPathXmlApplicationContext:这个类从Class path获取Context配置文件。

下面就以dubbo源代码中提供的demo来跟踪一下ClassPathXmlApplicationContext这个应用上下文的启动过程。它主要通过下面这一行代码来启动:

1
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});

这行代码很简单,就是调用了构造方法,参数是一个字符串数组,只有一个元素,指出了配置文件的路径。

进入这个构造方法:

1
2
3
4
5
6
7
8
9
/**
* Create a new ClassPathXmlApplicationContext, loading the definitions
* from the given XML files and automatically refreshing the context.
* @param configLocations array of resource locations
* @throws BeansException if context creation failed
*/
public ClassPathXmlApplicationContext(String... configLocations) throws BeansException {
this(configLocations, true, null);
}

它又调用了自己的另一个构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Create a new ClassPathXmlApplicationContext with the given parent,
* loading the definitions from the given XML files.
* @param configLocations array of resource locations
* @param refresh whether to automatically refresh the context,
* loading all bean definitions and creating all singletons.
* Alternatively, call refresh manually after further configuring the context.
* @param parent the parent context
* @throws BeansException if context creation failed
* @see #refresh()
*/
public ClassPathXmlApplicationContext(String[] configLocations, boolean refresh, ApplicationContext parent)
throws BeansException {
super(parent);
setConfigLocations(configLocations);
if (refresh) {
refresh();
}
}

这个构造函数先调用了超类的构造函数,之后判断传入的是否刷新的布尔值,如果为true,则调用refresh方法。

超类的构造方法基本什么也没做,除了每个超类定义的在构造函数之前就需要初始化的field的初始化之外,只是在AbstractApplicationContext的构造函数中设置了一下parent context(Spring支持有层级的应用上下文,但本例中不涉及)。

由此可见,所有的启动和刷新上下文的功能都是refresh这个方法完成的。这个方法是在ConfigurableApplicationContext中定义,在AbstractApplicationContext中定义的,子类没有覆盖它,这也说明Spring不同的上下文启动和刷新的流程是通用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
// Destroy already created singletons to avoid dangling resources.
destroyBeans();
// Reset 'active' flag.
cancelRefresh(ex);
// Propagate exception to caller.
throw ex;
}
finally {
// Reset common introspection caches in Spring's core, since we
// might not ever need metadata for singleton beans anymore...
resetCommonCaches();
}
}
}

prepareFresh方法主要做了一些准备工作,如设置启动时间,设置关闭状态为false,活动状态为true,初始化属性源等。

obtainFreshBeanFactory方法内部通过AbstractRefreshableApplicationContext中的refreshBeanFactory方法刷新bean工厂,它先判断内部的bean factory是否已存在,若存在则销毁它们保存的bean,并关闭之。之后这个方法的核心工作是调用了createBeanFactory方法创建内部的bean factory。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Create an internal bean factory for this context.
* Called for each {@link #refresh()} attempt.
* <p>The default implementation creates a
* {@link org.springframework.beans.factory.support.DefaultListableBeanFactory}
* with the {@linkplain #getInternalParentBeanFactory() internal bean factory} of this
* context's parent as parent bean factory. Can be overridden in subclasses,
* for example to customize DefaultListableBeanFactory's settings.
* @return the bean factory for this context
* @see org.springframework.beans.factory.support.DefaultListableBeanFactory#setAllowBeanDefinitionOverriding
* @see org.springframework.beans.factory.support.DefaultListableBeanFactory#setAllowEagerClassLoading
* @see org.springframework.beans.factory.support.DefaultListableBeanFactory#setAllowCircularReferences
* @see org.springframework.beans.factory.support.DefaultListableBeanFactory#setAllowRawInjectionDespiteWrapping
*/
protected DefaultListableBeanFactory createBeanFactory() {
return new DefaultListableBeanFactory(getInternalParentBeanFactory());
}

可见是新建了一个DefaultListableBeanFactory。这个类的类关系如下图:

关注一下BeanFactory这个接口,它是访问Spring bean容器的根接口,提供了访问bean容器的基本功能。

Extension of the {@link BeanFactory} interface to be implemented by bean factories

  • that can enumerate all their bean instances, rather than attempting bean lookup
  • by name one by one as requested by clients. BeanFactory implementations that
  • preload all their bean definitions (such as XML-based factories) may implement
  • this interface.

ListableBeanFactory提供了枚举bean实例的功能,它会预加载bean的定义

BeanDefinitionRegistry:持有bean定义,例如root bean definition和child bean definition实例通常被bean factory实现。

DefaultListableBeanFactory:ListableBeanFactory和BeanDefinitionRegistry的默认实现。常用于在访问bean之前,保存所有bean的definition。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* This implementation performs an actual refresh of this context's underlying
* bean factory, shutting down the previous bean factory (if any) and
* initializing a fresh bean factory for the next phase of the context's lifecycle.
*/
@Override
protected final void refreshBeanFactory() throws BeansException {
if (hasBeanFactory()) {
destroyBeans();
closeBeanFactory();
}
try {
DefaultListableBeanFactory beanFactory = createBeanFactory();
beanFactory.setSerializationId(getId());
customizeBeanFactory(beanFactory);
loadBeanDefinitions(beanFactory);
synchronized (this.beanFactoryMonitor) {
this.beanFactory = beanFactory;
}
}
catch (IOException ex) {
throw new ApplicationContextException("I/O error parsing bean definition source for " + getDisplayName(), ex);
}
}

通过loadBeanDefinition加载定义到beanFactory中。这个方法是通过XmlBeanDefinitionReader去加载配置文件中的bean定义。具体过程比较繁琐,这里就不展开了,后续有时间再专门介绍。加载完之后,会将beanDefinition保存在DefaultListableBeanFactory的一个field中:

1
2
/** Map of bean definition objects, keyed by bean name */
private final Map<String, BeanDefinition> beanDefinitionMap = new ConcurrentHashMap<String, BeanDefinition>(256);

至此,Spring容器启动过程中的第一大步骤就算基本完成了,就是将bean定义从配置文件中读取出来,并解析为BeanDefinition保存在应用上下文的内置bean factory的内部的一个map钟,key为配置文件中定义的bean的name。

之后回到refresh方法,下面是prepareBeanFactory方法,这个方法就是对内部的bean factory做各种设置,以方便后面使用。具体就不介绍了。感兴趣可以自行研究代码。

postProcessBeanFactory是一个空方法,可以自定义一些对bean factory的定制化处理。由此以及后续的过程可以看出,Spring非常注重扩展性,留出了很多供使用者灵活扩展的地方,充分体现了“对修改关闭,对扩展开放”的面向对象设计原则。

invokeBeanFactoryPostProcessors:实例化并调用所有的BeanFactoryPostProcessor,BeanFactoryPostProcessor就是在bean factory的标准初始化流程结束之后,对它进行一些特殊配置的类。这个接口和后面的一些接口都可以看出Spring设计的原则,那就是先定义好某个功能的标准处理流程,但也提供了进行定制化处理的接口,并通过先注册后调用的方式很有秩序的进行处理。

registerBeanPostProcessors:实例化并调用所有已经注册的BeanPostProcessor。BeanPostProcessor和BeanFactoryPostProcessor类似,只不过一个是针对bean factory,一个是针对具体的bean。它定义了两个方法postProcessBeforeInitialization和postProcessAfterInitialization。前者会在某个bean的初始化方法(InitializingBean接口的afterPropertiesSet方法,或自定义的init-method)调用之前被调用。后者则是在初始化方法调用之后调用。

initMessageSource方法初始化message source。

initApplicationEventMulticaster方法初始化应用事件多播器。应用事件多播器是管理一系列ApplicationListener的,并且发布事件给它们。

onRefresh空方法,留给子类扩展。

registerListeners是获取所有实现了ApplicationListener的类,并注册它们,同时将一些早起的Application event发布出去。

finishBeanFactoryInitialization终于到最重要的一步了,就是完成Context中的bean factory的初始化,并初始化所有的还未初始化的单例bean。这个方法首先又对bean factory做了一系列设置,之后调用DefaultListableBeanFactory的preInstantiateSingletons方法对bean进行了初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void preInstantiateSingletons() throws BeansException {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Pre-instantiating singletons in " + this);
}
// Iterate over a copy to allow for init methods which in turn register new bean definitions.
// While this may not be part of the regular factory bootstrap, it does otherwise work fine.
List<String> beanNames = new ArrayList<String>(this.beanDefinitionNames);
// Trigger initialization of all non-lazy singleton beans...
for (String beanName : beanNames) {
RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
if (isFactoryBean(beanName)) {
final FactoryBean<?> factory = (FactoryBean<?>) getBean(FACTORY_BEAN_PREFIX + beanName);
boolean isEagerInit;
if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
isEagerInit = AccessController.doPrivileged(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
return ((SmartFactoryBean<?>) factory).isEagerInit();
}
}, getAccessControlContext());
}
else {
isEagerInit = (factory instanceof SmartFactoryBean &&
((SmartFactoryBean<?>) factory).isEagerInit());
}
if (isEagerInit) {
getBean(beanName);
}
}
else {
getBean(beanName);
}
}
}

基本过程是就是遍历所有的bean definition,判断不是抽象类,同时是单例,并且没有设置lazy-init的就进行处理。处理时又分为是否是工厂类和不是工厂类进行处理。普通bean直接调用getBean进行处理,工厂bean则要进行一些处理,判断是否是立即加载的。

getBean内部直接调用了doGetBean方法,doGetBean中最终调用了createBean方法来创建一个bean,createBean中调用了doCreateBean来实际创建一个bean。

Spring是通过一个BeanWrapper接口来包裹我们实际要创建的类型的bean,这也是一种比较常见的设计模式,就是通过包装类来提供一些额外的功能。BeanWrapper的实现类主要是实现了Bean的属性编辑器的功能。doCreateBean做的事情比较杂,后续有时间再专门分析。

finishRefresh方法主要是完成刷新,主要做了一些善后工作。

通过对ClassPathXmlApplicationContext的启动过程的分析,我们可以总结一些规律。一是Spring的应用上下文的类体系设计得比较复杂,也因此显得很强大和完善。二是标准流程和扩展流程相分离,给使用者的扩展留出了足够的空间。三是采用了很多内部缓存类,比如缓存了bean的定义,bean实例,bean的name等都用了不同的集合做了专门的缓存。特别是针对单例bean的三级缓存,可以解决循环依赖的问题。

数据库的事务隔离级别

一.读未提交
读未提交是指当一个事务在执行过程中,若另一个事务修改了同一条数据,但没有提交,仍然会被当前数据读取到被更改但未提交的新值。此时可能发生脏读。
二.读提交
读提交是指当一个事务在执行过程中,若另一个事务修改了数据并且提交了修改。此时提交后的数据会被当前数据读取到。此时会发生不可重复读。
三.可重复读
可重复读是指当一个事务在执行过程中,不管读同一条数据多少次,读到的数据和第一次读取的值保持一致。此时可能发生幻读,即另一个事务插入了新数据,但没有被当前事务读取到。
四.可序列化
可序列化是指所有事务顺序执行,此时可避免幻读。

通过JDBC对数据库的访问,可验证数据库的事务隔离级别。

建表语句

1
2
3
4
5
6
7
8
CREATE TABLE Orders
(
O_id INT AUTO_INCREMENT,
OrderPrice DOUBLE(11, 2) NOT NULL,
Customer VARCHAR(50) NULL,
CONSTRAINT `PRIMARY`
PRIMARY KEY (O_id)
);

插入一条数据
1 1000 Bush 4

程序代码

编写两个Java程序,开启两个事务对MySQL数据库进行访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.sql.*;
public class JdbcTest {
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://localhost/database2";
static final String USER = "root";
static final String PASS = "1234";
public static void main(String[] args) {
Connection connection = null;
Statement statement = null;
try {
Class.forName(JDBC_DRIVER);
System.out.println("connecting...");
connection = DriverManager.getConnection(DB_URL, USER, PASS);
connection.setAutoCommit(false);
statement = connection.createStatement();
String sql = "select * from Orders where O_id = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setInt(1,1);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
System.out.println(rs.getInt(4));
}
rs = ps.executeQuery();
while (rs.next()) {
System.out.println(rs.getInt(4));
}
rs.close();
connection.commit();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.sql.*;
public class JdbcTestTwo {
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://localhost/database2?useSSL=false";
static final String USER = "root";
static final String PASS = "1234";
public static void main(String[] args) {
Connection connection = null;
Statement statement = null;
try {
Class.forName(JDBC_DRIVER);
System.out.println("connecting...");
connection = DriverManager.getConnection(DB_URL, USER, PASS);
connection.setAutoCommit(false);
statement = connection.createStatement();
String sql = "UPDATE Orders SET col = 2 WHERE O_id = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setInt(1,1);
// ResultSet rs = ps.executeQuery();
// while (rs.next()) {
// System.out.println(rs.getInt(4));
// }
int count = ps.executeUpdate();
// while (rs.next()) {
// System.out.println(rs.getInt(4));
// }
// rs.close();
connection.commit();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

在JdbcTest中对数据库表的同一条数据进行了两次查询,只需要在两次查询之间加一个断点,当程序暂停时,运行JdbcTest2,就可以观察到数据库中的数据变化是否符合隔离级别的定义。

读未提交

将MySQL数据库全局隔离级别设置为读未提交的命令:
SET GLOBAL TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

1.运行JdbcTest,第一次查询出数据列col为4,暂停程序;
2.运行JdbcTest2,执行为update语句之后,暂停,不提交;
3.继续运行JdbcTest,第二次查询出数据列已经变为2;

读提交

将MySQL数据库全局隔离级别设置为读提交的命令:
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;

1.运行JdbcTest,第一次查询出数据列col为4,暂停程序;
2.运行JdbcTest,第一次查询出数据列为2,暂停程序,不提交;
3.继续运行JdbcTest,第二次查询出数据列仍然为4;

若第二步不暂停直接提交,则第三步查询出的数据将为2;

可重复读

将MySQL数据库全局隔离级别设置为可重复读的命令:
SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;

1.运行JdbcTest,第一次查询出数据列col为4,暂停;
2.运行JdbcTest2,直接提交,将数据修改为2;
3.运行JdbcTest,读出数据依然为2。

可序列化

将MySQL数据库全局隔离级别设置为可重复读的命令:
SET GLOBAL TRANSACTION ISOLATION LEVEL SERIALIZABLE;

1.运行JdbcTest,第一次查询出数据列col为4,暂停;
2.运行JdbcTest2,将一直被阻塞,直到JdbcTest恢复执行完后,它才能完成事务,或者超时抛出异常。

总结

数据库的隔离级别是很重要的特性,在不同场合使用不同的隔离级别会有不同的用处。大多数情况下使用的是可重复读。通过编程来验证隔离级别,可以更好地加深对知识的理解。

Dubbo是阿里推出的服务治理框架,被国内很多公司广泛使用。关于它的介绍可参考http://http://dubbo.io.
本文主要介绍如何通过dubbo搭建服务治理环境,通过在本地搭建环境,有助于学习dubbo的源代码实现,毕竟可以边看源代码边运行调试。

Zookeeper的安装

dubbo是一个服务治理服务,那它必然有一个服务注册中心。dubbo的服务注册中心可以采用multicast、zookeeper、redis或simple注册中心。比较常见的是zookeeper注册中心。

首先下载zookeeper,下载之后首先要做的是进入它的子目录conf中,将zoo_sample.cfg文件名修改为zoo.cfg,因为zookeeper启动时会使用它。之后从终端进入它所在目录,运行
bin/zkServer.sh start。

之后可输入jps命令,若显示运行的进程中有
QuorumPeerMain,则证明zookeeper已启动。

运行bin/zkServer.sh stop停止zookeeper。

安装tomcat

下载tomcat 7。为什么是7,因为经试验tomcat 8和9两个版本在部署dubbo时都有问题。下载之后进入子目录conf中,将server.xml中配置的connector的port由8080改成其他数字,注意不要和已经被其他程序使用的端口冲突。

进入tomcat的子目录bin中,运行startup.sh(若是Windows则是startup.bat)。若启动过程不报错,且最后显示tomcat started,则证明启动成功。

运行shutdown.sh停止tomcat。

安装dubbo-admin

http://dubbo.io/
或github上下载dubbo项目的源代码。dubbo-admin是dubbo的控制台,可以通过它查看和管理在dubbo上注册的服务。下载dubbo源代码之后,从终端进入dubbo文件夹。输入命令

mvn install -Dmaven.test.skip=true

将会编译整个dubbo项目,也包括dubbo-admin。

之后进入dubbo-admin的子目录target中,会发现多了一个dubbo-admin-2.5.7.war。将这个包拷到tomcat的webapps文件夹中。

按前面介绍的方法先启动zookeeper,再启动tomcat。之后在浏览器中输入localhost:8888/dubbo-admin-2.5.7。会看到如下界面。

可以看到服务提供者和调用者都是0,说明还没有服务注册到zookeeper中。

注册服务生产者和消费者

为简化起见,本文采用IDE来构建服务。

首先是用intellij idea导入整个dubbo项目。

可以看到其中就有dubbo-demo项目,它主要有三个子项目构成。

其中远程调用的接口DemoService:

1
2
3
4
5
6
7
package com.alibaba.dubbo.demo;
public interface DemoService {
String sayHello(String name);
}

这个类之所以是一个单独的项目,是因为它需要被服务端和客户端同时引用。服务端会实现它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.alibaba.dubbo.demo.provider;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.RpcContext;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DemoServiceImpl implements DemoService {
public String sayHello(String name) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
}
}

客户端则可以通过配置,就像调用本地方法一样调用它。这也是RPC的主要目的之一。

服务端主类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.alibaba.dubbo.demo.provider;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Created by ken.lj on 2017/7/31.
*/
public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
System.in.read(); // 按任意键退出
}
}

客户端主类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.alibaba.dubbo.demo.consumer;
import com.alibaba.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Created by ken.lj on 2017/7/31.
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
context.start();
DemoService demoService = (DemoService) context.getBean("demoService"); // 获取远程服务代理
String hello = demoService.sayHello("世界"); // 执行远程方法
System.out.println(hello); // 显示调用结果
System.in.read();
}
}

客户端最后一行代码是我加的,为了让客户端和服务端一样执行完调用之后不会退出程序。

此外服务端和客户端还需要在spring配置文件中做一些配置。服务端配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="demo-provider"/>
<!-- 使用multicast广播注册中心暴露服务地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- 和本地bean一样实现服务 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
</beans>

客户端配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="demo-consumer"/>
<!-- 使用multicast广播注册中心暴露发现服务地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>

可见配置文件中就配置了我们之前启动的zookeeper,所以程序启动之后就可以注册到zookeeper中了。

现在先运行服务端的Provider的main方法,再运行Consumer的main方法。可以看到consumer端打印出了如下内容。

可见,客户端通过调用服务端的方法,返回了字符串并打印出来了。

再看dubbo-admin的页面:

可见dubbo-admin中已经能看到注册进来的服务提供者和调用者了。

总结

本文采用了比较简单的方式,即从IDE中运行dubbo项目,虽然在实际运用中需要单独部署服务,但这种方式便于学习源代码。比如可以单步跟踪,看整个dubbo执行远程调用的流程。

Spring Boot是Spring家族一个新兴的成员,它能用于快速构建基于Spring的独立的生产级的应用程序。我们可以将它与Spring平台的其他项目以及第三方库自由组合。大多数基于Spring Boot的应用只需要非常少量的Spring配置。

Spring Boot的安装

与其他Java库类似,Spring Boot也是打包成jar的形式,所以可以在应用中直接将spring-boot-*.jar引入到classpath中。可以像开发其他Java程序那样开发Spring Boot程序。

但是在现代的企业级Java开发中,一般会使用专门的构建工具来管理应用的整个生命周期,例如Maven和Gradle。

当使用Maven时,只需要在pom.xml中配置好相关依赖,就可以在构建过程中自动引用Spring Boot的相关jar。

用Spring Boot开发web应用

学习任何新技术,都是从开发Hello world级别的程序开始的。用Spring Boot结合Maven开发Hello world级别的Web应用可谓易如反掌。

创建pom.xml文件

在磁盘合适的位置新建一个项目文件夹,比如SpringBootTest。从终端进入该文件夹,创建一个pom.xml文件,输入如下内容并保存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>myproject</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>

可以看到引入了spring-boot-starter-parent和spring-boot-starter-web两个starter。Spring Boot是用于简化Spring应用开发的,spring-boot-starter-parent的作用是让maven自动引入默认的一些基础依赖,spring-boot-starter-web则会让maven自动引入web应用需要的依赖。试想一下如果我们不使用Spring Boot,而是手工用Spring,Spring MVC来开发web应用,那需要配置应用上下文,Servlet上下文等,在maven中也要手工将相关依赖都引入进来。配置量会成倍增加。而用Spring Boot则非常轻松,会将配置最少化。

有了这个pom文件,就可以用maven来构建程序了,可以用mvn package来测试,会看到maven会下载很多依赖,打印出如下信息,就证明能正常构建程序。
Building myproject 0.0.1-SNAPSHOT
Build Success
同时,项目文件夹下出现了target文件夹,该文件夹就是构建出来的可执行文件。
可以执行maven dependency:tree命令来打印出项目依赖的树形表示。

编写代码

使用Maven管理应用必须使文件夹结构符合maven的约定。所以在SpringBootTest文件夹下新建如下目录结构:
src/main/java
进入该文件夹,创建文件Example.java,输入如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.boot.*;
import org.springframework.boot.autoconfigure.*;
import org.springframework.stereotype.*;
import org.springframework.web.bind.annotation.*;
@RestController
@EnableAutoConfiguration
public class Example {
@RequestMapping("/")
String home() {
return "Hello World!";
}
public static void main(String[] args) throws Exception {
SpringApplication.run(Example.class, args);
}
}

@RestController注解标志这个类是一个web Controller,并且是REST的访问方式。

@RequestMapping提供了“路由”信息。它告诉Spring任何HTTP请求,如果路径符合“/”就会被路由到home方法。因为是用Rest方式访问的Controller,所以直接返回Hello World!字符串,而不是重定向到一个view。

@EnableAutoConfiguration告诉Spring Boot开发者希望如何配置Spring,依据pom.xml中添加的依赖。因为spring-boot-starter-web依赖会自动添加Tomcat和Spring MVC,自动配置组件就知道开发者在开发一个web应用程序,并据此设置Spring。

main方法

这里的main方法就是Java程序中标准的main方法。但该方法代理执行了Spirng Boot的SpringApplication类的run方法,从而启动了Spring去自动配置Tomcat。此处需要将Example.class作为参数传给run方法,告诉SpringApplication它是主要的Spring组件。

运行程序

万事俱备只欠东风。用终端进入Spring进入到SpringBootTest(必须是pom.xml所在文件夹),输入如下命令。
mvn spring-boot:run

会看到输出如下内容:

1
2
3
4
5
6
7
8
9
10
 .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.3.RELEASE)
…… …
…… …
Started Example in 3.239 seconds (JVM running for 21.929)

打开浏览器,输入localhost:8080,会看到如下输出
Hello World!

总结

从上文可以看出,Spring Boot可以在Spring框架的基础上快速搭建应用,例如Web应用。它可以有效减少配置量,而且可以使用内嵌的Web容器,可以高效地开发和测试程序。

Kotlin是一门新兴的语言,它可以运行于JVM、JavaScript以及native环境(native正在开发中)。它是由著名软件开发商Jetbrains开发,具备了很多新的特性,能够在开发中提升效率。目前已经成为Android官方支持的开发语言。
Kotlin语言中的一切类型都是对象,这与Java不同。虽然Java标榜自己是一门纯粹的面向对象的语言,但由于设计的局限,还是保留了一些非面向对象的元素,最明显的就是在Java中,int、float、double、long、char等基本类型不是类,而是原生类型(primitive types)。为了补救这个缺陷,Java开发了对应的包装类型Integer、Float、Double、Long、Chararater等。
Kotlin吸收了C#等语言的设计思想,将上述基本类型内置为类,它们具有自己的成员函数和成员属性。这些基本类型包括:数字类型(numbers),字符类型(characters),布尔类型(booleans)和数组类型(arrays)。

Number

Kotlin中的数字类型与Java类似,包括Double、Float、Long、Int、Short、Byte等类型。如上文所述,它们都是类。

字面常量

Kotlin中的字面常量(literal constants)与Java类似,Int类型支持十进制,直接用普通十进制表示即可;Long类型则需跟上L后缀,如123L表示一个Long型常数,值是123。十六进制加0x前缀,如0x0F表示十六进制数,值是15。二进制加0b前缀,如0b00001011表示二进制数,值是11。Kotlin中不支持八进制数。
Kotlin支持通用的浮点数表示法,默认是Double类型,如123.5,123.5e10,如果是Float类型,需要加f或F后缀,如123.5f。

下划线分隔位数

Kotlin支持在数字常量中使用下划线分隔数位。这能使位数较多的数字具有更强的可读性。

1
2
val oneMillion = 1_000_000
val creditCardNumber = 1234_5678_9012_3456L

当Kotlin运行于JVM中时,number类型实际上是存储为JVM原生类型,当被赋值给可以为空(nullable)的引用时,Kotlin的number类型是对原生类型的包装。
Kotlin中,当使用一个number型常量(val)去初始化一个不可为空的number型常量时,被初始化的常量都指向内存中同一个地址。而当初始化为一个可以为空的常量时,则会在内存中不同地址创建两个常量,并将值赋给它们。

1
2
3
4
5
6
7
8
9
10
11
val a: Int = 10000
print(a === a) // Prints 'true'
val boxedA: Int = a
val anotherBoxedA: Int = a
print(boxedA === anotherBoxedA) // !!!Prints true!!!
val a: Int = 10000
print(a === a) // Prints 'true'
val boxedA: Int? = a
val anotherBoxedA: Int? = a
print(boxedA === anotherBoxedA) // !!!Prints 'false'!!!

但即使是赋值给可以为空的常量,依然能保证它们的相等性(equality):

1
2
3
4
val a: Int = 10000
print(a == a) // Prints 'true'
val boxedA: Int? = a val anotherBoxedA: Int? = a
print(boxedA == anotherBoxedA) // Prints 'true'

宽向扩展

Kotlin中不存在隐式的宽向类型扩展,即Byte不能直接转换为Int,Int不能直接转换为Double,等等。

1
2
val b: Byte = 1 // OK, literals are checked statically
val i: Int = b // ERROR

每种number类型都能通过内置方法任意转换为另一种number类型。

Characters

字符类型用Char类表示。与Java不同,它不能被直接当做数字类型使用。

1
2
3
4
fun check(c: Char) {
if (c == 1) { // ERROR: incompatible types // ...
}
}

与Java一样,字符字面量使用单引号表示。转义字符使用反斜杠。特殊字符使用反斜杠加u加十六进制数表示。字符可以通过toInt()方法显式转换为Int。
与number一样,字符被赋值给一个nullable引用时,唯一性(identity,内存中地址相同)是不被保证的。

Booleans

用Boolean代表布尔类型,它有两个值:true和false。

布尔类型被赋值给nullable引用时,会被包装。但能保证唯一性。

1
2
3
4
5
val a: Boolean = true
println(a === a) // Prints 'true'
val boxedA: Boolean? = a
val anotherBoxedA: Boolean? = a
println(boxedA === anotherBoxedA) // Prints ‘true’

Arrays

在Java中,数组其实也是一种对象,只是我们不能显示地获得它的类型表示。在Kotlin中,用Array类来代表数组。它具有get和set方法(通过[]操作符重载了这两个函数),size属性以及其他有用的函数,如迭代器方法iterator()。

创建数组可以使用arrayOf()方法,并传递数组元素给它作为参数。例如arrayOf(1, 2, 3)就创建了一个数组[1, 2, 3]。arrayOfNulls()函数用于创建一个给定长度的数组,并用null元素填充它。

Kotlin也有特定的类代表原生类型的数组,如ByteArray、ShortArray、IntArray等。这些类与Array没有继承关系,但有对应的方法和属性集合。如IntArray有对应的工厂函数。

1
2
val x: IntArray = intArrayOf(1, 2, 3)
x[0] = x[1] + x[2]

Strings

字符串用String类表示。与Java一样,String是不可变类。String的元素是字符,与Java中必须使用charAtIndex()这样一个方法去访问元素不同,Kotlin中可以使用下标运算符[]去访问字符元素。String还能使用for循环很方便地访问。

1
2
3
for (c in str) {
println(c)
}

字符串字面量

Kotlin有两种字符串字面量:一种是可转义的字符串,它可能含有转义字符,它不能表示在多行,除非使用“+”将不同的字符串连接成一个字符串;另一种是原始(raw)字符串,能在多行表示并能包含任意文本。

转义字符串:

1
val s = "Hello, world!\n"

原始字符串:

1
2
3
val text = """
for (c in "foo") print(c)
"""

可以使用trimMargin()移除字符串开头和中间的空格,但空格的末尾必须跟上一个明确的分隔符,默认是“|”。

1
2
3
4
5
6
val text = """
|Tell me and I forget.
|Teach me and I remember.
|Involve me and I learn.
|(Benjamin Franklin) """.trimMargin()
"""

也可以显示指定分隔符,如trimMargin(“>”)

字符串模板

在Java中,如果我们在字符串中要加入变量,必须使用加号连接,或者使用格式化字符串。而在Kotlin中,可以使用字符串模板,这将使字符串变得更加简洁,看上去也更加舒服。字符串模板以美元符号开头,直接跟上一个变量名。

1
2
3
val i = 10
val s = "i = $i" // evaluates to "i = 10"
"""

或者是一个用大括号包裹的任意的表达式。

1
2
3
val s = "abc"
val str = "$s.length is ${s.length}" // evaluates to "abc.length is 3"
"""

模板在原始字符串和转义字符串中都支持。如果你在原始字符串中需要表示美元符号的字面值,可以像下面这样使用。

1
2
3
val price = """
${'$'}9.99
"""

RabbitMQ是一个常用的消息队列中间件,它能接收并转发消息。在消息通信模型中,系统可分为三部分:生产者,消息服务器和消费者。简单地说,生产者程序产生消息,发布到消息服务器;消费者程序连接到消息服务器,订阅到队列中。每当消息到达特定等队列时,RabbitMQ会将其发送给其中一个订阅/监听的消费者。
在RabbitMQ中,生产者并不是直接将消息交给某个消息队列的,而是将消息发送给交换器,发送时会指定投递的规则,这些规则称为路由键。交换器中最简单等一种就是direct交换器。它是一个以空白字符串为名称等默认交换器。当声明一个队列时,默认绑定到direct交换器。direct交换器的规则非常简单:如果路由键(队列名)匹配的话,消息就被投递到对应等队列。如下图所示

Java客户端

Java是当今流行的服务器应用程序开发语言。RabbitMQ也提供了Java的客户端SDK。用Java实现direct交换器中的生产者示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.wts.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Created by weitaosheng on 2017/5/1.
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}

创建一个消息生产者等步骤:

  1. 新建连接工厂;
  2. 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
  3. 获取连接;
  4. 建立频道(channel);
  5. 声明队列;
  6. 发布消息。
  7. 关闭频道;
  8. 关闭连接。

客户端示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Created by weitaosheng on 2017/5/1.
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

创建客户端并订阅队列的步骤:

  1. 新建连接工厂;
  2. 绑定RabbitMQ服务器地址(默认为localhost,端口5672);
  3. 获取连接;
  4. 建立频道(channel);
  5. 声明队列;
  6. 声明一个消息消费者,本示例中是用一个匿名内部类来扩展了DefaltConsumer类,并重写了handleDelivery方法,该方法定义了接收到消息时进行等处理。

以下是客户端SDK关键的类和方法:
ConnectionFactory:连接工厂关键类,是客户端访问RabbitMQ服务器必须要先构造的类。从Connection的源代码可以看出,ConnectionFactory没有显示定义构造函数,因此使用的是编译器自动生成的默认构造函数。
获取连接的函数

1
2
3
4
public Connection newConnection() throws IOException, TimeoutException {
return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}

查看源码,sharedExecutor是一个ExecutorService的实例。此处为null,也就是没有使用线程池。连接时还需要地址和端口。

1
Channel createChannel() throws IOException;

createChannel:生成一个Channel类的实例。Connection是一条真实的TCP连接,Channel是Connection内等一条虚拟连接,它在RabbitMQ中会自动生成唯一的ID。通过一条TCP连接内多个虚拟连接的方式,可以提高性能,节约系统资源,因为TCP连接的创建和销毁是非常昂贵的。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map arguments) throws IOException;
queueDeclare:声明队列。它的几个参数的含义是:队列名,是否持久化,是否是限制性等队列(仅限于此次连接),是否自动删除消息,其他参数

1
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

basicPublish:发布消息,几个参数是:交换器名,路由键,是否设置了mandatory参数,消息参数,消息体(用byte数组装载)

Consumer是代表消费者等接口,DefaultConsumer是Consumer接口的一个默认实现。一般定义消费者只需扩展这个类即可。

1
2
3
4
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)

handleDelivery:消费者接收到消息时的回调方法。它的几个参数是:消费者标签,『信封』信息,消息头内容,消息体

在Java中,每一个线程都有一个内部锁。当我们使用synchronized关键字时,就是利用这个内部锁来实现线程对某个对象的锁定控制。
那么,如果某个对象中有两个方法,方法一和方法二都使用了synchronized关键字。如果线程一执行该对象的方法一,线程二执行该对象的方法二。如果线程1一直不释放该对象的内部锁的话,那么线程二应该无法执行该对象的方法二。下面就用代码来验证一下。
首先,定义一个Task类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package corejava.chapter14;
public class TaskC {
public synchronized void method1() throws InterruptedException {
while (true) {
System.out.println("我是方法1");
Thread.sleep(10000);
}
}
public synchronized void method2() throws InterruptedException {
while (true) {
System.out.println("我是方法2");
Thread.sleep(10000);
}
}
}

method1每隔10秒就会输出一句“我是方法1”。method2每隔10秒就会输出一句“我是方法2”。两个方法都使用了synchronized关键字。

再定义两个执行类ExecutorC1和Executor2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package corejava.chapter14;
public class ExecutorC1 implements Runnable {
private TaskC taskC;
public ExecutorC1(TaskC taskC) {
this.taskC = taskC;
}
@Override
public void run() {
try {
taskC.method1();
} catch (Exception e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package corejava.chapter14;
public class ExecutorC2 implements Runnable {
private TaskC taskC;
public ExecutorC2(TaskC taskC) {
this.taskC = taskC;
}
@Override
public void run() {
try {
taskC.method2();
} catch (Exception e) {
e.printStackTrace();
}
}
}

执行对象1和执行对象2都使用同一个taskC去实例化,这样他们之间就会存在竞争。当thread1执行taskC的method1时,会一直持有taskC的内部锁,因此thread2将一直处于阻塞状态。

下面是运行输出结果

可见,确实如我们所料,虽然两个线程要执行的是不同的方法,但因为线程1一直“霸占”着内部锁,所以线程2始终在阻塞状态,得不到执行。

所以,如果我们在对某个类要使用synchronized关键字进行锁定控制时,一定要牢记它使用的是该类的内部锁。它是唯一的。当多个方法都用它来进行控制时,一定要考虑这一点。