Dubbo的同步调用
使用Dubbo很长时间了,基本使用的都是Dubbo的同步调用方式。Dubbo默认使用的是协议格式Dubbo协议,序列化方式是Hessian,传输层使用的是Tcp协议,默认使用netty框架。
Dubbo的同步调用是指,调用一个远程服务时,发起调用这个线程会一直阻塞(至少从使用者的感觉来说是这样的)到远程服务返回结果,或是超时为止。
那么Dubbo是如何实现这种同步调用机制的呢?我们知道RPC的本质是用TCP协议向另一个进程(可能在同一台机器上,也可能不在)发送请求,服务端进程接收到请求后,将请求解析为对本地方法的一个调用,调用之后将结果返回给客户端。客户端接收到返回之后,再将结果反序列化为需要的结果,返回给上层的业务。
Dubbo同步调用在客户端的实现
Dubbo的整个调用流程是:
1.服务提供者导出服务到本机的IP和端口上,即是建立一个TCP服务器;
2.服务提供者发布服务到注册中心;
3.服务消费者向注册中心订阅服务提供者的信息;
4.服务消费者建立远程服务的代理,并建立一个TCP客户端。
以上几步分别是服务提供者和服务消费者启动时做的事情,这之后消费者和提供者之间就建立了TCP长连接,实际调用服务时是无需经过注册中心的,直接通过TCP连接完成。
下面就来看一下消费者是如何发起到提供者的请求,以及如何实现对上层业务而言的同步调用,以及超时机制等。
Dubbo消费者在发起调用之后,经过重重阻隔,经历了各种包装的代理类之后(暂时忽略集群容错和负载均衡等),最终会到达下面这个类:
1 2 3 4 5 6
| public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { return (Result) currentClient.request(inv, timeout).get();
|
最后这个return语句返回的就是最终调用返回的结果。再进入这个request方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| final class HeaderExchangeChannel implements ExchangeChannel { @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
|
我们看到,在这个方法中,先是创建了发送给提供者服务的请求request,之后创建了一个DefaultFuture的类。最好就调用channel将请求发送出去(此处的channel是dubbo定义的一个类的实例,但内部包装了netty的channel)。
我们看到,调用channel的send方法发送请求,但这个方法并没有返回值。而是在下方直接返回了这个DefaultFuture类的实例。
再回到上面DubboInvoke中,在request返回了DefaultFuture实例之后,就调用get方法去获取调用结果。因此再看get方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
|
到这个方法中就清除了,首先判断一下调用是否有返回结果,如果还没有返回,就先记录一下当前时间,之后用一个可重入锁将代码块加锁。之后在一个while循环条件中判断调用是否有返回。若没有返回就直接将线程进入等待状态,只有另一个线程向它发信号,才能唤醒它继续执行。唤醒之后,再次判断是否有返回,或是已经超时,则跳出循环。之后判断若没有返回,则已经超时。若有返回结果,则正常返回结果。
那么哪里有一个线程会来唤醒这个调用线程呢?
答案就是netty提供的机制,netty客户端在发送请求给服务端,当有结果返回时,会触发事先注册的处理器,将请求通知上层调用。最终会调用到DefaultFuture中的received方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
|
首先是获取当前调用的DefaultFuture实例,之后调用它的doReceived方法。这个方法先加了锁,之后将返回结果赋给DefaultFuture的response(这个response是DefaultFuture的一个实例属性,也就是上层业务要获取的结果)。之后就是上面说的向线程发信号,唤醒等待中的调用线程。
以上就是Dubbo将底层的netty的异步调用转为同步调用。还有一个问题是dubbo是如何实现接口调用超时的。
答案在下面:
1 2 3 4 5 6 7 8 9 10
| public class DefaultFuture implements ResponseFuture { private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); static { Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start(); }
|
DefaultFuture中用了一个FUTURES的map来保存每个具体调用的DefaultFuture实例。它的key是每次调用的id,这个id就是用来区分不同的调用的,每个请求都不一样。之后在初始化这个类时,启动一个守护线程,它就是用来实现超时的。它的run方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private static class RemotingInvocationTimeoutScan implements Runnable { @Override public void run() { while (true) { try { for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { Response timeoutResponse = new Response(future.getId()); timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }
|
逻辑很简单,就是每30毫秒轮询一次所有的DefaultFuture,若future已经有返回结果,则跳过;若没有返回,则计算一下当前时间是否超时,若超时,则构造一个超时的返回结果,之后调用上面提到的DefaultFuture的received方法。
总结
以上就是Dubbo同步调用在消费者端的大体实现。主要就是用了一个锁来将调用线程阻塞起来,之后netty接收到返回结果之后,会发信号唤醒调用线程。或是超时,也会返回一个超时的结果,并唤醒调用线程。这样就实现了对上层业务来说的同步调用。
因为Dubbo的流程非常复杂,因此本篇文章限于篇幅,无法将它的上下游流程勾勒得很清楚。只是提纲挈领地写出关键之处。对于整体流程的把握,还是需要自己去研究代码才行。