0%

java 并发

东西比较多,做个笔记。方便日后查阅学习。

一、进程和线程

进程

进程是系统进行资源分配和调度的基本单位,各个进程之间不会相互影响,因为系统给它们分配了不同的空间和资源,它分为单进程和多进程。可以将进程理解为一个正在执行的程序,比如一款游戏。

单进程与多进程的概述

单进程的计算机一次只能做一件事情,而多进程的计算机可以做到一次做不同的事情,比如一边听音乐,一边听打游戏,这两件事情虽然感觉起来是在同时一起进行的,但其实是CPU在做着程序间的高效切换,这才让我们觉得是同时进行的。

线程

线程是程序执行的最小单位,一个进程可由一个或多个线程组成,在一款运行的游戏中通常会有界面。线程就是程序执行的任务,它是程序使用CPU的基本单位,因此也可以说线程是依赖于进程的。

更新线程、游戏逻辑线程等,线程切换的开销远小于进程切换的开销。

单线程与多线程的概述

单线程也就是做的事情专一,不会分神去做别的事,也就是程序只有一条执行路径;多线程就是可以分出多条路去做同一件事情,也就是程序有多条执行路径,比如三个伙伴迷路了,大家分别去问路人路线,最后大家在目的地集合,因此多线程的存在,不是提高程序的执行速度,其实是为了提高应用程序的使用率,也可以说程序的执行其实都是在抢CPU的资源,也就是抢CPU的执行权,而其中的某一个进程如果执行路径比较多,就会有更高的几率抢到CPU的执行权,但这一过程是随机的,不知道哪一个线程会在哪一个时刻占到这个资源,所以线程的执行有随机性

img

蓝色框表示进程,黄色框表示线程。进程拥有代码、数据等资源,这些资源是共享的,3个线程都可

以访问,同时每个线程又拥有私有的栈空间。

二、线程的状态

线程的五种状态:

  1)新建状态(New):线程对象实例化后就进入了新建状态。

  2)就绪状态(Runnable):线程对象实例化后,其他线程调用了该对象的start()方法,虚拟机便会启动该线程,处于就绪状态的线程随时可能被调度执行。

ps:处于线程就绪队列(尽管是采用队列形式,事实上,把它称为可运行池而不是可运行队列。因为cpu的调度不一定是按照先进先出的顺序来调度的),等待系统为其分配CPU。等待状态并不是执行状态,当系统选定一个等待执行的Thread对象后,它就会从等待执行状态进入执行状态,系统挑选的动作称之为“cpu调度”。一旦获得CPU,线程就进入运行状态并自动调用自己的run方法。

  3)运行状态(Running):线程获得了时间片,开始执行。只能从就绪状态进入运行状态。

它可以变成阻塞状态、就绪状态和死亡状态。

处于就绪状态的线程,如果获得了cpu的调度,就会从就绪状态变为运行状态,执行run()方法中的任务。如果该线程失去了cpu资源,就会又从运行状态变为就绪状态。重新等待系统分配资源。也可以对在运行状态的线程调用yield()方法,它就会让出cpu资源,再次变为就绪状态。

注: 当发生如下情况是,线程会从运行状态变为阻塞状态:

​ ①、线程调用sleep方法主动放弃所占用的系统资源

​ ②、线程调用一个阻塞式IO方法,在该方法返回之前,该线程被阻塞

​ ③、线程试图获得一个同步监视器,但更改同步监视器正被其他线程所持有

​ ④、线程在等待某个通知(notify)

​ ⑤、程序调用了线程的suspend方法将线程挂起。不过该方法容易导致死锁,所以程序应该尽量避免使用该方法。

当线程的run()方法执行完,或者被强制性地终止,例如出现异常,或者调用了stop()、desyory()方法等等,就会从运行状态转变为死亡状态。

  4)阻塞状态(Blocked):线程因为某个原因暂停执行,并让出CPU的使用权后便进入了阻塞状态。

ps: 在阻塞状态的线程不能进入就绪队列。只有当引起阻塞的原因消除时,如睡眠时间已到,或等待的I/O设备空闲下来,线程便转入就绪状态,重新到就绪队列中排队等待,被系统选中后从原来停止的位置开始继续运行。有三种方法可以暂停Threads执行:

    等待阻塞:调用运行线程的wait()方法,虚拟机会把该线程放入等待池。

    同步阻塞:运行线程获取对象的同步锁时,该锁已被其他线程获得,虚拟机会把该线程放入锁定池。

    其他线程:调用运行线程的sleep()方法或join()方法,或线程发出I/O请求时,进入阻塞状态。

  5)结束状态(Dead):线程正常执行完或异常退出时,进入了结束状态。

ps: 这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。 如果在一个死去的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

三、使用线程

实现Runnable接口

通过实现Runnable接口创建线程类的具体步骤和具体代码如下:

• 定义Runnable接口的实现类,并重写该接口的run()方法;

• 创建Runnable实现类的实例,并以此实例作为Thread的target对象,即该Thread对象才是真正的线程对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadTest {
public static void main(String[] args){
Runnable runnable = new MyThread();     
/*Runnable接口中只有一个run()方法,它非Thread类子类的类提供的一种激活方式。一个类实现Runnable接口后,并不代表该类是一个“线程”类,不能直接运行,必须通过Thread实例才能创建并运行线程。*/
/**
注:直接调用Thread类或Runnable类对象的run()方法是无法启动线程的,这只是一个简单的方法调用必须通过Thread方法中的start()才行。*/
Thread thread = new Thread(runnable); //将Runnable对象传递给Thread构造器
thread.start();
}
}//实现了Runnable接口
class MyThread implements Runnable{
@Override
public void run() {
int count = 7;
while(count>0){
System.out.println(count);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
}
}
}

实现Callable 接口

与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。

通过Callable和Future创建线程的具体步骤和具体代码如下:

• 创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,并且有返回值。
• 创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
• 使用FutureTask对象作为Thread对象的target创建并启动新线程。
• 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值其中,Callable接口(也只有一个方法)定义如下:

Callable接口(也只有一个方法)定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Callable   { 
V call() throws Exception;
}
步骤1:创建实现Callable接口的类MyCallable(略);
步骤2:创建一个类对象:
Callable oneCallable = new MyCallable();
步骤3:由Callable创建一个FutureTask对象:
FutureTask oneTask = new FutureTask(oneCallable);
注释: FutureTask是一个包装器,它通过接受Callable来创建,它同时实现了 Future和Runnable接口。
步骤4:由FutureTask创建一个Thread对象:
Thread oneThread = new Thread(oneTask);
步骤5:启动线程:
oneThread.start();

1
2
3
4
5
6
7
public class SomeCallable<V> extends OtherClass implements Callable<V> {
@Override
public V call() throws Exception {
// TODO Auto-generated method stub
return null;
}
}
1
2
3
4
5
6
7
8
Callable<V> oneCallable = new SomeCallable<V>();   
//由Callable<Integer>创建一个FutureTask<Integer>对象:
FutureTask<V> oneTask = new FutureTask<V>(oneCallable);
//注释:FutureTask<Integer>是一个包装器,它通过接受Callable<Integer>来创建,它同时实现了Future和Runnable接口。
//由FutureTask<Integer>创建一个Thread对象:
Thread oneThread = new Thread(oneTask);
oneThread.start();
//至此,一个线程就创建完成了。

继承Thread类

Thread类本质上是实现了Runnable接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,并执行run()方法。这种方式实现多线程很简单,通过自己的类直接extend Thread,并复写run()方法,就可以启动新线程并执行自己定义的run()方法。

任何线程只能启动一次,然后多次调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadTest {
public static void main(String[] args){
Thread thread = new MyThread(); //创建线程
//任何线程只能启动一次,然后多次调用
thread.start();  //启动线程
}
}//继承Thread类
class MyThread extends Thread{
@Override
public void run() {
int count = 7;
while(count>0){
System.out.println(count);
try {
Thread.sleep(1000);  
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
}
}
}

Thread类实现了Runnable接口,在Thread类中,有一些比较关键的属性,比如name是表示Thread的名字,可以通过Thread类的构造器中的参数来指定线程名字,priority表示线程的优先级(最大值为10,最小值为1,默认值为5),daemon表示线程是否是守护线程,target表示要执行的任务。

以下是关系到线程运行状态的几个方法:

1)start方法

  start()用来启动一个线程,当调用start方法后,系统才会开启一个新的线程来执行用户定义的子任务,在这个过程中,会为相应的线程分配需要的资源。

2)run方法

  run()方法是不需要用户来调用的,当通过start方法启动一个线程之后,当线程获得了CPU执行时间,便进入run方法体去执行具体的任务。所以,继承Thread类必须重写run方法,在run方法中定义具体要执行的任务。

3)sleep方法

  sleep方法有两个重载版本:

1 sleep(long millis) //参数为毫秒
2 sleep(long millis, int nanoseconds) //第一参数为毫秒,第二个参数为纳秒

  sleep相当于让线程睡眠,交出CPU,让CPU去执行其他的任务。

  如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,则可以通过调用Thread类的静态sleep()方法来实现。

  当当前线程调用sleep()方法进入阻塞状态后,在其睡眠时间内,该线程不会获得执行机会,即使系统中没有其他可执行线程,处于sleep()中的线程也不会执行,因此sleep()方法常用来暂停程序的执行

但是有一点要非常注意,sleep方法不会释放锁,也就是说如果当前线程持有对某个对象的锁,则即使调用sleep方法,其他线程也无法访问这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Test {

private int i = 10;
private Object object = new Object();

public static void main(String[] args) throws IOException {
Test test = new Test();
MyThread thread1 = test.new MyThread();
MyThread thread2 = test.new MyThread();
thread1.start();
thread2.start();
}


class MyThread extends Thread{
@Override
public void run() {
synchronized (object) {
i++;
System.out.println("i:"+i);
try {
System.out.println("线程"+Thread.currentThread().getName()+"进入睡眠状态");
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO: handle exception
}
System.out.println("线程"+Thread.currentThread().getName()+"睡眠结束");
i++;
System.out.println("i:"+i);
}
}
}
}

输出结果:

img

注:

(1)sleep是静态方法,最好不要用Thread的实例对象调用它,因为它睡眠的始终是当前正在运行的线程,而不是调用它的线程对象,它只对正在运行状态的线程对象有效。如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
public class Test1 {  
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName());
MyThread myThread=new MyThread();
myThread.start();
myThread.sleep(1000);//这里sleep的就是main线程,而非myThread线程
Thread.sleep(10);
for(int i=0;i<100;i++){
System.out.println("main"+i);
}
}
}

(2)Java线程调度是Java多线程的核心,只有良好的调度,才能充分发挥系统的性能,提高程序的执行效率。但是不管程序员怎么编写调度,只能最大限度的影响线程执行的次序,而不能做到精准控制。因为使用sleep方法之后,线程是进入阻塞状态的,只有当睡眠的时间结束,才会重新进入到就绪状态,而就绪状态进入到运行状态,是由系统控制的,我们不可能精准的去干涉它,所以如果调用Thread.sleep(1000)使得线程睡眠1秒,可能结果会大于1秒。

4)yield方法

  yield()方法和sleep()方法有点相似,它也是Thread类提供的一个静态方法,它也可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程转入到就绪状态。即让当前线程暂停一下,让系统的线程调度器重新调度一次,完全可能的情况是:当某个线程调用了yield()方法暂停之后,线程调度器又将其调度出来重新执行。

  调用yield方法会让当前线程交出CPU权限,让CPU去执行其他的线程。它跟sleep方法类似,同样不会释放锁但是yield不能控制具体的交出CPU的时间,另外,当某个线程调用了yield()方法之后,只有优先级与当前线程相同或者比当前线程更高的处于就绪状态的线程才会获得执行机会。

  注意,调用yield方法并不会让线程进入阻塞状态,而是让线程重回就绪状态,它只需要等待重新获取CPU执行时间,这一点是和sleep方法不一样的。

yield方法直接回到了就绪状态,是没有缓冲的阻塞的。

5)join方法

  join方法有三个重载版本:

1 join()
2 join(long millis) //参数为毫秒
3 join(long millis,int nanoseconds) //第一参数为毫秒,第二个参数为纳秒

  假如在main线程中,调用thread.join方法,则main方法会等待thread线程执行完毕或者等待一定的时间。如果调用的是无参join方法,则等待thread执行完毕,如果调用的是指定了时间参数的join方法,则等待一定的事件。

实际上调用join方法是调用了Object的wait方法,这个可以通过查看源码得知:

img

 wait方法会让线程进入阻塞状态,并且会释放线程占有的锁,并交出CPU执行权限。

由于wait方法会让线程释放对象锁,所以join方法同样会让线程释放对一个对象持有的锁。

6)wait方法

wait() 方法需要和 notify() 及 notifyAll() 两个方法一起介绍,这三个方法用于协调多个线程对共享数据的存取,所以必须在 synchronized 语句块内使用,也就是说,调用 wait(),notify() 和 notifyAll() 的任务在调用这些方法前必须拥有对象的锁。

注意,它们都是 Object 类的方法,而不是 Thread 类的方法。

wait() 方法与 sleep() 方法的不同之处在于,wait() 方法会释放对象的“锁标志”。当调用某一对象的 wait() 方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了 notify() 方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的 notifyAll() 方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。

除了使用 notify() 和 notifyAll() 方法,还可以使用带毫秒参数的 wait(long timeout) 方法,效果是在延迟 timeout 毫秒后,被暂停的线程将被恢复到锁标志等待池。

此外,wait(),notify() 及 notifyAll() 只能在 synchronized 语句中使用,但是如果使用的是 ReenTrantLock 实现同步,该如何达到这三个方法的效果呢?解决方法是使用 ReenTrantLock.newCondition() 获取一个 Condition 类对象,然后 Condition 的 await(),signal() 以及 signalAll() 分别对应上面的三个方法。

sleep、join、yeild方法之间的区别

sleep方法是一个静态方法,让当前正在执行的线程休眠(暂停执行),而且在睡眠的过程是不释放资源的,保持着锁。该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是 sleep() 方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。

作用:
1、暂停当前线程一段时间;
2、让出CPU,特别是不想让高优先级的线程让出CPU给低优先级的线程

yeild方法同样也是一个静态方法,暂停当前正在执行的线程,线程由运行中状态进入就绪状态,重新与其他线程一起参与线程的调度。yield() 方法和 sleep() 方法类似,也不会释放“锁标志”,区别在于,它没有参数,即 yield() 方法只是使当前线程重新回到可执行状态,所以执行 yield() 的线程有可能在进入到可执行状态后马上又被执行,另外 yield() 方法只能使同优先级或者高优先级的线程得到执行机会,这也和 sleep() 方法不同

对于sleep或者wait方法,他们都将进入特定的状态,伴随着状态的切换,也就意味着等待某些条件的发生,才能够继续,比如条件满足,或者到时间等但是yield方法不涉及这些事情,他针对的是时间片的划分与调度,所以对开发者来说只是临时让一下,让一下他又不会死,就只是再等等。yield方法将会暂停当前正在执行的线程对象,并执行其他线程,他始终都是RUNNABLE状态

作用:
线程让步,顾名思义,就是说当一个线程使用了这个方法之后,它就会把自己CPU执行的时间让掉,让自己或者其它的线程运行。但是,这种让步只对同优先级或者更高优先级的线程而言,同时,让步具有不确定性,当前线程也会参与调度,即有可能又被重新调度,那么就没有达到让出CPU的效果了。

ps:

①、sleep方法暂停当前线程后,会进入阻塞状态,只有当睡眠时间到了,才会转入就绪状态。而yield方法调用后 ,是直接进入就绪状态,所以有可能刚进入就绪状态,又被调度到运行状态。

②、sleep方法声明抛出了InterruptedException,所以调用sleep方法的时候要捕获该异常,或者显示声明抛出该异常。而yield方法则没有声明抛出任务异常。

③、sleep方法比yield方法有更好的可移植性,通常不要依靠yield方法来控制并发线程的执行。

JDK中提供三个版本的join方法:

1
2
3
void join()           //当前线程等该加入该线程后面,等待该线程终止。    
void join(long millis) //当前线程等待该线程终止的时间最长为 millis 毫秒。 如果在millis时间内,该线程没有执行完,那么当前线程进入就绪状态,重新等待cpu调度
void join(long millis,int nanos) // 等待该线程终止的时间最长为 millis 毫秒 + nanos 纳秒。如果在millis时间内,该线程没有执行完,那么当前线程进入就绪状态,重新等待cpu调度

作用:
join方法的作用是父线程等待子线程执行完成后再执行,换句话说就是将异步执行的线程合并为同步的线程。

join相当于让其他的线程(特定的)进行插队处理,自己再继续处理。例如:主线程中调用启动线程(调用start),然后调用该线程的join方法,可以达到主线程等待工作线程运行结束才执行的效果,并且join要在start调用后。

sleep就是被监视了,在特定的条件下(中断或者是自己醒来)才能恢复到就绪状态。

yield就是一种礼让,自己直接就是就绪状态。https://www.cnblogs.com/noteless/p/10443446.html

7)interrupt方法

  interrupt,顾名思义,即中断的意思。单独调用interrupt方法可以使得处于阻塞状态的线程抛出一个异常,也就说,它可以用来中断一个正处于阻塞状态的线程;另外,通过interrupt方法和isInterrupted()方法来停止正在运行的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Test {

public static void main(String[] args) throws IOException {
Test test = new Test();
MyThread thread = test.new MyThread();
thread.start();
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {

}
thread.interrupt();
}

class MyThread extends Thread{
@Override
public void run() {
try {
System.out.println("进入睡眠状态");
Thread.currentThread().sleep(10000);
System.out.println("睡眠完毕");
} catch (InterruptedException e) {
System.out.println("得到中断异常");
}
System.out.println("run方法执行完毕");
}
}
}

输出结果:

img

如上可知,interrupt方法可以中断处于阻塞状态的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Test {

public static void main(String[] args) throws IOException {
Test test = new Test();
MyThread thread = test.new MyThread();
thread.start();
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {

}
thread.interrupt();
}

class MyThread extends Thread{
@Override
public void run() {
int i = 0;
while(i<Integer.MAX_VALUE){
System.out.println(i+" while循环");
i++;
}
}
}
}

运行该程序会发现,while循环会一直运行直到变量i的值超出Integer.MAX_VALUE。所以说直接调用interrupt方法不能中断正在运行中的线程。

但是如果配合isInterrupted()能够中断正在运行的线程,因为调用interrupt方法相当于将中断标志位置为true,那么可以通过调用isInterrupted()判断中断标志是否被置位来中断线程的执行。比如下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Test {
public static void main(String[] args) throws IOException {
Test test = new Test();
MyThread thread = test.new MyThread();
thread.start();
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {

}
thread.interrupt();
}

class MyThread extends Thread{
@Override
public void run() {
int i = 0;
while(!isInterrupted() && i<Integer.MAX_VALUE){
System.out.println(i+" while循环");
i++;
}
}
}
}

运行会发现,打印若干个值之后,while循环就停止打印了。

  但是一般情况下不建议通过这种方式来中断线程,一般会在MyThread类中增加一个属性 isStop来标志是否结束while循环,然后再在while循环中判断isStop的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MyThread extends Thread{
private volatile boolean isStop = false;
@Override
public void run() {
int i = 0;
while(!isStop){
i++;
}
}

public void setStop(boolean stop){
this.isStop = stop;
}
}

那么就可以在外面通过调用setStop方法来终止while循环。

8)interrupted方法

interrupted()函数是Thread静态方法,用来检测当前线程的interrupt状态,检测完成后,状态清空。通过下面的interrupted源码我们能够知道,此方法首先调用isInterrupted方法,而isInterrupted方法是一个重载的native方法private native boolean isInterrupted(boolean ClearInterrupted) 通过方法的注释能够知道,用来测试线程是否已经中断,参数用来决定是否重置中断标志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
return isInterrupted(false);
}
/**

Tests if some Thread has been interrupted. The interrupted state

is reset or not based on the value of ClearInterrupted that is

passed.
*/
private native boolean isInterrupted(boolean ClearInterrupted);

9)stop方法

stop方法已经是一个废弃的方法,它是一个不安全的方法。因为调用stop方法会直接终止run方法的调用,并且会抛出一个ThreadDeath错误,如果线程持有某个对象锁的话,会完全释放锁,导致对象状态不一致。所以stop方法基本是不会被用到的。

关系到线程属性的几个方法

  1)getId

  用来得到线程ID

  2)getName和setName

  用来得到或者设置线程名称。

  3)getPriority和setPriority

  用来获取和设置线程优先级。

  4)setDaemon和isDaemon

  用来设置线程是否成为守护线程和判断线程是否是守护线程。

  守护线程和用户线程的区别在于:守护线程依赖于创建它的线程,而用户线程则不依赖。举个简单的例子:如果在main线程中创建了一个守护线程,当main方法运行完毕之后,守护线程也会随着消亡。而用户线程则不会,用户线程会一直运行直到其运行完毕。在JVM中,像垃圾收集器线程就是守护线程。

  Thread类有一个比较常用的静态方法currentThread()用来获取当前线程。

Thread类中的方法同线程状态的关系

注意几点:

  • 线程创建之后,不会立即进入就绪状态,因为线程的运行需要一些条件(比如内存资源,譬如程序计数器、Java栈、本地方法栈都是线程私有的,所以需要为线程分配一定的内存空间),只有线程运行需要的所有条件满足了,才进入就绪状态。

  • 当线程进入就绪状态后,不代表立刻就能获取CPU执行时间,也许此时CPU正在执行其他的事情,因此它要等待。当得到CPU执行时间之后,线程便真正进入运行状态

  • 线程在运行状态过程中,可能有多个原因导致当前线程不继续运行下去,比如用户主动让线程睡眠(睡眠一定的时间之后再重新执行)、用户主动让线程等待,或者被同步块给阻塞,此时就对应着多个状态:time waiting(睡眠或等待一定的事件)、waiting(等待被唤醒)、blocked(阻塞)。

实现接口 VS 继承 Thread

实现接口会更好一些,因为:

  • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

使用ExecutorService、Callable、Future实现有返回结果的线程

ExecutorService、Callable、Future三个接口实际上都是属于Executor框架。返回结果的线程是在JDK1.5中引入的新特征,有了这种特征就不需要再为了得到返回值而大费周折了。而且自己实现了也可能漏洞百出。

可返回值的任务必须实现Callable接口。类似的,无返回值的任务必须实现Runnable接口。

执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。

注意:get方法是阻塞的,即:线程无返回结果,get方法会一直等待。

再结合线程池接口ExecutorService就可以实现传说中有返回结果的多线程了。

下面提供了一个完整的有返回结果的多线程测试例子,在JDK1.5下验证过没问题可以直接使用。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.*;  
import java.util.Date;
import java.util.List;
import java.util.ArrayList;

/**
* 有返回值的线程
*/
@SuppressWarnings("unchecked")
public class Test {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
System.out.println("----程序开始运行----");
Date date1 = new Date();

int taskSize = 5;
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable c = new MyCallable(i + " ");
// 执行任务并获取Future对象
Future f = pool.submit(c);
// System.out.println(">>>" + f.get().toString());
list.add(f);
}
// 关闭线程池
pool.shutdown();

// 获取所有并发任务的运行结果
for (Future f : list) {
// 从Future对象上获取任务的返回值,并输出到控制台
System.out.println(">>>" + f.get().toString());
}

Date date2 = new Date();
System.out.println("----程序结束运行----,程序运行时间【"
+ (date2.getTime() - date1.getTime()) + "毫秒】");
}
}

class MyCallable implements Callable<Object> {
private String taskNum;

MyCallable(String taskNum) {
this.taskNum = taskNum;
}

public Object call() throws Exception
{
System.out.println(">>>" + taskNum + "任务启动");
Date dateTmp1 = new Date();
Thread.sleep(1000);
Date dateTmp2 = new Date();
long time = dateTmp2.getTime() - dateTmp1.getTime();
System.out.println(">>>" + taskNum + "任务终止");
return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】";
}
}

代码说明:
上述代码中Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newFixedThreadPool(int nThreads) 
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

设置线程的优先级

每个线程执行时都有一个优先级的属性,优先级高的线程可以获得较多的执行机会,而优先级低的线程则获得较少的执行机会。与线程休眠类似,线程的优先级仍然无法保障线程的执行次序。只不过,优先级高的线程获取CPU资源的概率较大,优先级低的也并非没机会执行。

每个线程默认的优先级都与创建它的父线程具有相同的优先级,在默认情况下,main线程具有普通优先级。

注:Thread类提供了setPriority(int newPriority)和getPriority()方法来设置和返回一个指定线程的优先级,其中setPriority方法的参数是一个整数,范围是1~·0之间,也可以使用Thread类提供的三个静态常量:

1
2
3
MAX_PRIORITY   =10
MIN_PRIORITY =1
NORM_PRIORITY =5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test1 {  
public static void main(String[] args) throws InterruptedException {
new MyThread("高级", 10).start();
new MyThread("低级", 1).start();
}
}

class MyThread extends Thread {
public MyThread(String name,int pro) {
super(name);//设置线程的名称
setPriority(pro);//设置线程的优先级
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(this.getName() + "线程第" + i + "次执行!");
}
}
}

注:虽然Java提供了10个优先级别,但这些优先级别需要操作系统的支持。不同的操作系统的优先级并不相同,而且也不能很好的和Java的10个优先级别对应。所以我们应该使用MAX_PRIORITY、MIN_PRIORITY和NORM_PRIORITY三个静态常量来设定优先级,这样才能保证程序最好的可移植性。

正确结束线程

Thread.stop()、Thread.suspend、Thread.resume、Runtime.runFinalizersOnExit这些终止线程运行的方法已经被废弃了,使用它们是极端不安全的!想要安全有效的结束一个线程,可以使用下面的方法:

  • 正常执行完run方法,然后结束掉;
  • 控制循环条件和判断条件的标识符来结束掉线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
class MyThread extends Thread {  
int i=0;
boolean next=true;
@Override
public void run() {
while (next) {
if(i==10)
next=false;
i++;
System.out.println(i);
}
}
}

四、基础线程机制

执行器(executor)

执行器 ( Executor ) 类有许多静态工厂方法用来构建线程池 。

类图如下:

Executor 英文意思是执行器,顾名思义,就是执行任务,所以该接口只有一个执行任务的方法:

1
void execute(Runnable command);

Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。

主要有三种 Executor:

  • CachedThreadPool:一个任务创建一个线程;
  • FixedThreadPool:所有任务只能使用固定大小的线程;
  • SingleThreadExecutor:相当于大小为 1 的 FixedThreadPool。

ExecutorService

ExecutorService 继承自 Executor,正如其名字一样,它定义了一个服务,定义了一个完成的线程池的行为。可以提交任务,执行任务,关闭服务。

ExecutorService提供了两个方法来达到这个目的——shutdwon()会等待正在执行的任务执行完而shutdownNow()会终止所有正在执行的任务并立即关闭execuotr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Executors1 {

public static void main(String[] args) {
test1(3);
}

private static void test1(long seconds) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(seconds);
String name = Thread.currentThread().getName();
System.out.println("task finished: " + name);
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
});
stop(executor);
}

static void stop(ExecutorService executor) {
//Executors类提供了便利的工厂方法来创建不同类型的 executor services。
//在这个示例中我们使用了一个单线程线程池的 executor。
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("termination interrupted");
}
finally {
if (!executor.isTerminated()) {
System.err.println("killing non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
}

使用ExecutorService、Callable、Future实现有返回结果的线程

Callbale也可以像runnbales一样提交给 executor services。但是callables的结果怎么办?
因为submit()不会等待任务完成,executor service不能直接返回callable的结果。不过,executor 可以返回一个Future类型的结果,它可以用来在稍后某个时间取出实际的结果。

1
2
3
4
5
6
7
8
9
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);//返回一个futrure类型的结果

System.out.println("future done? " + future.isDone());

Integer result = future.get();//然后再把值的拿出来

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

在将callable提交给exector之后,我们先通过调用isDone()来检查这个future是否已经完成执行。

在调用get()方法时,当前线程会阻塞等待,直到callable在返回实际的结果之前执行完成。

Future与底层的executor service紧密的结合在一起。记住,如果你关闭executor,所有的未中止的future都会抛出异常。

1
2
executor.shutdownNow();
future.get();

我们这次创建executor的方式与上一个例子稍有不同。我们使用newFixedThreadPool(1)来创建一个单线程线程池的 executor service。 这等同于使用newSingleThreadExecutor

超时

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);//加入时长
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});

future.get(1, TimeUnit.SECONDS);

运行上面的代码将会产生一个TimeoutException

1
2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)

invokeAll

Executors支持通过invokeAll()一次批量提交多个callable。这个方法结果一个callable的集合,然后返回一个future的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
() -> "task1",
() -> "task2",
() -> "task3");

executor.invokeAll(callables)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach(System.out::println);

invokeAny

批量提交callable的另一种方式就是invokeAny(),它的工作方式与invokeAll()稍有不同。在等待future对象的过程中,这个方法将会阻塞直到第一个callable中止然后返回这一个callable的结果。

为了测试这种行为,我们利用这个帮助方法来模拟不同执行时间的callable。这个方法返回一个callable,这个callable休眠指定 的时间直到返回给定的结果。

1
2
3
4
5
6
Callable<String> callable(String result, long sleepSeconds) {
return () -> {
TimeUnit.SECONDS.sleep(sleepSeconds);
return result;
};
}

我们利用这个方法创建一组callable,这些callable拥有不同的执行时间,从1分钟到3分钟。通过invokeAny()将这些callable提交给一个executor,返回最快的callable的字符串结果-在这个例子中为任务2:

1
2
3
4
5
6
7
8
9
10
11
ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
callable("task1", 2),
callable("task2", 1),
callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2

上面这个例子又使用了另一种方式来创建executor——调用newWorkStealingPool()。这个工厂方法是Java8引入的,返回一个ForkJoinPool类型的 executor,它的工作方法与其他常见的execuotr稍有不同。与使用一个固定大小的线程池不同,ForkJoinPools使用一个并行因子数来创建,默认值为主机CPU的可用核心数。

ScheduledExecutor

我们已经学习了如何在一个 executor 中提交和运行一次任务。为了持续的多次执行常见的任务,我们可以利用调度线程池。

ScheduledExecutorService支持任务调度,持续执行或者延迟一段时间后执行。

下面的实例,调度一个任务在延迟3分钟后执行:

1
2
3
4
5
6
7
8
9
ScheduledExecutorService executor =  Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

调度一个任务将会产生一个专门的future类型——ScheduleFuture,它除了提供了Future的所有方法之外,他还提供了getDelay()方法来获得剩余的延迟。在延迟消逝后,任务将会并发执行。

为了调度任务持续的执行,executors 提供了两个方法scheduleAtFixedRate()scheduleWithFixedDelay()。第一个方法用来以固定频率来执行一个任务,比如,下面这个示例中,每分钟一次:

1
2
3
4
5
6
7
ScheduledExecutorService executor =   Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);

另外,这个方法还接收一个初始化延迟,用来指定这个任务首次被执行等待的时长。

请记住:scheduleAtFixedRate()并不考虑任务的实际用时。所以,如果你指定了一个period为1分钟而任务需要执行2分钟,那么线程池为了性能会更快的执行。

在这种情况下,你应该考虑使用scheduleWithFixedDelay()。这个方法的工作方式与上我们上面描述的类似。不同之处在于等待时间 period 的应用是在一次任务的结束和下一个任务的开始之间。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
ScheduledExecutorService executor =         Executors.newScheduledThreadPool(1);

Runnable task = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Scheduling: " + System.nanoTime());
}
catch (InterruptedException e) {
System.err.println("task interrupted");
}
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);

这个例子调度了一个任务,并在一次执行的结束和下一次执行的开始之间设置了一个1分钟的固定延迟。初始化延迟为0,任务执行时间为0。所以我们分别在0s,3s,6s,9s等间隔处结束一次执行。

如你所见,scheduleWithFixedDelay()在你不能预测调度任务的执行时长时是很有用的。

线程执行器和不使用线程执行器的对比(优缺点)

1.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可。

2.使用线程池来提高程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务。避免了不断创建和销毁线程导致的性能开销。

3.执行器可以处理实现了Callable接口的任务。Callable接口类似于Runnable接口,却提供了两方面的增强:

a.Callable主方法名称为call(),可以返回结果

b.当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。

4.提供了一些操作线程任务的功能

后台(守护)线程

​ 守护线程使用的情况较少,但并非无用,举例来说,JVM的垃圾回收、内存管理等线程都是守护线程。还有就是在做数据库应用时候,使用的数据库连接池,连接池本身也包含着很多后台线程,监控连接个数、超时时间、状态等等。调用线程对象的方法setDaemon(true),则可以将其设置为守护线程。守护线程的用途为:

  • 守护线程通常用于执行一些后台作业,例如在你的应用程序运行时播放背景音乐,在文字编辑器里做自动语法检查、自动保存等功能。
  • Java的垃圾回收也是一个守护线程。守护线的好处就是你不需要关心它的结束问题。例如你在你的应用程序运行的时候希望播放背景音乐,如果将这个播放背景音乐的线程设定为非守护线程,那么在用户请求退出的时候,不仅要退出主线程,还要通知播放背景音乐的线程退出;如果设定为守护线程则不需要了。

setDaemon方法的详细说明:

1
2
3
4
5
6
7
public final void setDaemon(boolean on)        将该线程标记为守护线程或用户线程。当正在运行的线程都是守护线程时,Java 虚拟机退出。    
该方法必须在启动线程前调用。 该方法首先调用该线程的 checkAccess 方法,且不带任何参数。这可能抛出 SecurityException(在当前线程中)。
参数:
on - 如果为 true,则将该线程标记为守护线程。
抛出:
IllegalThreadStateException - 如果该线程处于活动状态。
SecurityException - 如果当前线程无法修改该线程。

注:JRE判断程序是否执行结束的标准是所有的前台执线程行完毕了,而不管后台线程的状态,因此,在使用后台县城时候一定要注意这个问题。

线程池

构建一个新的线程是有一定代价的,因为涉及与操作系统的交互 。如果程序中创建了大量的生命期很短的线程,应该使用线程池 ( thread pool )

一个线程池中包含许多准备运行的空闲线程 。将 Runnable 对象交给线程池 ,就会有一个线程调用 run 方法。当 run方法退出时,线程不会死亡 , 而是在池中准备为下一个请求提供服务 。另一个使用线程池的理由是减少并发线程的数目。

创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数 “ 固定的 ”线程池以限制并发线程的总数。

ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类。(该类的结构层次可以参见上诉的执行器)

1、在ThreadPoolExecutor类中提供了四个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

  下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
1
2
3
4
5
6
7
TimeUnit.DAYS;               //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
1
2
3
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
1
2
3
4
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池的执行策略

ThreadPoolExecutor 执行任务的逻辑示意图如下:

2、ThreadPoolExecutor类的结构层析

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractExecutorService implements ExecutorService {


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  我们接着看ExecutorService接口的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ExecutorService extends Executor {

void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

3、在ThreadPoolExecutor类中有几个非常重要的方法

1
2
3
4
execute()
submit()
shutdown()
shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,可以自行查阅API。

线程池的实现原理

https://www.cnblogs.com/exe19/p/5359885.html

四、中断

一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。

InterruptedException

通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。

对于以下代码,在 main() 中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep() 方法,因此会抛出一个 InterruptedException,从而提前结束线程,不执行之后的语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class InterruptExample {

private static class MyThread1 extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new MyThread1();
thread1.start();
thread1.interrupt();
System.out.println("Main run");
}
1
2
3
4
5
6
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at InterruptExample.lambda$main$0(InterruptExample.java:5)
at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

interrupted()

如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。

但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

1
2
3
4
5
6
7
8
9
10
11
12
public class InterruptExample {

private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
}
1
2
3
4
5
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new MyThread2();
thread2.start();
thread2.interrupt();
}
1
Thread end

Executor 的中断操作

调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
1
2
3
4
5
6
7
8
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。

1
2
3
4
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);

五、线程同步

java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查),将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。

1、使用重入锁(Lock)实现线程同步

​ 在JavaSE5.0中新增了一个java.util.concurrent包来支持同步。ReentrantLock类是可重入、互斥、实现了Lock接口的锁,它与使用synchronized方法和快具有相同的基本行为和语义,并且扩展了其能力。ReenreantLock类的常用方法有:

1
2
3
ReentrantLock() : 创建一个ReentrantLock实例         
lock() : 获得锁
unlock() : 释放锁

注:ReentrantLock()还有一个可以创建公平锁的构造方法,但由于能大幅度降低程序运行效率,不推荐使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//只给出要修改的代码,其余代码与上同
class Bank {

private int account = 100;
//需要声明这个锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
lock.lock();//
try{
account += money;
}finally{
lock.unlock();//解锁线程
}

}

lock.lock();确保只有一个线程进入临界区,一旦一个线程进入之后,会获得锁对象,其他线程无法通过lock语句。当其他线程调用lock时,它们会被阻塞,知道第一个线程释放锁对象。

lock.unlock();解锁操作,一定要放到finally里,因为如果try语句里出了问题,锁必须被释放,否则其他线程将永远被阻塞

因为系统会随机为线程分配资源,所以在线程获得锁对象之后,可能被系统剥夺运行权,这时候其他线程来访问,但是发现有锁,进不去,只能等拿到锁对象的线程把里面的代码执行完毕后,释放锁,第二个线程才能运行。

2、synchronzied关键字

前面我们讲了ReentrantLock锁对象的使用,但是在系统里面我们不一定要使用ReentrantLock锁,Java中还提供了一个内部的隐式锁,关键字是synchronized.

举个例子:

1
2
3
public synchronized void Method() {
//do some work...
}

synchronized关键字说明

  总的说来,synchronized关键字可以作为函数的修饰符,也可作为函数内的语句,也就是平时说的同步方法和同步语句块。如果再细的分类,synchronized可作用于instance变量(成员变量)、object reference(对象实例引用)、static函数和class literals(类名称字面常量)身上。

在进一步阐述之前,我们需要明确几点:

  • 无论synchronized关键字加在方法上还是对象上,它取得的锁都是对象,而不是把一段代码或函数当作锁――而且同步方法很可能还会被其他线程的对象访问。

  • 每个对象只有一个锁(lock)与之相关联。JVM会给类的每个实例化的对象赋予一个单独的锁。

  • 实现同步是要很大的系统开销作为代价的,甚至可能造成死锁,所以尽量避免无谓的同步控制。

  注意:在同步块和同步方法中,是给类或类的对象进行加锁,而不是给方法加锁。所谓的需要获得对象的锁才能执行方法,也是针对线程而言的。

synchronized 方法:

通过在方法声明中加入 synchronized关键字来声明 synchronized 方法。如下:

1
2
3
4
5
6
class MyClass{
private String name;
public synchronized void setName(String name){
this.name = name;
}
}

  synchronized方法控制对类成员变量的访问:每个类实例对应一把锁,每个 synchronized 方法都必须获得调用该方法的类实例的锁方能执行,否则所属线程阻塞,方法一旦执行,就独占该锁,直到从该方法返回时才将锁释放,此后被阻塞的线程方能获得该锁,重新进入可执行状态。这种机制确保了同一时刻对于每一个类实例,其所有声明为 synchronized 的成员函数中至多只有一个处于可执行状态(因为至多只有一个能够获得该类实例对应的锁),从而有效避免了类成员变量的访问冲突(只要所有可能访问类成员变量的方法均被声明为 synchronized)。

  在 Java 中,不光是类实例,每一个类也对应一把锁,这样我们也可将类的静态成员函数声明为 synchronized ,以控制其对类的静态成员变量的访问。

  synchronized 方法的缺陷:若将一个大的方法声明为synchronized 将会大大影响效率,典型地,若将线程类的方法 run()声明为 synchronized ,由于在线程的整个生命期内它一直在运行,因此将导致它对本类任何 synchronized 方法的调用都永远不会成功。

synchronized 块:

  synchronized 块是这样一个代码块,其中的代码必须获得对象 syncObject (如前所述,可以是类实例或类)的锁方能执行,具体机制同前所述。由于可以针对任意代码块,且可任意指定上锁的对象,故灵活性较高。

  通过 synchronized关键字来声明synchronized 块。语法如下:

1
2
3
synchronized(syncObject){  
//允许访问控制的代码  
} 

3. 同步一个类

1
2
3
4
5
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

1
2
3
4
5
6
7
8
9
10
public class SynchronizedExample {

public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
1
2
3
4
5
6
7
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

synchronized (this)的理解说明

  • 当一个线程正在执行object的一个synchronized(this)同步代码块时,该线程就获得了这个object的对象锁。
  • 当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块。  
  • 当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。  
  • 但是,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的除synchronized(this)同步代码块以外的部分。

总结

同步块、同步方法的锁定说明:

  • 对于同步的方法或者代码块来说,必须获得对象锁才能够进入同步方法或者代码块进行操作;
  • 如果采用普通方法级别的同步,则对象锁即为该方法所在的对象,如果是静态方法,对象锁即指该方法所在的类的锁(类的锁,对所有实例化对象都是唯一的)。
  • 对于代码块,对象锁即指synchronized(obj)中的obj;
  • 静态方法则一定会同步,非静态方法需在单例模式才生效,推荐用静态方法

实现同步的一些技巧

  搞清楚synchronized锁定的是哪个对象,就能帮助我们设计更安全的多线程程序。 还有一些技巧可以让我们对共享资源的同步访问更加安全:

  • 定义private的instance变量(成员变量)+对应的get()方法,而不要定义public/protected的instance变量。如果将变量定义为public,对象在外界可以绕过同步方法的控制而直接取得它,并改动它。这也是JavaBean的标准实现方式之一。
  • 如果instance变量是一个对象(如数组或ArrayList),那上述方法仍然不安全,因为当外界对象通过get()方法拿到这个instance对象的引用后,又将其指向另一个对象,那么这个private变量也就变了,岂不是很危险。这个时候就需要将get()方法也加上synchronized同步,并且,只返回这个private对象的clone(),这样,调用端得到的就是对象副本的引用了。
  • 还有,比较常用的就有:Collections.synchronizedMap(new HashMap()),当然这个MAP就是生命在类中的全局变量,就是一个线程安全的HashMap,web的application是全web容器公用的,所以要使用线程安全来保证数据的正确。

ava中多线程锁释放的条件:

  • 执行完同步代码块,就会释放锁。(synchronized)
  • 在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。(exception)
  • 在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程会释放锁,进入对象的等待池。(wait)

比较

1. 锁的实现

synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

2. 性能

新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

3. 等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

ReentrantLock 可中断,而 synchronized 不行。

4. 公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

5. 锁绑定多个条件

一个 ReentrantLock 可以同时绑定多个 Condition 对象。

3、Conditional条件对象

通常,线程拿到锁对象之后,却发现需要满足某一条件才能继续向下执行。

拿银行程序来举例子,我们需要转账方账户有足够的资金才能转出到目标账户,这时候需要用到ReentrantLock对象,因为如果我们已经完成转账方账户有足够的资金的判断之后,线程被其他线程中断,等其他线程执行完之后,转账方的钱又没有了足够的资金,这时候因为系统已经完成了判断,所以会继续向下执行,然后银行系统就会出现问题。

举例:

1
2
3
4
public void Transfer(int from, int to, double amount) {
if (Accounts[from] > amount)//系统在结束判断之后被剥夺运行权,然后账户通过网银转出所有钱,银行凉凉
DoTransfer(from, to, amount);
}

这时候我们就需要使用ReentrantLock对象了,我们修改一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void Transfer(int from, int to, double amount) {
ReentrantLock locker = new ReentrantLock();
locker.lock();
try {
while (Accounts[from] < amount) {
//等待有足够的钱
}
DoTransfer(from, to, amount);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
}

但是这样又有了问题,当前线程获取了锁对象之后,开始执行代码,发现钱不够,进入等待状态,然后其他线程又因为锁的原因无法给该账户转账,就会一直进入等待状态。

这个问题如何解决呢?

条件对象登场!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void Transfer(int from, int to, double amount) {
ReentrantLock locker = new ReentrantLock();
Condition sufficientFunds = locker.newCondition();//条件对象,
lock.lock();
try {
while (Accounts[from] < amount) {
sufficientFunds.await();
//等待有足够的钱
}
DoTransfer(from, to, amount);
sufficientFunds.signalAll();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
}

条件对象的关键字是:Condition一个锁对象可以有一个或多个相关的条件对象。可以通过锁对象.newCondition方法获得一个条件对象.

在进入锁之前,我们创建一个条件,然后如果金额不足,在这里调用条件对象的await方法,通知系统当前线程进入挂起状态,让其他线程执行。这样你这次调用会被锁定,然后系统可以再次调用该方法给其他账户转账,当每一次转账完成后,执行转账操作的线程在底部调用signalAll通知所有线程可以继续运行了,因为我们有可能是转足够的钱给当前账户,这时候有可能该线程会继续执行(不一定是你,是通知所有线程,如果通知的线程还是不符合条件,会继续调用await方法,并完成转账操作,然后通知其他挂起的线程。

你说为啥不直接通知当前线程?不行,可以调用signal方法只通知一个线程,但是如果这个线程操作的账户还是没钱(不是转账给这个账户的情况),那这个线程又进入等待了,这时候已经没有线程能通知其他线程了,程序死锁,所以还是用signal比较保险。

以上是使用ReentrantLock+Condition对象,那你说我要是使用synchronized隐式锁怎么办?

也可以,而且不需要

1
2
3
4
5
6
7
8
public void Transfer(int from, int to, double amount) {
while (Accounts[from] < amount) {
wait();//这个wait方法是定义在Object类里面的,可以直接用,和条件对象的await一样,挂起线程
//等待有足够的钱
}
DoTransfer(from, to, amount);
notifyAll();//通知其他挂起的线程
}

Object类里面定义了wait、notifyAll、notify方法,对应await、signalAll和signal方法,用来操作隐式锁,synchronized只能有一个条件,而ReentrantLock显式声明的锁可以用绑定多个Condition条件.

4、同步代码块

即有synchronized关键字修饰的语句块。

被该关键字修饰的语句块会自动被加上内置锁,从而实现同步。

1
2
3
4
5
public void method3(SomeObject obj){
synchronized(obj) { //锁定的是对象obj的对象锁
//…..
}
}

这时,锁就是obj这个对象,谁拿到这个锁谁就可以运行它所控制的那段代码。当有一个明确的对象作为锁时,就可以按以上方式来写程序;当没有明确的对象作为锁时,但还想让一段代码同步时,可以创建一个特殊的instance变量(必须是一个对象)来充当锁,此时代码如下:

1
2
3
4
5
6
7
8
class Foo implements Runnable{
private byte[] lock = new byte[0]; // 特殊的instance变量;也可以用String常量作为锁
Public void methodA(){
synchronized(lock) { //… }
}
//…..
}
//注:零长度的byte数组对象创建起来将比任何对象都经济――查看编译后的字节码:生成零长度的byte[]对象只需3条操作码,而Object lock = new Object()则需要7行操作码。
1
2
3
4
5
6
7
8
Object locker = new Object();
synchronized (locker) {
*//do some work*
}
*//也可以直接锁当前类的对象*
sychronized(this){
*//do some work*
}

以上代码会获得Object类型locker对象的锁,这种锁是一个特殊的锁,在上面的代码中,创建这个Object类对象只是单纯用来使用其持有的锁.

这种机制叫做同步块,应用场景也很广:有的时候,我们并不是整个一个方法都需要同步,只是方法里的部分代码块需要同步,这种情况下,我们如果将这个方法声明为synchronized,尤其是方法很大的时候,会造成很大的资源浪费。所以在这种情况下我们可以使用synchronized关键字来声明同步块:

1
2
3
4
5
6
public void Method() {
//do some work without synchronized
synchronized (this) {
//do some synchronized operation
}
}

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Bank {  

private int count =0;//账户余额

//存钱
public void addMoney(int money){

synchronized (this) {
count +=money;
}
System.out.println(System.currentTimeMillis()+"存进:"+money);
}

//取钱
public void subMoney(int money){

synchronized (this) {
if(count-money < 0){
System.out.println("余额不足");
return;
}
count -=money;
}
System.out.println(+System.currentTimeMillis()+"取出:"+money);
}

//查询
public void lookMoney(){
System.out.println("账户余额:"+count);
}
}

注:同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。

5、监视器的概念

锁和条件是同步中一个很重要的工具,但是它们并不是面向对象的。多年来,Java的研究人员努力寻找一种方法,可以在不需要考虑如何加锁的情况下,就能保证多线程的安全性。最成功的的一个解决方案叫做monitor监视器,这个对象内置于每一个Object变量中,相当于一个许可证。拿到许可证就可以进行操作,没有拿到则需要阻塞等待。

监视器具有以下特性:

1.监视器是只包含私有域的类

2.每个监视器对象都有一个相关的锁

3.使用监视器对象的锁对所有的方法进行加锁(举个例子:如果调用obj.Method方法,obj对象的锁会在方法调用的时候自动获得,当方法结束或返回之后会自动释放该锁因为所有的域都是私有的,这样可以确保一个线程在操作类对象的时候,没有其他线程可以访问里面的域

4.该锁对象可以有任意多个相关条件

其实我们使用的synchronized关键字就是使用了monitor来实现加锁解锁,所以又被称为内部锁因为Object类实现了监视器,所以对象又被内置于任何一个对象之中。这就是我们为什么可以使用synchronized(locker)的方式锁定一个代码块了,其实只是用到了locker对象中内置的monitor而已。每一个对象的monitor类又是唯一的,所以就是唯一的许可证,拿到许可证的线程才可以执行,执行完后释放对象的monitor才可以被其他线程获取。

举个例子:

1
2
3
synchronized (this) {
//do some synchronized operation
}

它在字节码文件中会被编译为:

1
2
3
monitorenter;//get monitor,enter the synchronized block
//do some synchronized operation
monitorexit;//leavel the synchronized block,release the monitor

6、死锁

产生死锁的必要条件:

互斥条件:进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
环路等待条件:在发生死锁时,必然存在一个进程–资源的环形链。

为什么倾向于使用signalAll和notifyAll方式,如果假设使用signal和notify,随机选择的线程发现自己还是不能运行,那么它再次被阻塞。这样就又会造成死锁现象。

7、锁测试和超时

线程在调用lock方法获得另一个线程持有的锁的时候,很可能发生阻塞。应该更加谨慎的申请锁,tryLock方法试图申请一个锁,如果申请成功,返回true,否则,立刻返回false,线程就会离开去做别的事,而不是被阻塞等待锁对象。

语法:

1
2
3
4
5
6
7
8
9
10
11
12
ReentrantLock locker = new ReentrantLock();
if (locker.tryLock()) {
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
} else {
//do other work
}

也可以给其指定超时参数,单位有SECONDSMILLISECONDSMICROSEONDSMANOSECONDS.

1
2
3
4
5
6
7
8
9
10
11
12
ReentrantLock locker = new ReentrantLock();
if (locker.tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
locker.unlock();
}
} else {
//do other work
}

lock方法不能被中断,如果一个线程在调用了lock方法后等待锁的时候被中断,中断线程在获得锁之前一直处于阻塞状态。

如果带有超时参数的tryLock方法,那么如果等待期间线程被中断,会抛出InterruptedException异常,这是一个很好的特性,允许程序打破死锁。

8、读写锁

eentrantLock类属于java.util.concurrent.locks包,这个包底下还有一个ReentrantReaderWriterLock类,如果使用多线程对数据读的操作很多,但是写的操作很少的话,可以使用这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock():

public void Read() {
Lock readLocker = rwl.readLock();//创建读取锁对象
readLocker.lock();//使用读取锁对象加锁
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
readLocker.unlock();
}
}

public void Write() {
Lock writeLocker = rwl.writeLock();//创建写入锁对象
writeLocker.lock();//使用写入锁对象加锁
try {
//do some work
} catch (Exception ex) {
ex.printStackTrace();
} finally {
writeLocker.unlock();
}
}

9、使用特殊域变量(volatile)实现线程同步

• volatile关键字为域变量的访问提供了一种免锁机制;

• 使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新;

• 因此每次使用该域就要重新计算,而不是使用寄存器中的值;

• volatile不会提供任何原子操作,它也不能用来修饰final类型的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class SynchronizedThread {

class Bank {

private volatile int account = 100;

public int getAccount() {
return account;
}

/**
* 用同步方法实现
*
* @param money
*/
public synchronized void save(int money) {
account += money;
}

/**
* 用同步代码块实现
*
* @param money
*/
public void save1(int money) {
synchronized (this) {
account += money;
}
}
}

class NewThread implements Runnable {
private Bank bank;

public NewThread(Bank bank) {
this.bank = bank;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
// bank.save1(10);
bank.save(10);
System.out.println(i + "账户余额为:" +bank.getAccount());
}
}

}

/**
* 建立线程,调用内部类
*/
public void useThread() {
Bank bank = new Bank();
NewThread new_thread = new NewThread(bank);
System.out.println("线程1");
Thread thread1 = new Thread(new_thread);
thread1.start();
System.out.println("线程2");
Thread thread2 = new Thread(new_thread);
thread2.start();
}

public static void main(String[] args) {
SynchronizedThread st = new SynchronizedThread();
st.useThread();
}

注:多线程中的非同步问题主要出现在对域的读写上,如果让域自身避免这个问题,则就不需要修改操作该域的方法。用final域,有锁保护的域和volatile域可以避免非同步的问题。

10、final变量

上一节已经了解到,除非使用锁或volatile修饰符,否则无法从多个线程安全地读取一个域。

还有一种情况可以安全地访问一个共享域,即这个域声明为final时。考虑以下声明:

finalMap<String,Double〉accounts=newHashKap<>0;

其他线程会在构造函数完成构造之后才看到这个accounts变量。

如果不使用final,就不能保证其他线程看到的是accounts更新后的值,它们可能都只是看到null,而不是新构造的HashMap。

当然,对这个映射表的操作并不是线程安全的。如果多个线程在读写这个映射表,仍然需要进行同步

11、线程的局部变量

线程间有时要避免共享变量,使用ThreadLocal辅助类为各个线程提供各自的实例。

例如,SimpleDateFormat类不是线程安全的。

1
2
3
public static final SimpleDateFormat dateFormat = 
new SimpleDateFormat("yyyy-MM-dd");

如果两个线程都执行以下操作:

1
String dateStamp = dateFormat.format(new Date());

结果可能很混乱,因为dateFormat使用的内部数据结构可能会被并发的访问所破坏。当然可以使用同步,但开销很大;或者也可以在需要时构造一个局部SimpleDateFormat对象,不过这也太浪费了。

要为每个线程构造一个实例,可以使用以下代码:

1
2
public static final ThreadLocal<SimpleDateFormat> dateFormat = 
ThreadLocal.withInitial(()->new SimpleDateFormat("yyyy-MM-dd"));

要访问具体的格式化方法,可以调用:

1
String dateStamp = dateFormat.get().format(new Date());

在一个给定线程中首次调用get时,会调用initialValue方法。在此之后,get方法会返回属于当前线程的那个实例。

在多个线程中生成随机数也存在类似的问题。java.util.Random类是线程安全的。但是如果多个线程需要等待一个共享的随机数生成器,这会很低效。

可以使用ThreadLocal辅助类为各个线程提供一个单独的生成器,不过Java SE 7还另外提供了一个便利类。只需要做以下调用:

1
int random = ThreadLocalRandom.current().nextInt(upperBound);

ThreadLocalRandom.current()调用会返回特定于当前线程的Random类实例。

六、线程通信

1、借助于Object类的wait()、notify()和notifyAll()实现通信

​ 线程执行wait()后,就放弃了运行资格,处于冻结状态;

​ 线程运行时,内存中会建立一个线程池,冻结状态的线程都存在于线程池中,notify()执行时唤醒的也是线程池中的线程,线程池中有多个线程时唤醒第一个被冻结的线程。
​ notifyall(), 唤醒线程池中所有线程。
注: (1) wait(), notify(),notifyall()都用在同步里面,因为这3个函数是对持有锁的线程进行操作,而只有同步才有锁,所以要使用在同步中;
​ (2) wait(),notify(),notifyall(), 在使用时必须标识它们所操作的线程持有的锁,因为等待和唤醒必须是同一锁下的线程;而锁可以是任意对象,所以这3个方法都是Object类中的方法。

单个消费者生产者例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Resource{  //生产者和消费者都要操作的资源  
private String name;
private int count=1;
private boolean flag=false;
public synchronized void set(String name){
if(flag)
try{wait();}catch(Exception e){}
this.name=name+"---"+count++;
System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);
flag=true;
this.notify();
}
public synchronized void out(){
if(!flag)
try{wait();}catch(Exception e){}
System.out.println(Thread.currentThread().getName()+"...消费者..."+this.name);
flag=false;
this.notify();
}
}
class Producer implements Runnable{
private Resource res;
Producer(Resource res){
this.res=res;
}
public void run(){
while(true){
res.set("商品");
}
}
}
class Consumer implements Runnable{
private Resource res;
Consumer(Resource res){
this.res=res;
}
public void run(){
while(true){
res.out();
}
}
}
public class ProducerConsumerDemo{
public static void main(String[] args){
Resource r=new Resource();
Producer pro=new Producer(r);
Consumer con=new Consumer(r);
Thread t1=new Thread(pro);
Thread t2=new Thread(con);
t1.start();
t2.start();
}
}//运行结果正常,生产者生产一个商品,紧接着消费者消费一个商品。

七、同步器J.U.C-AQS

多线程并发的执行,之间通过某种 共享 状态来同步,只有当状态满足 xxxx 条件,才能触发线程执行 xxxx 。这个共同的语义可以称之为同步器。

可以认为以上所有的锁机制都可以基于同步器定制来实现的。

而juc(java.util.concurrent)里的思想是 将这些场景抽象出来的语义通过统一的同步框架来支持。

juc 里所有的这些锁机制都是基于 AQS ( AbstractQueuedSynchronizer )框架上构建的。下面简单介绍下 AQS( AbstractQueuedSynchronizer )。 可以参考Doug Lea的论文The java.util.concurrent Synchronizer Framework(http://gee.cs.oswego.edu/dl/papers/aqs.pdf)

Java中多线程开发时,离不开线程的分工协作,常用的多线程的同步器有如下几种:

1、CountDownLatch(倒计时门闩)

应用场景:等待一组线程任务完成后在继续执行当前线程。

用法:定义一个CountDownLatch变量latch,在当前线程中调用latch.await()方法,在要等待的一组线程中执行完后调用latch.countDown()方法,这样当该线程都调用过latch.countDown()方法后就开始执行当前线程latch.await()后的方法。


倒计时门闩会导致一条或多条线程在“门口”一直等待,直到另一条线程打开这扇门,线程才得以继续运行。他是由一个计数变量和两个操作组成的,这两个操作分别是“导致一条线程等待直到。

计数变为0”以及“递减计数变量”。

捕获.PNG

例如:

以下代码是用倒计时门闩实现的一个是所有线程同时执行同时结束之后,才能继续执行主线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
final static int NTHREADS = 3;
public static void main(String[] args) {
final CountDownLatch startSignal = new CountDownLatch(1);
final CountDownLatch doneSignal = new CountDownLatch(NTHREADS);
Runnable r = new Runnable() {

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入等待");
startSignal.await();//3个线程进入等待,直到startSignal.countDown()被调用
System.out.println(Thread.currentThread().getName() + "开始执行任务");
Thread.sleep(200);
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
for (int i = 0; i < NTHREADS; i++) {
es.execute(r);
}
try {
Thread.sleep(1000);
startSignal.countDown();
//3个线程全部开始执行任务,主线程进入等待
System.out.println(Thread.currentThread().getName() + "进入等待");
//直到3个线程全部结束任务,doneSignal.countDown()被调用,主线程开始执行
doneSignal.await();
System.out.println(Thread.currentThread().getName() + "开始执行");
es.shutdownNow();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

2、CyclicBarrier(同步屏障)

应用场景:等待一组线程到达某个点后一起执行,该组线程达到指定点后可以再次循环执行。也可用于一组线程达达某个点后再执行某个方法。

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

用法:定义一个CyclicBarrier变量barrier,线程达到某个约定点时调用barrier.await()方法,当该组所有线程都调用了barrier.await()方法后改组线程一起向下执行。

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,


所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
public void run(){
//当所有线程准备完毕后触发此任务
}
});
// 启动三条线程
for( int i=0; i<3; i++ ){
new Thread( new Runnable(){
public void run(){
// 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障)
barrier.await();
// 任务
任务代码……
}
} ).start();
}

倒计时门闩 与 同步屏障 的区别 倒计时门闩只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行;

而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行

3、Semaphore(信号量)

应用场景:对于一组有限制都资源访问。比如餐厅有5个位置但同时有7个人要吃饭,则要控制7个人对餐位的并发实用。

用法:定义Semaphore变量semaphore包含受限的资源个数,每个人要来用餐时先调用semaphore.acquire()方法获取一个餐位(若没有餐位,则阻塞等待),用完餐后调用semaphore.release()释放餐位给其它人用。

信号量维护了一组许可证,以约束访问被限制资源的线程数。当没有可用

的许可证时,线程的获取尝试会一直阻塞,直到其它的线程释放一个许可证。

【信号量
一个信号量管理多个许可证。为了通过信号量,线程通过调用acquire()请求许可。其实没有实际的许可对象,信号连也仅仅是维护一个计数器。
许可的数目是固定的,由此限制了线程通过的数量当一个线程执行完之后,应该调用release()释放许可证,让其他线程有机会执行。事实上,
任意一个线程都有可以释放任意个数的许可证,这可能会增加许可证的个数。所以我建议,如果不是非常明确的知道为什么要释放多个许可证,就一定
是让获得许可证的线程是放一个许可证。

【常用方法
1.构造函数:
  Semaphore(int permits):创建具有给定许可数和非公平设置的Semaphore

​   Semaphore(int permits,boolean fair):此类的构造方法可选地接受一个公平 参数。当设置为 false 时(默认也是false),此类不对线程获取许可的顺序做任何保证。

​ 特别地,闯入是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。
​ 当公平设置为 true 时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。
​ 注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了 acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。

2.Semaphore还提供一些其他方法:
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

【补充
当许可证的个数为1时,可以充当互斥锁使用。

示例代码:

只能同时有5个线程访问的信号量

1
`// 创建信号量对象,并给予3个资源Semaphore semaphore = new Semaphore(3);// 开启10条线程for ( int i=0; i<10; i++ ) {    new Thread( new Runnbale(){        public void run(){            // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源            semaphore.acquire();            // 任务代码            ……            // 释放资源            semaphore.release();        }    } ).start();}`

4、Exchanger交换器

Exchanger

  • 交换值是同步的;
  • 成对的线程之间交换数据;
  • 可看成是双向的同步队列;
  • 可应用于演算法、流水线设计;

Exchanger类中的主要方法就是:exchange(V x)方法,成对的两个线程之间,都调用了该方法,就能在两个线程彼此都准备好数据后,成功的交换数据给对方,然后各自返回。如果想支持成对的两个线程之间,一个没耐性,等的时间过长,或者被打断了就不交换数据了,可以使用exchange(V x, long timeout, TimeUnit unit)方法。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

//球线程
class BallTask implements Runnable
{
private Exchanger<String> e;
public BallTask(Exchanger<String> e){
this.e = e;
}

public void run(){
try{
long sleepTime = (long)(Math.random() * 2500) ;
String tName = Thread.currentThread().getName();
System.out.println(tName+"正在买球,用时["+sleepTime+"]才买到球,赶紧去换鱼...");
Thread.sleep(sleepTime);
//这里的str即为交换的东西
String str = e.exchange(tName+":的球");
System.out.println("【"+tName+":的球】换到了-->【"+str+"】");
}

catch(Exception e){
}
finally{
}
}
}

//鱼线程
class FishTask implements Runnable
{
private Exchanger<String> e;
public FishTask(Exchanger<String> e){
this.e = e;
}

public void run(){
try{
long sleepTime = (long)(Math.random() * 2500) ;
String tName = Thread.currentThread().getName();
System.out.println(tName+"正在钓鱼,用时["+sleepTime+"]才钓到鱼,赶紧去换球...");
Thread.sleep(sleepTime);
String str = e.exchange(tName+":的鱼");
System.out.println("【"+tName+":的鱼】换到了-->【"+str+"】");
}
catch(Exception e){
}
finally{
}
}
}

public class ExchangerTest
{
public static void main(String[] args)
{
Exchanger<String> e = new Exchanger<String>();

BallTask bTask = new BallTask(e); //任务:球线程
FishTask fTask = new FishTask(e); //任务:鱼线程

Thread bThread = new Thread(bTask,"Ball");
Thread fThread = new Thread(fTask,"Fish");

bThread.start();
fThread.start();

System.out.println("我是主线程,准备看看你们交易情况...\n\r");

try{
//Thread类中的join方法的主要作用就是同步,它可以使得线程之间的并行执行变为串行执行。
bThread.join();
fThread.join();
}catch(Exception ep){}

System.out.println("\n\r我是主线程,已看到你们的交易结果...");
}
}

Exchanger和Semaphore区别
Exchanger交换器和Semaphore信号量在关于生产者消费者《产1消1模式》运用的区别:

1·Exchanger交换器:成对的两个线程,各个线程有各个线程的自己数据V,A线程拥有V1,B线程拥有V2,V1<…>V2互换。
2·Semaphore信号量:成对的两个线程,只需一个数据池即可,生产者生产数据注入数据池,消费者从数据池取走数据消费。
3·Exchanger交换器:两个线程之间的通讯仅仅一个Exchanger实例即可。
4·Semaphore信号量:两个线程之间的通讯需要两个信号量,生产信号指示灯,消费信号指示灯。
5·Exchanger和Semaphore的共同点:两个线程之间需要同步通讯。生产的过快,没用,必须等消费完了,才能进行下一生产1;同理,消费的过快,也没用,必须等生产完了,才能进行下一消费1。

5、同步队列与等待队列(待看)

书上:

【同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。与Exchanger的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的size方法总是返回0。】

简单的理解是同步队列存放着竞争同步资源的线程的引用(不是存放线程),而等待队列存放着待唤醒的线程的引用。

同步队列中存放着一个个节点,当线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,首节点表示的获取同步状态成功的线程节点。

在这里插入图片描述

Condition维护着一个等待队列与同步队列相似。主要针对await和signal的操作。
在这里插入图片描述

例子:

这里实现了三个多线程的run方法。A线程输出A然后通知B,然后B通知C。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static class ThreadA extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("A进程输出" + " : " + ++index);
conditionB.signal();
conditionA.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public static class ThreadB extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("B进程输出" + " : " + ++index);
conditionC.signal();
conditionB.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public static class ThreadC extends Thread{
@Override
public void run(){
try{
lock.lock();
System.out.println("C进程输出" + " : " + ++index);
conditionA.signal();
conditionC.await();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CondtionTest {

public static ReentrantLock lock = new ReentrantLock();
public static Condition conditionA = lock.newCondition();
public static Condition conditionB = lock.newCondition();
public static Condition conditionC = lock.newCondition();
public static int index = 0;
public static void main(String[] args){
ThreadA threadA = new ThreadA();
ThreadB threadB = new ThreadB();
ThreadC threadC = new ThreadC();

threadA.start();//(1)
threadB.start();//(2)
threadC.start();//(3)
}
}

当(1)(2)(3)三个线程被调用时,因为三个线程同时竞争lock,这里假设线程A拿到了lock(线程A虽然是看起来是先start(),但是正在的调用还是看调度程序的,所以这里只能假设是A线程拿到同步资源)。首节点表示的是正在操作同步资源的线程。所以现在的同步队列是:
在这里插入图片描述

接着线程A输出了:“A进程输出 : 1”。然后调用conditionB.signal(),其实这一步的signal是没什么意义的,因为conditionB现在没有线程是可以被唤醒的。
当conditionA.await()被执行到的时候,线程A同步队列中被移除,对应操作是锁的释放; 线程A(节点A)接着被加入到ConditionA等待队列,因为线程需要singal信号。

同步队列
在这里插入图片描述

A等待队列
在这里插入图片描述

现在在同步队列中的首节点是B节点,那么B线程占用了同步资源就可以开始运行了。先是输出“B进程输出 : 2”,同样的signal操作也是没有意义的,因为conditionC是没有可以被唤醒的线程。当conditionB.await()被执行到的时候,线程B同步队列中被移除,线程B(节点B)接着被加入到ConditionB等待队列

同步队列
在这里插入图片描述

B等待队列
在这里插入图片描述

终于轮到了C线程占用同步资源了,再输出“C进程输出:3”之后,调用conditionA.signal(),注意这个signal是有用的
因为在conditionA的等待队列中A线程是在等待的,把它取出来加入到同步队列中去竞争,但是这个时候线程A还没唤醒。首节点还是C

同步队列
在这里插入图片描述

接着conditionC.await()被执行。线程C同步队列中被移除,线程C(节点C)接着被加入到ConditionC等待队列

同步队列
在这里插入图片描述

C等待队列
在这里插入图片描述

注意到同步队列中的首节点已经变回了节点A了。所以线程A在刚刚等待的地方继续执行,最后释放了lock。但是线程B和线程C最后也没有其他线程去唤醒,状态一直为WAITING,而线程A的状态为TERMINATED。

6、定时器

定时器是一个应用十分广泛的线程工具,可用于调度多个定时任务以后台线程的方式执行,在Java中,可以通过Timer和TimerTask类来实现定义调度的功能。

Timer类

1
2
3
4
5
public Timer()默认的构造方法
public void schedule(TimerTask task, long delay) 在指定的延迟后执行指定的任务
public void schedule(TimerTask task,long delay,long period) 计划重复固定延迟执行指定的任务,在指定的延迟后开始
public void schedule(TimerTask task, Date time) 在指定的时间计划指定的任务
public void schedule(TimerTask task, Date firstTime, long period) 计划重复固定延迟执行指定的任务,在指定的开始时间

TimerTask类

1
2
public abstract void run()	定时器任务执行的动作
public boolean cancel() 取消此定时器任务

代码演示

1、输出爆炸啦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Timer;
import java.util.TimerTask;

//继承TimerTask,需要重新rum方法
public class Time extends TimerTask {
@Override
public void run() {
System.out.println("爆炸啦");
}
}
import java.util.Timer;

public class TimeTest {
public static void main(String[] args) {
//new一个Timer对象,用来调方法
Timer timer = new Timer();
//调用Timer对象的方法schedule,第一个参数必须是TimerTask对象,Time继承了它因此也是这个对象,第二个参数表示在2秒后运行run方法,这个参数只有在第一次使用run方法,最后一个参数是每隔1秒,运行一次run方法
timer.schedule(new Time(),2000,1000);
}
}

这个结果是,不停的输出爆炸啦,要想停掉的话,可以加入cancel()方法,在run方法最后加入这个话,结果会输出一个爆炸啦,因为执行第一次完后就执行到这一语句,定时器就取消了,如下。

2、

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
javapublic class Time extends TimerTask {
//定义一个Timer对象,到时候可以用它来调用cancel方法
Timer time;
//通过构造器给Timer对象赋值
public Time(Timer time){
this.time=time;
}
@Override
public void run() {
System.out.println("爆炸啦");
//取消定时器
time.cancel();
}
}
public class TimeTest {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new Time(timer),2000,1000);
}
}

结果

爆炸啦

八、J.U.C-其他组件

fork-join框架

​ Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

  2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

Fork/Join框架简化了并行程序的原因有:

  • 它简化了线程的创建,在框架中线程是自动被创建和管理。
  • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

由于支持真正的并行执行,Fork/Join框架可以显著减少计算时间,并提高解决图像处理、视频处理、大数据处理等非常大问题的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ForkJoinExample extends RecursiveTask<Integer> {

private final int threshold = 5;
private int first;
private int last;

public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}

@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
1
2
3
4
5
6
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}

关于Fork/Join框架的一个有趣的地方是:它使用工作窃取算法来平衡线程之间的负载:如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

Fork/Join框架在java.util.concurrent包下被实现。它的核心有4个类:

  • ForkJoinTask: 这是一个抽象任务类,并且运行在ForkJoinPool中。
  • ForkJoinPool:这是一个线程池管理并运行众多ForkJoinTask任务。
  • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
  • RecursiveTask: ForkJoinTask的子类,有返回值。

基本上,我们解决问题的代码是在RecursiveAction或者RecursiveTask中进行的,然后将任务提交由ForkJoinPool`执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。

我们先来理解一下这些类中的关键方法。

ForkJoinTask

这是一个运行在ForkJoinPool中的抽象的任务类。类型V指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:

  • final ForkJoinTask fork()
  • final V join()
  • final V invoke()

fork()方法提交并执行异步任务,该方法返回ForkJoinTask并且调用线程继续运行。

join()方法等待任务直到返回结果。

invoke()方法是组合了fork()join(),它开始一个任务并等待结束返回结果。

此外,ForkJoinTask中还提供了用于一次调用多个任务的两个静态方法

  • static void invokeAll(ForkJoinTask task1, ForkJoinTask task2) :执行两个任务
  • static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合

RecursiveAction

这是一个递归的ForkJoinTask子类,不返回结果。Recursive意思是任务可以通过分治策略分成自己的子任务。

我们必须重写compute()方法,并将计算代码写在其中:

1
protected abstract void compute();

RecursiveTask

RecursiveAction一样,但是RecursiveTask有返回结果,结果类型由V指定。我们仍然需要重写compute()方法:

1
protected abstract V compute();

ForkJoinPool

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask的执行,为了执行ForkJoinTask,首先需要获取到ForkJoinPool的实例。

有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

  • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等
  • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:

1
public static ForkJoinPool commonPool();

这种方式创建的池不受shutdown()或者shutdownNow()方法的影响,但是他会在System.exit()时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()方法。该方式是静态的,可以自动被使用。

e.g.

在创建好ForkJoinPool实例之后,可以使用下面的方法执行任务:

  • T invoke(ForkJoinTask task):执行指定任务并返回结果,该方法是异步的,调用的线程会一直等待直到该方法返回结果,对于RecursiveAction任务来说,参数类型是Void.
  • void execute(ForkJoinTask<?> task):异步执行指定的任务,调用的线程一直等待知道任务完成才会继续执行。

另外,也可以通过ForkJoinTask自己拥有的方法fork()invoke()执行任务。在这种情况下,如果任务还没在ForkJoinPool中运行,那么commonPool()将会自动被使用。

值得注意的一点是:ForkJoinPool使用的是守护线程,当所有的用户线程被终止是它也会被终止,这意味着可以不必显示的关闭ForkPoolJoin(虽然这样也可以)。如果是common pool的情况下,调用shutdown没有任何效果,应为这个池总是可用的。

使用RecursiveAction

假设要对一个很大的数字数组进行变换,为了简单简单起见,转换只需要将数组中的每个元素乘以指定的数字。下面的代码用于转换任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.*;

public class ArrayTransform extends RecursiveAction {
int[] array;
int number;
int threshold = 100_000;
int start;
int end;

public ArrayTransform(int[] array, int number, int start, int end) {
this.array = array;
this.number = number;
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

invokeAll(subTask1, subTask2);
}
}

protected void computeDirectly() {
for (int i = start; i < end; i++) {
array[i] = array[i] * number;
}
}
}

可以看到,这是一个RecursiveAction的子类,我们重写了compute()方法。

数组和数字从它的构造函数传递。参数start和end指定要处理的数组中的元素的范围。如果数组的大小大于阈值,这有助于将数组拆分为子数组,否则直接对整个数组执行计算。

观察else中的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

invokeAll(subTask1, subTask2);
}
}

这里,将数组分成两个部分,并分别创建他们的子任务,反过来,子任务也可以递归的进一步划分为更小的子任务,直到其大小小于直接调用computeDirectly();方法的的阈值。

然后,在main函数中创建ForkJoinPool执行任务:

1
2
3
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);

或者使用common pool执行任务:

1
2
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();

这里是全部的测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.*;
import java.util.concurrent.*;

public class ForkJoinRecursiveActionTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();

public static void main(String[] args) {

int number = 9;

System.out.println("数组中的初始元素: ");
print();
//ArrayTransform 是一个继承RecursiveAction的类,在覆写compute方法时改写成递归方法
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
//ForkJoinPool是一个线程池管理并运行众多ForkJoinTask任务。
ForkJoinPool pool = new ForkJoinPool();
//invoke()方法是组合了fork()和join(),它开始一个任务并等待结束返回结果。
pool.invoke(mainTask);
System.out.println("并行计算之后的元素:");
print();
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

static void print() {
for (int i = 0; i < 10; i++) {
System.out.print(array[i] + ", ");
}
System.out.println();
}
}

如您所见,使用随机生成的1,000万个元素数组进行测试。由于数组太大,我们在计算前后只打印前10个元素,看效果如何:

1
2
3
4
数组中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行计算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,

使用RecursiveTask

这个例子中,展示了如何使用带有返回值的任务,下面的任务计算在一个大数组中出现偶数的次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.*;

public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold = 100_000;
int start;
int end;

public ArrayCounter(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end);

invokeAll(subTask1, subTask2);


return subTask1.join() + subTask2.join();
}
}

protected Integer computeDirectly() {
Integer count = 0;

for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}

return count;
}
}

如你所见,这个类是RecursiveTask的子类并且重写了compute()方法,并且返回了一个整型的结果。

这里还使用了join()方法去合并子任务的结果:

1
return subTask1.join() + subTask2.join();

测试程序就和RecursiveAction的一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.*;
import java.util.concurrent.*;

public class ForkJoinRecursiveTaskTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();

public static void main(String[] args) {

ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
Integer evenNumberCount = pool.invoke(mainTask);

System.out.println("偶数的个数: " + evenNumberCount);
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

}

运行程序就会看到如下的结果:

1
偶数的个数: 5000045

并行性试验

这个例子展示并行性的级别如何影响计算时间:

ArrayCounter类让阈值可以通过构造器传入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.*;

public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold;
int start;
int end;

public ArrayCounter(int[] array, int start, int end, int threshold) {
this.array = array;
this.start = start;
this.end = end;
this.threshold = threshold;
}

protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);

invokeAll(subTask1, subTask2);


return subTask1.join() + subTask2.join();
}
}

protected Integer computeDirectly() {
Integer count = 0;

for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}

return count;
}
}

测试程序将并行度级别和阈值作为参数传递:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.*;
import java.util.concurrent.*;

public class ParallelismTest {
static final int SIZE = 10_000_000;

static int[] array = randomArray();

public static void main(String[] args) {
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);

long startTime = System.currentTimeMillis();

ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Integer evenNumberCount = pool.invoke(mainTask);

long endTime = System.currentTimeMillis();

System.out.println("偶数的个数: " + evenNumberCount);

long time = (endTime - startTime);
System.out.println("执行时间: " + time + " ms");
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

}

该程序允许您使用不同的并行度和阈值轻松测试性能。注意,它在最后打印执行时间。尝试用不同的参数多次运行这个程序,并观察执行时间。

结论

  • Fork/Join框架的设计简化了java语言的并行程序
  • ForkJoinPoolFork/Join框架的核心,它允许多个ForkJoinTask请求由少量实际线程执行,每个线程运行在单独的处理核心上
  • 既可以通过构造器也可以通过静态方法common pool去获取ForkJoinPool的实例
  • ForkJoinTask是一个抽象类,它表示的任务比普通线程更轻。通过覆盖其compute()方法实现计算逻辑
  • RecursiveAction是一个没有返回值的ForkJoinTask
  • RecursiveTask是一个有返回值的ForkJoinTask
  • ForkJoinPool与其它池的不同之处在于,它使用了工作窃取算法,该算法允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务
  • ForkJoinPool中的线程是守护线程,不必显式地关闭池
  • 执行一个ForkJoinTask既可以通过调用它自己的invoke()fork()方法,也可以提交任务给ForkJoinPool并调用它的invoke()或者execute()方法
  • 直接使用ForkJoinTask自身的方法执行任务,如果它还没运行在ForkJoinPool中那么将运行在common pool
  • ForkJoinTask中使用join()方法,可以合并子任务的结果
  • invoke()方法会等待子任务完成,但是execute()方法不会

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blockingqueue)导致线程阻塞。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

BlockingQueue的核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface BlockingQueue<E> extends Queue<E> {

//将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);

//将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);

//将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

//将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;

//在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//获取队列中剩余的空间。
int remainingCapacity();

//从队列中移除指定的值。
boolean remove(Object o);

//判断队列中是否拥有该值。
public boolean contains(Object o);

//将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);

//指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}

在深入之前先了解下下ReentrantLock 和 Condition:
重入锁ReentrantLock:
ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
主要方法:

  • lock()获得锁
  • lockInterruptibly()获得锁,但优先响应中断
  • tryLock()尝试获得锁,成功返回true,否则false,该方法不等待,立即返回
  • tryLock(long time,TimeUnit unit)在给定时间内尝试获得锁
  • unlock()释放锁

Condition:await()、signal()方法分别对应之前的Object的wait()和notify()

  • 和重入锁一起使用
  • await()是当前线程等待同时释放锁
  • awaitUninterruptibly()不会在等待过程中响应中断
  • signal()用于唤醒一个在等待的线程,还有对应的singalAll()方法

阻塞队列的成员

队列 有界性 数据结构
ArrayBlockingQueue bounded(有界) 加锁 arrayList
LinkedBlockingQueue optionally-bounded 加锁 linkedList
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
SynchronousQueue bounded 加锁
LinkedTransferQueue unbounded 加锁 heap
LinkedBlockingDeque unbounded 无锁 heap

下面分别简单介绍一下:

  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
  • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

ArrayBlockingQueue、LinkedBlockingQueue以及DelayQueue介绍

https://www.cnblogs.com/bjxq-cs88/p/9759571.html

九、线程安全的集合

首先要明白线程的工作原理,jvm有一个main memory,而每个线程有自己的working memory,一个线程对一个variable进行操作时,都要在自己的working memory里面建立一个copy,操作完之后再写入main memory。多个线程同时操作同一个variable,就可能会出现不可预知的结果。根据上面的解释,很容易想出相应的scenario。

而用synchronized的关键是建立一个monitor,这个monitor可以是要修改的variable也可以其他你认为合适的object比如method,然后通过给这个monitor加锁来实现线程安全,每个线程在获得这个锁之后,要执行完load到workingmemory -> use&assign -> store到mainmemory 的过程,才会释放它得到的锁。这样就实现了所谓的线程安全。

什么是线程安全?线程安全是怎么完成的(原理)? 线程安全就是说多线程访问同一代码,不会产生不确定的结果。编写线程安全的代码是低依靠线程同步。

1、早期线程安全的集合

1、Vector、ArrayList、LinkedList

Vector和ArrayList在使用上非常相似,都可用来表示一组数量可变的对象应用的集合,并且可以随机地访问其中的元素。

   Vector的方法都是同步的(Synchronized),是线程安全的(thread-safe),而ArrayList的方法不是,由于线程的同步必然要影响性能,因此,ArrayList的性能比Vector好。

ArrayList和LinkedList区别

   对于处理一列数据项,Java提供了两个类ArrayList和LinkedList,ArrayList的内部实现是基于内部数组Object[],所以从概念上讲,它更象数组,但LinkedList的内部实现是基于一组连接的记录,所以,它更像一个链表结构,所以,它们在性能上有很大的差别。

   从上面的分析可知,在ArrayList的前面或中间插入数据时,你必须将其后的所有数据相应的后移,这样必然要花费较多时间,所以,当你的操作是在一列数据的后面添加数据而不是在前面或中间,并且需要随机地访问其中的元素时,使用ArrayList会提供比较好的性能

   而访问链表中的某个元素时,就必须从链表的一端开始沿着连接方向一个一个元素地去查找,直到找到所需的元素为止,所以,当你的操作是在一列数据的前面或中间添加或删除数据,并且按照顺序访问其中的元素时,就应该使用LinkedList了。

   如果在编程中,1,2两种情形交替出现,这时,你可以考虑使用List这样的通用接口,而不用关心具体的实现,在具体的情形下,它的性能由具体的实现来保证。

2、HashTable,HashMap,HashSet

HashTable和HashMap采用相同的存储机制,二者的实现基本一致,不同的是:

1)、HashMap是非线程安全的,HashTable是线程安全的,内部的方法基本都是synchronized。

2)、HashTable不允许有null值的存在。

在HashTable中调用put方法时,如果key为null,直接抛出NullPointerException。其它细微的差别还有,比如初始化Entry数组的大小等等,但基本思想和HashMap一样。

HashSet:

1、HashSet基于HashMap实现,无容量限制。

2、HashSet是非线程安全的。

3、HashSet不保证有序。

HashMap:

1、HashMap采用数组方式存储key,value构成的Entry对象,无容量限制。

2、HashMap基于Key hash查找Entry对象存放到数组的位置,对于hash冲突采用链表的方式来解决。

3、HashMap在插入元素时可能会要扩大数组的容量,在扩大容量时须要重新计算hash,并复制对象到新的数组中。

4、HashMap是非线程安全的。

5、HashMap遍历使用的是Iterator

HashTable

1、HashTable是线程安全的。

2、HashTable中无论是Key,还是Value都不允许为null。

3、HashTable遍历使用的是Enumeration。

TreeSet,TreeMap

TreeSet:

1、TreeSet基于TreeMap实现,支持排序。

2、TreeSet是非线程安全的。

从对HashSet和TreeSet的描述来看,TreeSet和HashSet一样,也是完全基于Map来实现的,并且都不支持get(int)来获取指定位置的元素(需要遍历获取),另外TreeSet还提供了一些排序方面的支持。例如传入Comparator实现、descendingSet以及descendingIterator等。

TreeMap:

1、TreeMap是一个典型的基于红黑树的Map实现,因此它要求一定要有Key比较的方法,要么传入Comparator实现,要么key对象实现Comparable接口。

2、TreeMap是非线程安全的。

2、Collections包装方法

Vector和HashTable被弃用后,它们被ArrayList和HashMap代替,但它们不是线程安全的,所以Collections工具类中提供了相应的包装方法把它们包装成线程安全的集合

1
2
3
4
5
List<E> synArrayList = Collections.synchronizedList(new ArrayList<E>());

Set<E> synHashSet = Collections.synchronizedSet(new HashSet<E>());

Map<K,V> synHashMap = Collections.synchronizedMap(new HashMap<K,V>());

Collections针对每种集合都声明了一个线程安全的包装类,在原集合的基础上添加了锁对象,集合中的每个方法都通过这个锁对象实现同步

3、java.util.concurrent包中的集合

在java.util.concurrent包中,不但包含了我们本篇要说的线程安全的集合,还涉及到了多线程、CAS、线程锁等相关内容,可以说是完整覆盖了Java并发的知识栈。

对于Java开发人员来说,学好java.util.concurrent包下的内容,是一个必备的功课,也是逐渐提升自己的一个重要阶段。

1.ConcurrentHashMap

ConcurrentHashMap和HashTable都是线程安全的集合,它们的不同主要是加锁粒度上的不同。HashTable的加锁方法是给每个方法加上synchronized关键字,这样锁住的是整个Table对象。而ConcurrentHashMap是更细粒度的加锁
在JDK1.8之前,ConcurrentHashMap加的是分段锁,也就是Segment锁,每个Segment含有整个table的一部分,这样不同分段之间的并发操作就互不影响
JDK1.8对此做了进一步的改进,它取消了Segment字段,直接在table元素上加锁,实现对每一行进行加锁,进一步减小了并发冲突的概率

2.CopyOnWriteArrayList和CopyOnWriteArraySet

它们是加了写锁的ArrayList和ArraySet,锁住的是整个对象,但读操作可以并发执行

3.其他

除此之外还有ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue、ConcurrentLinkedDeque等,至于为什么没有ConcurrentArrayList,原因是无法设计一个通用的而且可以规避ArrayList的并发瓶颈的线程安全的集合类,只能锁住整个list,这用Collections里的包装类就能办到

Collection集合:

List:

1
CopyOnWriteArrayList

Set:

1
2
CopyOnWriteArraySet
ConcurrentSkipListSet

Queue:

1
2
3
4
5
6
7
8
9
10
BlockingQueue:
LinkedBlockingQueue
DelayQueue
PriorityBlockingQueue
ConcurrentLinkedQueue
TransferQueue:
LinkedTransferQueue
BlockingDeque:
LinkedBlockingDeque
ConcurrentLinkedDeque

Map集合:

Map:

1
2
3
4
ConcurrentMap:
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentNavigableMap

通过以上可以看出,java.util.concurrent包为每一类集合都提供了线程安全的实现。

线程安全有以下几种实现方式:

不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

不可变的类型:

  • final 关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。

对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。

1
2
3
4
5
6
7
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
1
2
3
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。

1
2
3
public V put(K key, V value) {
throw new UnsupportedOperationException();
}

互斥同步

synchronized 和 ReentrantLock。

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

1. CAS

乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

2. AtomicInteger

J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操作。

以下代码使用了 AtomicInteger 执行了自增的操作。

1
2
3
4
5
private AtomicInteger cnt = new AtomicInteger();

public void add() {
cnt.incrementAndGet();
}

以下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt() 。

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操作需要加的数值,这里为 1。通过 getIntVolatile(var1, var2) 得到旧的预期值,通过调用 compareAndSwapInt() 来进行 CAS 比较,如果该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。

可以看到 getAndAddInt() 在一个循环中进行,发生冲突的做法是不断的进行重试。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

3. ABA

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过。

J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。

无同步方案

要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

1. 栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

1
2
3
4
5
6
7
8
9
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
1
2
3
4
5
6
7
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
1
2
100
100

2. 线程本地存储(Thread Local Storage)

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。

对于以下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
1
1

为了理解 ThreadLocal,先看以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadLocalExample1 {
public static void main(String[] args) {
ThreadLocal threadLocal1 = new ThreadLocal();
ThreadLocal threadLocal2 = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = new Thread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}

它所对应的底层结构图为:


每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

1
2
3
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

get() 方法类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

在一些场景 (尤其是使用线程池) 下,由于 ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况,应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。

3. 可重入代码(Reentrant Code)

这种代码也叫做纯代码(Pure Code),可以在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。

十二、锁优化

这里的锁优化主要是指 JVM 对 synchronized 的优化。

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

1
2
3
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}

String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作:

1
2
3
4
5
6
7
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访问到它,因此可以进行消除。

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。

轻量级锁

JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)。

以下是 HotSpot 虚拟机对象头的内存布局,这些数据被称为 Mark Word。其中 tag bits 对应了五个状态,这些状态在右侧的 state 表格中给出。除了 marked for gc 状态,其它四个状态已经在前面介绍过了。


下图左侧是一个线程的虚拟机栈,其中有一部分称为 Lock Record 的区域,这是在轻量级锁运行过程创建的,用于存放锁对象的 Mark Word。而右侧就是一个锁对象,包含了 Mark Word 和其它信息。


轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

当尝试获取一个锁对象时,如果锁对象标记为 0 01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。


如果 CAS 操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。

偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。

当锁对象第一次被线程获得的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。

当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。


十三、Java 内存模型

Java 内存模型试图屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

主内存与工作内存

处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。


所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。


内存间交互操作

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。


  • read:把一个变量的值从主内存传输到工作内存中
  • load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
  • use:把工作内存中一个变量的值传递给执行引擎
  • assign:把一个从执行引擎接收到的值赋给工作内存的变量
  • store:把工作内存的一个变量的值传送到主内存中
  • write:在 store 之后执行,把 store 得到的值放入主内存的变量中
  • lock:作用于主内存的变量
  • unlock

内存模型三大特性

1. 原子性

Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。

有一个错误认识就是,int 等原子性的类型在多线程环境中不会出现线程安全问题。前面的线程不安全示例代码中,cnt 属于 int 类型变量,1000 个线程对它进行自增操作之后,得到的值为 997 而不是 1000。

为了方便讨论,将内存间的交互操作简化为 3 个:load、assign、store。

下图演示了两个线程同时对 cnt 进行操作,load、assign、store 这一系列操作整体上看不具备原子性,那么在 T1 修改 cnt 并且还没有将修改后的值写入主内存,T2 依然可以读入旧值。可以看出,这两个线程虽然执行了两次自增运算,但是主内存中 cnt 的值最后为 1 而不是 2。因此对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性。


AtomicInteger 能保证多个线程修改的原子性。


使用 AtomicInteger 重写之前线程不安全的代码之后得到以下线程安全实现:

1
2
3
4
5
6
7
8
9
10
11
public class AtomicExample {
private AtomicInteger cnt = new AtomicInteger();

public void add() {
cnt.incrementAndGet();
}

public int get() {
return cnt.get();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicExample example = new AtomicExample(); // 只修改这条语句
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1
1000

除了使用原子类之外,也可以使用 synchronized 互斥锁来保证操作的原子性。它对应的内存间交互操作为:lock 和 unlock,在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit。

1
2
3
4
5
6
7
8
9
10
11
public class AtomicSynchronizedExample {
private int cnt = 0;

public synchronized void add() {
cnt++;
}

public synchronized int get() {
return cnt;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicSynchronizedExample example = new AtomicSynchronizedExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1
1000

2. 可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

主要有三种实现可见性的方式:

  • volatile
  • synchronized,对一个变量执行 unlock 操作之前,必须把变量值同步回主内存。
  • final,被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。

对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,因为 volatile 并不能保证操作的原子性。

3. 有序性

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。

先行发生原则

上面提到了可以用 volatile 和 synchronized 来保证有序性。除此之外,JVM 还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。

1. 单一线程原则

Single Thread rule

在一个线程内,在程序前面的操作先行发生于后面的操作。


2. 管程锁定规则

Monitor Lock Rule

一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。


3. volatile 变量规则

Volatile Variable Rule

对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。


4. 线程启动规则

Thread Start Rule

Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。


5. 线程加入规则

Thread Join Rule

Thread 对象的结束先行发生于 join() 方法返回。


6. 线程中断规则

Thread Interruption Rule

对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 interrupted() 方法检测到是否有中断发生。

7. 对象终结规则

Finalizer Rule

一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。

8. 传递性

Transitivity

如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作 C。

十四、多线程开发良好的实践

  • 给线程起个有意义的名字,这样可以方便找 Bug。

  • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。

  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。

  • 使用 BlockingQueue 实现生产者消费者问题。

  • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。

  • 使用本地变量和不可变类来保证线程安全。

  • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。

参考资料