从一个问题说起 假设有4个方法,可以获取四个值,这四个值可以用来构造一个用户的实例。如果要写一个方法,并发获取这4个值。
这个题目应该用Future来做。下面是我模拟的程序。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
* 服务类,有4个方法
*/
class Service {
public int get1 () {
try {
Thread.sleep(1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1 ;
}
public int get2 () {
try {
Thread.sleep(2000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2 ;
}
public int get3 () {
try {
Thread.sleep(3000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3 ;
}
public int get4 () {
try {
Thread.sleep(4000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
return 4 ;
}
}
public class FutureTaskTest {
public static void main (String[] args) {
Service service = new Service();
long start = System.currentTimeMillis();
* 用4个FutureTask来并行执行service的4个方法, 执行完之后一并
* 获取执行结果
*/
FutureTask<Integer> futureTask1 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call () throws Exception {
return service.get1();
}
});
Thread thread1 = new Thread(futureTask1);
thread1.start();
FutureTask<Integer> futureTask2 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call () throws Exception {
return service.get2();
}
});
Thread thread2 = new Thread(futureTask2);
thread2.start();
FutureTask<Integer> futureTask3 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call () throws Exception {
return service.get3();
}
});
Thread thread3 = new Thread(futureTask3);
thread3.start();
FutureTask<Integer> futureTask4 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call () throws Exception {
return service.get4();
}
});
Thread thread4 = new Thread(futureTask4);
thread4.start();
try {
int a = futureTask1.get();
int b = futureTask2.get();
int c = futureTask3.get();
int d = futureTask4.get();
long end = System.currentTimeMillis();
System.out.println(end - start);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
执行结果,打印执行时间为4007毫秒,与预期相同,也就是并行执行4个任务中耗时最长的一个。
下面就来分析FutureTask的原理。
FutureTask的使用 Java中,使用线程执行任务一般实现Runnable接口。它只有一个run方法,没有返回值。因此当主线程需要用一个子线程去执行某个任务,并返回某个结果时,必须采用其他的方法。比如在Thread中添加一个实例,然后运行run方法时去修改这个值。但这有一个问题,就是主线程并不知道子线程何时执行结束。因此又引出其他的问题。显然仅靠Runnable并不是解决这类问题的好方法。
因此,Java提供了Callable接口,它代表一个能返回执行结果的任务。它的定义如下:1
2
3
4
5
6
7
8
9
public interface Callable <V > {
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call () throws Exception ;
}
这个接口有一个参数类型V,同时只有一个方法call。当call方法执行完之后,会返回一个V类型的实例。
但是我们遍览jdk的类库,发现并没有一个public的类是实现了Callable接口的。因此它是一个完全由用户实现的接口,必须搭配其他的接口和类才能使用。
于是java又提供了Future接口:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Future <V > {
* 取消任务执行,参数代表如果任务已经开始执行,是否要中断
*/
boolean cancel (boolean mayInterruptIfRunning) ;
* 判断任务是否已经取消
*/
boolean isCancelled () ;
* 判断任务是否执行完成
*/
boolean isDone () ;
* 获取线程执行的结果
*/
V get () throws InterruptedException, ExecutionException ;
* 获取线程执行结果,并设置一个超时时间,若超过了则抛出异常
*/
V get (long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这个接口的注释说:这个接口代表了一次异步计算的结果。所谓异步计算,也就是代表一个线程。
FutureTask源码解析 通过上面的例子,我们可以通过推理和验证的方式来研究FutureTask的源码实现。首先FutureTask能作为参数传递给Thread类来实例化一个线程,说明它实现了Runnable接口。同时它也实现了Future接口,否则我们这篇文章也不会写它了。
于是我们看FutureTask的定义头:1
public class FutureTask <V > implements RunnableFuture <V > {
而RunnableFuture的定义头:1
public interface RunnableFuture <V > extends Runnable , Future <V >
果如所料。
再从上面的例子中可以看出,FutureTask的构造方法的参数是一个Callable接口。那说明它内部就是通过一个Callable实例来执行具体的逻辑的,并且Callable的实现类是由用户提供的。FutureTask的构造方法如下:
1
2
3
4
5
6
public FutureTask (Callable<V> callable) {
if (callable == null )
throw new NullPointerException();
this .callable = callable;
this .state = NEW;
}
从构造方法可以看出,FutureTask还有一个实例属性是state,并且在构造方法中被初始化为NEW。这个属性和可能的值定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
* 任务运行状态,初始化为NEW,这个状态变量只会在set方法、setException和cancel方法中被转化为终态
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0 ;
private static final int COMPLETING = 1 ;
private static final int NORMAL = 2 ;
private static final int EXCEPTIONAL = 3 ;
private static final int CANCELLED = 4 ;
private static final int INTERRUPTING = 5 ;
private static final int INTERRUPTED = 6 ;
详细的状态转化在下面方法中介绍。
构造了FutureTask之后,就需要用一个Thread去执行它。Thread执行它持有的Runnable实例的run方法。那么我们看一下FutureTask的run方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run () {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this , runnerOffset,
null , Thread.currentThread()))
return ;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true ;
} catch (Throwable ex) {
result = null ;
ran = false ;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null ;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
进入run方法时,用了Oracle的私有api Unsafe的方法compareAndSwapObject来线程安全地将FutureTask持有的Thread实例初始化为当前执行的线程(也就是被主线程开启的子线程本身)。这个方法采用了CAS(Compare and Swap),也就是要修改的参数必须符合预期的值才能给它赋新值。这里是为了确保只有一个子线程能执行FutureTask,以避免并发执行带来的问题。
当执行完之后,无论是正常还是捕获到异常,都需要将runner置为null。而再次读state,则是若runner被中断了,需要进行相应处理。具体处理在后面介绍。
下面我们看get方法。get方法的作用是获取执行的结果。在看代码之前,我们可以设想,虽然FutureTask的执行只能由一个线程来完成。但是应该是允许多个线程来并发获取结果的。那它必然有某种数据结构来保存读取结果的线程。说这么多,其实这种数据结构就是WaitNode:1
2
3
4
5
6
7
8
9
10
11
12
13
private volatile WaitNode waiters;
* 记录等待线程(即是读取执行结果的线程)的简单链表
*
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
get方法:1
2
3
4
5
6
public V get () throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false , 0L );
return report(s);
}
首先判断state,若处于完成中,或者NEW。则调用awaitDone。否则直接调用report返回结果。
在我们的例子中看到,当在主线程调用FutureTask的get的时候,会一直等待到子线程将任务执行完成,再将结果返回给主线程。这个等待过程就是通过awaitDone方法来完成的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
* 等待完成,若已中断或超时则终止
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone (boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L ;
WaitNode q = null ;
boolean queued = false ;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null )
q.thread = null ;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null )
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this , waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L ) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this , nanos);
}
else
LockSupport.park(this );
}
}
从上述代码看出,awaitDone的逻辑也不是很复杂,主要就是根据state的各种状态,进行新建等待节点,节点入等待列表等操作。
report方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
* 任务完成时返回结果或抛出异常
*
* @param s completed state value
*/
@SuppressWarnings ("unchecked" )
private V report (int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
outcome就是要返回的执行结果。那么它的值是在哪里赋的呢?就是我们前面提到过的set方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
*
* 若执行结果还没有赋值,或被取消时,设置执行结果
*
* @param v the value
*/
protected void set (V v) {
if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this , stateOffset, NORMAL);
finishCompletion();
}
}
当执行Callable的call方法发生异常时,会调用setException方法。1
2
3
4
5
6
7
8
9
protected void setException (Throwable t) {
if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this , stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
finishCompletion主要完成一些扫尾工作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion () {
for (WaitNode q; (q = waiters) != null ;) {
if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) {
for (;;) {
Thread t = q.thread;
if (t != null ) {
q.thread = null ;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null )
break ;
q.next = null ;
q = next;
}
break ;
}
}
done();
callable = null ;
}
此方法的逻辑也很简单,就是遍历等待列表,将每一个节点都从列表删除,同时将每个等待线程解除阻塞。
以上分析了任务执行完成的情况,还有一种情况是执行被取消。它通过cancel方法来实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean cancel (boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this , stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false ;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null )
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this , stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true ;
}
当state为NEW时,返回fasle。若为NEW,则根据传入的参数,若传入true,则将state转化为正在中断,否则转化为已取消。
若传入参数为true,取消时还需中断执行任务的线程。之后将state设置为已中断。
总结 以上就是FutureTask的简单使用和源码解析。除了直接使用新建线程来执行FutureTask,还可以使用线程池,本文没有对此进行介绍。
分析FutureTask的源码,发现还是比较简单的。主要就是通过一个内部持有的Thread来执行任务,同时用一个等待列表来存放等待获取结果的线程。当等待线程尝试用get获取结果时,会先判断是否为完成中以后的状态,若是则直接返回结果。否则就一直轮询,等待状态更新。最后执行完成之后,则遍历等待列表,将所有等待线程都解除阻塞。当然还有处理异常等情况,此处不再分析。