Contents
  1. 1. 从一个问题说起
  2. 2. FutureTask的使用
  3. 3. FutureTask源码解析
  4. 4. 总结

从一个问题说起

假设有4个方法,可以获取四个值,这四个值可以用来构造一个用户的实例。如果要写一个方法,并发获取这4个值。

这个题目应该用Future来做。下面是我模拟的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 服务类,有4个方法
*/
class Service {
public int get1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
public int get2() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
}
public int get3() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
}
public int get4() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 4;
}
}
public class FutureTaskTest {
public static void main(String[] args) {
Service service = new Service();
/** 记录开始执行时间 */
long start = System.currentTimeMillis();
/**
* 用4个FutureTask来并行执行service的4个方法, 执行完之后一并
* 获取执行结果
*/
FutureTask<Integer> futureTask1 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get1();
}
});
Thread thread1 = new Thread(futureTask1);
thread1.start();
FutureTask<Integer> futureTask2 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get2();
}
});
Thread thread2 = new Thread(futureTask2);
thread2.start();
FutureTask<Integer> futureTask3 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get3();
}
});
Thread thread3 = new Thread(futureTask3);
thread3.start();
FutureTask<Integer> futureTask4 = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return service.get4();
}
});
Thread thread4 = new Thread(futureTask4);
thread4.start();
try {
int a = futureTask1.get();
int b = futureTask2.get();
int c = futureTask3.get();
int d = futureTask4.get();
long end = System.currentTimeMillis();
System.out.println(end - start);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

执行结果,打印执行时间为4007毫秒,与预期相同,也就是并行执行4个任务中耗时最长的一个。

下面就来分析FutureTask的原理。

FutureTask的使用

Java中,使用线程执行任务一般实现Runnable接口。它只有一个run方法,没有返回值。因此当主线程需要用一个子线程去执行某个任务,并返回某个结果时,必须采用其他的方法。比如在Thread中添加一个实例,然后运行run方法时去修改这个值。但这有一个问题,就是主线程并不知道子线程何时执行结束。因此又引出其他的问题。显然仅靠Runnable并不是解决这类问题的好方法。

因此,Java提供了Callable接口,它代表一个能返回执行结果的任务。它的定义如下:

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

这个接口有一个参数类型V,同时只有一个方法call。当call方法执行完之后,会返回一个V类型的实例。

但是我们遍览jdk的类库,发现并没有一个public的类是实现了Callable接口的。因此它是一个完全由用户实现的接口,必须搭配其他的接口和类才能使用。

于是java又提供了Future接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Future<V> {
/**
* 取消任务执行,参数代表如果任务已经开始执行,是否要中断
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否已经取消
*/
boolean isCancelled();
/**
* 判断任务是否执行完成
*/
boolean isDone();
/**
* 获取线程执行的结果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 获取线程执行结果,并设置一个超时时间,若超过了则抛出异常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

这个接口的注释说:这个接口代表了一次异步计算的结果。所谓异步计算,也就是代表一个线程。

FutureTask源码解析

通过上面的例子,我们可以通过推理和验证的方式来研究FutureTask的源码实现。首先FutureTask能作为参数传递给Thread类来实例化一个线程,说明它实现了Runnable接口。同时它也实现了Future接口,否则我们这篇文章也不会写它了。

于是我们看FutureTask的定义头:

1
public class FutureTask<V> implements RunnableFuture<V> {

而RunnableFuture的定义头:

1
public interface RunnableFuture<V> extends Runnable, Future<V>

果如所料。

再从上面的例子中可以看出,FutureTask的构造方法的参数是一个Callable接口。那说明它内部就是通过一个Callable实例来执行具体的逻辑的,并且Callable的实现类是由用户提供的。FutureTask的构造方法如下:

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

从构造方法可以看出,FutureTask还有一个实例属性是state,并且在构造方法中被初始化为NEW。这个属性和可能的值定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 任务运行状态,初始化为NEW,这个状态变量只会在set方法、setException和cancel方法中被转化为终态
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

详细的状态转化在下面方法中介绍。

构造了FutureTask之后,就需要用一个Thread去执行它。Thread执行它持有的Runnable实例的run方法。那么我们看一下FutureTask的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run() {
// 若state不为NEW,或者不能将执行此FutureTask的线程赋值给FutureTask的Thread属性,则放弃执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 再次判断state必须为NEW才执行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行Callable方法,获取结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 捕获异常时处理异常
result = null;
ran = false;
setException(ex);
}
// 执行正常结束时设置结果
if (ran)
set(result);
}
} finally {
// runner在将state设置为能阻止对run的并发调用前必须保持非null的值
runner = null;
// 再次读取state,若为正在中断中,或已中断,则进行相应处理
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

进入run方法时,用了Oracle的私有api Unsafe的方法compareAndSwapObject来线程安全地将FutureTask持有的Thread实例初始化为当前执行的线程(也就是被主线程开启的子线程本身)。这个方法采用了CAS(Compare and Swap),也就是要修改的参数必须符合预期的值才能给它赋新值。这里是为了确保只有一个子线程能执行FutureTask,以避免并发执行带来的问题。

当执行完之后,无论是正常还是捕获到异常,都需要将runner置为null。而再次读state,则是若runner被中断了,需要进行相应处理。具体处理在后面介绍。

下面我们看get方法。get方法的作用是获取执行的结果。在看代码之前,我们可以设想,虽然FutureTask的执行只能由一个线程来完成。但是应该是允许多个线程来并发获取结果的。那它必然有某种数据结构来保存读取结果的线程。说这么多,其实这种数据结构就是WaitNode:

1
2
3
4
5
6
7
8
9
10
11
12
13
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* 记录等待线程(即是读取执行结果的线程)的简单链表
*
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

get方法:

1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

首先判断state,若处于完成中,或者NEW。则调用awaitDone。否则直接调用report返回结果。

在我们的例子中看到,当在主线程调用FutureTask的get的时候,会一直等待到子线程将任务执行完成,再将结果返回给主线程。这个等待过程就是通过awaitDone方法来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 等待完成,若已中断或超时则终止
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 若线程已中断,则从等待链表中移除当前线程节点
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 若非NEW或完成中状态,则将节点的线程置为null,并返回当前状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 若为完成中状态,则当前线程让出CPU执行时间,待下次竞争到CPU时再判断是否完成
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 若state为NEW,则新建等待队列节点
else if (q == null)
q = new WaitNode();
// 若当前线程所属的等待节点还没有插入链表,则插入到链表头
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果考虑超时时间,则计算时间并进行处理
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

从上述代码看出,awaitDone的逻辑也不是很复杂,主要就是根据state的各种状态,进行新建等待节点,节点入等待列表等操作。

report方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 任务完成时返回结果或抛出异常
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

outcome就是要返回的执行结果。那么它的值是在哪里赋的呢?就是我们前面提到过的set方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
*
* 若执行结果还没有赋值,或被取消时,设置执行结果
*
* @param v the value
*/
protected void set(V v) {
// 先将state从NEW转化为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 将state从NEW转化为正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

当执行Callable的call方法发生异常时,会调用setException方法。

1
2
3
4
5
6
7
8
9
protected void setException(Throwable t) {
// 先将state从NEW转化为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 将state从NEW转化为发生异常
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

finishCompletion主要完成一些扫尾工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}

此方法的逻辑也很简单,就是遍历等待列表,将每一个节点都从列表删除,同时将每个等待线程解除阻塞。

以上分析了任务执行完成的情况,还有一种情况是执行被取消。它通过cancel方法来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean cancel(boolean mayInterruptIfRunning) {
// 此条件等价于: state != NEW || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

当state为NEW时,返回fasle。若为NEW,则根据传入的参数,若传入true,则将state转化为正在中断,否则转化为已取消。

若传入参数为true,取消时还需中断执行任务的线程。之后将state设置为已中断。

总结

以上就是FutureTask的简单使用和源码解析。除了直接使用新建线程来执行FutureTask,还可以使用线程池,本文没有对此进行介绍。

分析FutureTask的源码,发现还是比较简单的。主要就是通过一个内部持有的Thread来执行任务,同时用一个等待列表来存放等待获取结果的线程。当等待线程尝试用get获取结果时,会先判断是否为完成中以后的状态,若是则直接返回结果。否则就一直轮询,等待状态更新。最后执行完成之后,则遍历等待列表,将所有等待线程都解除阻塞。当然还有处理异常等情况,此处不再分析。

Contents
  1. 1. 从一个问题说起
  2. 2. FutureTask的使用
  3. 3. FutureTask源码解析
  4. 4. 总结