疯狂的小鸡

多线程-java.util.Concurrent

字数统计: 6.8k阅读时长: 26 min
2018/10/11 Share

Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包。这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类。

BlockingQueue

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
BlockingQueue
四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。

  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现:

ArrayBlockingQueue

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了。

ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

DelayQueue

DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,接口如下:

1
2
3
public interface Delayed extends Comparable<Delayed< {
public long getDelay(TimeUnit timeUnit);
}

其中TimeUnit是一个枚举类型,它表明了将要延迟的时间段。

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。

LinkedBlockingQueue

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。

PriorityBlockingQueue

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。

所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

注: PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

注2:,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。

SynchronousQueue

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

BlockingDeque

BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。

BlockingDeque方法总结:
BlockingDequeue

BlockingDeque 接口继承自 BlockingQueue 接口。LinkedBlockingDeque 类是它的一个实现。

ConcurrentMap

java.util.concurrent.ConcurrentMap 接口表示了一个能够对别人的访问(插入和提取)进行并发处理的 java.util.Map。

ConcurrentMap 除了从其父接口 java.util.Map 继承来的方法之外还有一些额外的原子性方法。

实现类ConcurrentHashMap

在你从中读取对象的时候 ConcurrentHashMap 并不会把整个 Map 锁住。此外,在你向其中写入对象的时候,ConcurrentHashMap 也不会锁住整个 Map。它的内部只是把 Map 中正在被写入的部分进行锁定。具体细节可看数据结构Java Map的实现

ConcurrentNavigableMap

ConcurrentNavigableMap 是一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具备并发访问的能力。所谓的 “子 map” 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。

headMap()

headMap(T toKey)方法返回一个包含了小于给定 toKey 的 key 的子 map。

1
2
3
4
5
ConcurrentNavigableMap map = new ConcurrentSkipListMap();
map.put("1""one");
map.put("2""two");
map.put("3""three");
ConcurrentNavigableMap headMap = map.headMap("2");

headMap 将指向一个只含有键 “1” 的 ConcurrentNavigableMap。如果你对原始 map 里的元素做了改动,这些改动将影响到子 map 中的元素,因为map 集合持有的其实只是对象的引用)。

tailMap()

tailMap(T fromKey)方法返回一个包含了不小于给定 fromKey 的 key 的子 map。

subMap()

subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。

闭锁 CountDownLatch

CountDownLatch 是一个并发构造,它允许一个或多个线程等待一系列指定操作的完成。

CountDownLatch 以一个给定的数量初始化。countDown() 每被调用一次,这一数量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。

以下是一个简单示例。Decrementer 三次调用 countDown() 之后,等待中的 Waiter 才会从 await() 调用中释放出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
CountDownLatch latch = new CountDownLatch(3);

Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter) .start();
new Thread(decrementer).start();

Thread.sleep(4000);

public class Waiter implements Runnable{

CountDownLatch latch = null;

public Waiter(CountDownLatch latch) {
this.latch = latch;
}

public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Waiter Released");
}
}

public class Decrementer implements Runnable {

CountDownLatch latch = null;

public Decrementer(CountDownLatch latch) {
this.latch = latch;
}

public void run() {

try {
Thread.sleep(1000);
this.latch.countDown();

Thread.sleep(1000);
this.latch.countDown();

Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

栅栏 CyclicBarrier

java.util.concurrent.CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。
CyclicBarrier
1)创建一个 CyclicBarrier
在创建一个 CyclicBarrier 的时候你需要定义有多少线程在被释放之前等待栅栏。

1
CyclicBarrier barrier = new CyclicBarrier(2);

2)等待一个 CyclicBarrier

1
barrier.await();

当然,你也可以为等待线程设定一个超时时间。等待超过了超时时间之后,即便还没有达成 N 个线程等待 CyclicBarrier 的条件,该线程也会被释放出来。

1
barrier.await(10, TimeUnit.SECONDS);

满足以下任何条件都可以让等待 CyclicBarrier 的线程释放:

  • 最后一个线程也到达 CyclicBarrier。

  • 调用 await())当前线程被其他线程打断。

  • 其他线程调用了这个线程的 interrupt() 方法。

  • 其他等待栅栏的线程被打断。

  • 其他等待栅栏的线程因超时而被释放。

  • 外部线程调用了栅栏的 CyclicBarrier.reset() 方法。

3)CyclicBarrier 行动
CyclicBarrier 支持一个栅栏行动,栅栏行动是一个 Runnable 实例,一旦最后等待栅栏的线程抵达,该实例将被执行。你可以在 CyclicBarrier 的构造方法中将 Runnable 栅栏行动传给它。

1
2
Runnable      barrierAction = ... ;
CyclicBarrier barrier       = new CyclicBarrier(2, barrierAction);

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
Runnable barrier1Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 1 executed ");
}
};
Runnable barrier2Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 2 executed ");
}
};

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

CyclicBarrierRunnable barrierRunnable1 =
new CyclicBarrierRunnable(barrier1, barrier2);

CyclicBarrierRunnable barrierRunnable2 =
new CyclicBarrierRunnable(barrier1, barrier2);

new Thread(barrierRunnable1).start();
new Thread(barrierRunnable2).start();

public class CyclicBarrierRunnable implements Runnable{

CyclicBarrier barrier1 = null;
CyclicBarrier barrier2 = null;

public CyclicBarrierRunnable(
CyclicBarrier barrier1,
CyclicBarrier barrier2) {

this.barrier1 = barrier1;
this.barrier2 = barrier2;
}

public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 1");
this.barrier1.await();

Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 2");
this.barrier2.await();

System.out.println(Thread.currentThread().getName() +
" done!");

} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

以上代码控制台输出如下:

1
2
3
4
5
6
7
8
Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed
Thread-0 done!
Thread-1 done!

交换机 Exchanger

Exchanger 类表示一种两个线程可以进行互相交换对象的会和点。常用于两个线程在内存中批量交换数据。

当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Exchanger exchanger = new Exchanger();

ExchangerRunnable exchangerRunnable1 =
new ExchangerRunnable(exchanger, "A");

ExchangerRunnable exchangerRunnable2 =
new ExchangerRunnable(exchanger, "B");

new Thread(exchangerRunnable1).start();
new Thread(exchangerRunnable2).start();

public class ExchangerRunnable implements Runnable{

Exchanger exchanger = null;
Object object = null;

public ExchangerRunnable(Exchanger exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}

public void run() {
try {
Object previous = this.object;

this.object = this.exchanger.exchange(this.object);

System.out.println(
Thread.currentThread().getName() +
" exchanged " + previous + " for " + this.object
);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

以上程序输出:

1
2
Thread-0 exchanged A for B
Thread-1 exchanged B for A

Exchanger常用于两个线程在内存中批量交换数据。例如一个线程负责对业务明细表进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表中。生产者在执行新的查询统计任务填入数据到缓冲区的同时,消费者正在批量插入生产者换入的上一次产生的数据,系统的吞吐量得到平滑的提升。

信号量 Semaphore

java.util.concurrent.Semaphore 类是一个计数信号量。这就意味着它具备两个主要方法:acquire()和release()

计数信号量由一个指定数量的 “许可” 初始化。每调用一次 acquire(),一个许可会被调用线程取走。每调用一次 release(),一个许可会被返还给信号量。因此,在没有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法,N 是该信号量初始化时的许可的指定数量。这些许可只是一个简单的计数器。这里没啥奇特的地方。

信号量主要有两种用途:

1)保护一个重要(代码)部分防止一次超过 N 个线程进入。

1
2
3
4
5
6
Semaphore semaphore = new Semaphore(1);

//critical section
semaphore.acquire();
//do something
semaphore.release();

2)在两个线程之间发送信号。
如果你将一个信号量用于在两个线程之间传送信号,通常你应该用一个线程调用 acquire() 方法,而另一个线程调用 release() 方法。

如果没有可用的许可,acquire() 调用将会阻塞,直到一个许可被另一个线程释放出来。同理,如果无法往信号量释放更多许可时,一个 release() 调用也会阻塞。

通过这个可以对多个线程进行协调。比如,如果线程 1 将一个对象插入到了一个共享列表(list)之后之后调用了 acquire(),而线程 2 则在从该列表中获取一个对象之前调用了 release(),这时你其实已经创建了一个阻塞队列。信号量中可用的许可的数量也就等同于该阻塞队列能够持有的元素个数。

没有办法保证线程能够公平地可从信号量中获得许可。也就是说,无法担保掉第一个调用 acquire() 的线程会是第一个获得一个许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程前来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能会在第一个线程之前获得许可。

如果你想要强制公平,Semaphore 类有一个具有一个布尔类型的参数的构造子,通过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它否则不要启用它。

1
Semaphore semaphore = new Semaphore(1true);

执行器服务 ExecutorService

java.util.concurrent.ExecutorService 接口表示一个异步执行机制,使我们能够在后台执行任务。因此一个 ExecutorService 很类似于一个线程池。实际上,存在于 java.util.concurrent 包里的 ExecutorService 实现就是一个线程池实现。

一个线程将一个任务委派给一个 ExecutorService 去异步执行。一旦该线程将任务委派给 ExecutorService,该线程将继续它自己的执行,独立于该任务的执行。
ExecutorService的生产:

使用 java.util.concurrent.Executors 类中的工厂方法可以方便的获取各种ExecutorService接口的实现类:

1
2
3
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);

ExecutorService 使用:

1)execute(Runnable) 方法要求一个 java.lang.Runnable 对象,然后对它进行异步执行。没有办法得知被执行的 Runnable 的执行结果。

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});

executorService.shutdown();

2)submit(Runnable)方法也要求一个 Runnable 实现类,但它返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。

1
2
3
4
5
6
7
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});

future.get(); //returns null if the task has finished correctly.

3)submit(Callable) 方法类似于 submit(Runnable) 方法,除了它所要求的参数类型之外。Callable 实例除了它的 call() 方法能够返回一个结果之外和一个 Runnable 很相像。

Runnable.run() 不能够返回一个结果。Callable 的结果可以通过 submit(Callable) 方法返回的 Future 对象进行获取。

1
2
3
4
5
6
7
8
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});

System.out.println("future.get() = " + future.get());

4)invokeAny()
invokeAny() 方法要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。无法保证返回的是哪个 Callable 的结果 - 只能表明其中一个已执行结束。

如果其中一个任务执行结束(或者抛了一个异常),其他 Callable 将被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);

executorService.shutdown();

上述代码将会打印出给定 Callable 集合中的一个的执行结果。 有时是 “Task 1”,有时是 “Task 2” 等等。

5)invokeAll()
invokeAll() 方法将调用你在集合中传给 ExecutorService 的所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。

1
2
3
4
5
List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}

6)shutdown()、shutdownNow()
使用完 ExecutorService 之后你应该将其关闭,以使其中的线程不再运行。

比如,如果你的应用是通过一个 main() 方法启动的,之后 main 方法退出了你的应用,如果你的应用有一个活动的 ExexutorService 它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。

要终止 ExecutorService 里的线程你需要调用 ExecutorService 的 shutdown() 方法。ExecutorService 并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown() 被调用之前所有提交给 ExecutorService 的任务都被执行。

如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束。

Future

Future是一个接口,这个接口的方法如下:
1)cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。

参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。

如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;

如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;

如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

2)isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

3)isDone方法表示任务是否已经完成,若任务完成,则返回true;

4)get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

5)get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

线程池执行者 ThreadPoolExecutor

java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 接口的一个实现。ThreadPoolExecutor 使用其内部池中的线程执行给定任务(Callable 或者 Runnable)。

ThreadPoolExecutor 包含的线程池能够包含不同数量的线程。池中线程的数量由以下变量决定:

corePoolSize: 当一个任务委托给线程池时,如果池中线程数量低于 corePoolSize,一个新的线程将被创建,即使池中可能尚有空闲线程。

maximumPoolSize: 如果内部任务队列已满,而且有至少 corePoolSize 正在运行,但是运行线程的数量低于 maximumPoolSize,一个新的线程将被创建去执行该任务。

注:除非你确实需要显式为 ThreadPoolExecutor 自定义所有参数,使用 java.util.concurrent.Executors 类中的工厂方法之一会更加方便。

定时执行者服务 ScheduledExecutorService

ScheduledExecutorService 是一个 ExecutorService, 它能够将任务延后执行,或者间隔固定时间多次执行。 ScheduledExecutorService 也是一个接口,它的实现类为ScheduledThreadPoolExecutor。

1
2
3
4
5
6
7
8
9
10
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5);

ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(new MyCallable(),5,
TimeUnit.SECONDS);

System.out.println("result = " + scheduledFuture.get());

scheduledExecutorService.shutdown();

首先一个内置 5 个线程的 ScheduledExecutorService 被创建。之后一个 Callable 接口实现类被创建然后传递给 schedule() 方法。后边的俩参数定义了 Callable 将在 5 秒钟之后被执行。

ScheduledExecutorService还提供了以下方法:

1
2
3
4
schedule (Callable task, long delay, TimeUnit timeunit)
schedule (Runnable task, long delay, TimeUnit timeunit)
scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

ForkJoinPool

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。

1)分叉
一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

2)合并
当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。

并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

3)创建一个 ForkJoinPool
你可以通过其构造子创建一个 ForkJoinPool。作为传递给 ForkJoinPool 构造子的一个参数,你可以定义你期望的并行级别。并行级别表示你想要传递给 ForkJoinPool 的任务所需的线程或 CPU 数量。

1
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

4)提交任务到 ForkJoinPool
就像提交任务到 ExecutorService 那样,把任务提交到 ForkJoinPool。你可以提交两种类型的任务。一种是没有任何返回值的(一个 “行动”),另一种是有返回值的(一个”任务”)。这两种类型分别由 RecursiveAction 和 RecursiveTask 表示。

RecursiveAction

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。
一个 RecursiveAction 可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。
通过继承来实现一个 RecursiveAction。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class MyRecursiveAction extends RecursiveAction {

private long workLoad = 0;

public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}

@Override
protected void compute() {

//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);

List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();

subtasks.addAll(createSubtasks());

for(RecursiveAction subtask : subtasks){
subtask.fork();
}

} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}

private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();

MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

subtasks.add(subtask1);
subtasks.add(subtask2);

return subtasks;
}

}
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);

forkJoinPool.invoke(myRecursiveAction);

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。可以有几个水平的分割和合并。通过继承来实现一个 RecursiveTask

锁 Lock、读写锁 ReadWriteLock

java.util.concurrent.locks.Lock 是一个类似于 synchronized 块的线程同步机制。但是 Lock 比 synchronized 块更加灵活、精细。
Lock 接口具有以下主要方法:

1)lock()
lock() 将 Lock 实例锁定。如果该 Lock 实例已被锁定,调用 lock() 方法的线程将会阻塞,直到 Lock 实例解锁。

2)lockInterruptibly()
lockInterruptibly() 方法将会被调用线程锁定,除非该线程被打断。此外,如果一个线程在通过这个方法来锁定 Lock 对象时进入阻塞等待,而它被打断了的话,该线程将会退出这个方法调用。Interupt()为Thread类中的方法。

3)tryLock()
tryLock() 方法试图立即锁定 Lock 实例。如果锁定成功,它将返回 true,如果 Lock 实例已被锁定该方法返回 false。这一方法永不阻塞。

4)tryLock(long timeout, TimeUnit timeUnit)
tryLock(long timeout, TimeUnit timeUnit) 的工作类似于 tryLock() 方法,除了它在放弃锁定 Lock 之前等待一个给定的超时时间之外。

5)unlock()
unlock() 方法对 Lock 实例解锁。一个 Lock 实现将只允许锁定了该对象的线程来调用此方法。其他(没有锁定该 Lock 对象的线程)线程对 unlock() 方法的调用将会抛一个未检查异常(RuntimeException)。

Lock接口的实现类:ReentrantLock, ReentrantReadWriteLock

Atomic类

Atomic类用于实现非阻塞算法,例如乐观锁等。Atomic类在语言层面不做处理,将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性即可实现基于原子操作的线程安全。以下是几个Atomic类示例:

原子性布尔 AtomicBoolean

1)创建一个 AtomicBoolean,默认false, 可带参。

1
2
AtomicBoolean atomicBoolean = new AtomicBoolean();
AtomicBoolean atomicBoolean = new AtomicBoolean(true);

2)获取 AtomicBoolean 的值

1
boolean value = atomicBoolean.get();

3) 设置 AtomicBoolean 的值

1
atomicBoolean.set(false);

4) 交换 AtomicBoolean 的值

1
boolean oldValue = atomicBoolean.getAndSet(false);

以上代码执行后 oldValue 变量的值为 true,atomicBoolean 实例将持有 false 值。代码成功将 AtomicBoolean 当前值 ture 交换为 false。

5)比较并设置 AtomicBoolean 的值
compareAndSet() 方法允许你对 AtomicBoolean 的当前值与一个期望值进行比较,如果当前值等于期望值的话,将会对 AtomicBoolean 设定一个新值。compareAndSet() 方法是原子性的,因此在同一时间之内有单个线程执行它。因此 compareAndSet() 方法可被用于乐观锁的简单实现。

1
2
3
4
boolean expectedValue = true;
boolean newValue      = false;

boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue);

示例对 AtomicBoolean 的当前值与 true 值进行比较,如果相等,将 AtomicBoolean 的值更新为 false。

原子性整型 AtomicInteger

AtomicInteger 类包含有一些方法,通过它们你可以增加或减少 AtomicInteger 的值,并获取其值。这些方法如下:
1)addAndGet()
addAndGet() 方法给 AtomicInteger 增加了一个值,然后返回增加后的值。

2)getAndAdd()
getAndAdd() 方法为 AtomicInteger 增加了一个值,但返回的是增加以前的 AtomicInteger 的值。

3)getAndIncrement()、incrementAndGet()
getAndIncrement() 和 incrementAndGet() 方法类似于 getAndAdd() 和 addAndGet(),但每次只将 AtomicInteger 的值加 1。

4)decrementAndGet()、getAndDecrement()
每次将 AtomicInteger 的值减 1。

原子性引用型 AtomicReference

AtomicReference 提供了一个可以被原子性读和写的对象引用变量。原子性的意思是多个想要改变同一个 AtomicReference 的线程不会导致 AtomicReference 处于不一致的状态。AtomicReference 还有一个 compareAndSet() 方法,通过它你可以将当前引用于一个期望值(引用)进行比较,如果相等,在该 AtomicReference 对象内部设置一个新的引用。

1)创建泛型 AtomicReference
使用 Java 泛型来创建一个泛型 AtomicReference:

1
AtomicReference<String> atomicString =new AtomicReference<String>();

2)获取 AtomicReference 引用
你可以通过 AtomicReference 的 get() 方法来获取保存在 AtomicReference 里的引用。如果你的 AtomicReference 是非泛型的,get() 方法将返回一个 Object 类型的引用。

1
2
3
AtomicReference<String> atomicReference =
new AtomicReference<String>("first value referenced");
String reference = atomicReference.get();

参考文档:
Jenkov.com/java-util-concurrent

更多Java基础系列文章,参见Java基础大纲

CATALOG
  1. 1. BlockingQueue
    1. 1.1. ArrayBlockingQueue
    2. 1.2. DelayQueue
    3. 1.3. LinkedBlockingQueue
    4. 1.4. PriorityBlockingQueue
    5. 1.5. SynchronousQueue
  2. 2. BlockingDeque
  3. 3. ConcurrentMap
  4. 4. ConcurrentNavigableMap
    1. 4.1. headMap()
    2. 4.2. tailMap()
    3. 4.3. subMap()
  5. 5. 闭锁 CountDownLatch
  6. 6. 栅栏 CyclicBarrier
  7. 7. 交换机 Exchanger
  8. 8. 信号量 Semaphore
  9. 9. 执行器服务 ExecutorService
    1. 9.1. ExecutorService 使用:
    2. 9.2. Future
    3. 9.3. 线程池执行者 ThreadPoolExecutor
    4. 9.4. 定时执行者服务 ScheduledExecutorService
  10. 10. ForkJoinPool
    1. 10.1. RecursiveAction
    2. 10.2. RecursiveTask
  11. 11. 锁 Lock、读写锁 ReadWriteLock
  12. 12. Atomic类
    1. 12.1. 原子性布尔 AtomicBoolean
    2. 12.2. 原子性整型 AtomicInteger
    3. 12.3. 原子性引用型 AtomicReference