Contents
  1. 1. Dubbo的同步调用
  2. 2. Dubbo同步调用在客户端的实现
  3. 3. 总结

Dubbo的同步调用

使用Dubbo很长时间了,基本使用的都是Dubbo的同步调用方式。Dubbo默认使用的是协议格式Dubbo协议,序列化方式是Hessian,传输层使用的是Tcp协议,默认使用netty框架。

Dubbo的同步调用是指,调用一个远程服务时,发起调用这个线程会一直阻塞(至少从使用者的感觉来说是这样的)到远程服务返回结果,或是超时为止。

那么Dubbo是如何实现这种同步调用机制的呢?我们知道RPC的本质是用TCP协议向另一个进程(可能在同一台机器上,也可能不在)发送请求,服务端进程接收到请求后,将请求解析为对本地方法的一个调用,调用之后将结果返回给客户端。客户端接收到返回之后,再将结果反序列化为需要的结果,返回给上层的业务。

Dubbo同步调用在客户端的实现

Dubbo的整个调用流程是:
1.服务提供者导出服务到本机的IP和端口上,即是建立一个TCP服务器;
2.服务提供者发布服务到注册中心;
3.服务消费者向注册中心订阅服务提供者的信息;
4.服务消费者建立远程服务的代理,并建立一个TCP客户端。

以上几步分别是服务提供者和服务消费者启动时做的事情,这之后消费者和提供者之间就建立了TCP长连接,实际调用服务时是无需经过注册中心的,直接通过TCP连接完成。

下面就来看一下消费者是如何发起到提供者的请求,以及如何实现对上层业务而言的同步调用,以及超时机制等。

Dubbo消费者在发起调用之后,经过重重阻隔,经历了各种包装的代理类之后(暂时忽略集群容错和负载均衡等),最终会到达下面这个类:

1
2
3
4
5
6
public class DubboInvoker<T> extends AbstractInvoker<T> {
// 省略大量代码
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
//省略大量代码
return (Result) currentClient.request(inv, timeout).get();

最后这个return语句返回的就是最终调用返回的结果。再进入这个request方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final class HeaderExchangeChannel implements ExchangeChannel {
// 省略大量代码
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

我们看到,在这个方法中,先是创建了发送给提供者服务的请求request,之后创建了一个DefaultFuture的类。最好就调用channel将请求发送出去(此处的channel是dubbo定义的一个类的实例,但内部包装了netty的channel)。

我们看到,调用channel的send方法发送请求,但这个方法并没有返回值。而是在下方直接返回了这个DefaultFuture类的实例。

再回到上面DubboInvoke中,在request返回了DefaultFuture实例之后,就调用get方法去获取调用结果。因此再看get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}

到这个方法中就清除了,首先判断一下调用是否有返回结果,如果还没有返回,就先记录一下当前时间,之后用一个可重入锁将代码块加锁。之后在一个while循环条件中判断调用是否有返回。若没有返回就直接将线程进入等待状态,只有另一个线程向它发信号,才能唤醒它继续执行。唤醒之后,再次判断是否有返回,或是已经超时,则跳出循环。之后判断若没有返回,则已经超时。若有返回结果,则正常返回结果。

那么哪里有一个线程会来唤醒这个调用线程呢?

答案就是netty提供的机制,netty客户端在发送请求给服务端,当有结果返回时,会触发事先注册的处理器,将请求通知上层调用。最终会调用到DefaultFuture中的received方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}

首先是获取当前调用的DefaultFuture实例,之后调用它的doReceived方法。这个方法先加了锁,之后将返回结果赋给DefaultFuture的response(这个response是DefaultFuture的一个实例属性,也就是上层业务要获取的结果)。之后就是上面说的向线程发信号,唤醒等待中的调用线程。

以上就是Dubbo将底层的netty的异步调用转为同步调用。还有一个问题是dubbo是如何实现接口调用超时的。

答案在下面:

1
2
3
4
5
6
7
8
9
10
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
static {
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}

DefaultFuture中用了一个FUTURES的map来保存每个具体调用的DefaultFuture实例。它的key是每次调用的id,这个id就是用来区分不同的调用的,每个请求都不一样。之后在初始化这个类时,启动一个守护线程,它就是用来实现超时的。它的run方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static class RemotingInvocationTimeoutScan implements Runnable {
@Override
public void run() {
while (true) {
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
logger.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}

逻辑很简单,就是每30毫秒轮询一次所有的DefaultFuture,若future已经有返回结果,则跳过;若没有返回,则计算一下当前时间是否超时,若超时,则构造一个超时的返回结果,之后调用上面提到的DefaultFuture的received方法。

总结

以上就是Dubbo同步调用在消费者端的大体实现。主要就是用了一个锁来将调用线程阻塞起来,之后netty接收到返回结果之后,会发信号唤醒调用线程。或是超时,也会返回一个超时的结果,并唤醒调用线程。这样就实现了对上层业务来说的同步调用。
因为Dubbo的流程非常复杂,因此本篇文章限于篇幅,无法将它的上下游流程勾勒得很清楚。只是提纲挈领地写出关键之处。对于整体流程的把握,还是需要自己去研究代码才行。

Contents
  1. 1. Dubbo的同步调用
  2. 2. Dubbo同步调用在客户端的实现
  3. 3. 总结