0%

java-并发(三)

八、J.U.C-其他组件

fork-join框架

​ Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

  2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

Fork/Join框架简化了并行程序的原因有:

  • 它简化了线程的创建,在框架中线程是自动被创建和管理。
  • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

由于支持真正的并行执行,Fork/Join框架可以显著减少计算时间,并提高解决图像处理、视频处理、大数据处理等非常大问题的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ForkJoinExample extends RecursiveTask<Integer> {

private final int threshold = 5;
private int first;
private int last;

public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}

@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
1
2
3
4
5
6
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}

关于Fork/Join框架的一个有趣的地方是:它使用工作窃取算法来平衡线程之间的负载:如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

Fork/Join框架在java.util.concurrent包下被实现。它的核心有4个类:

  • ForkJoinTask: 这是一个抽象任务类,并且运行在ForkJoinPool中。
  • ForkJoinPool:这是一个线程池管理并运行众多ForkJoinTask任务。
  • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
  • RecursiveTask: ForkJoinTask的子类,有返回值。

基本上,我们解决问题的代码是在RecursiveAction或者RecursiveTask中进行的,然后将任务提交由ForkJoinPool执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。

我们先来理解一下这些类中的关键方法。

ForkJoinTask

这是一个运行在ForkJoinPool中的抽象的任务类。类型V指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:

  • final ForkJoinTask fork()
  • final V join()
  • final V invoke()

fork()方法提交并执行异步任务,该方法返回ForkJoinTask并且调用线程继续运行。

join()方法等待任务直到返回结果。

invoke()方法是组合了fork()join(),它开始一个任务并等待结束返回结果。

此外,ForkJoinTask中还提供了用于一次调用多个任务的两个静态方法

  • static void invokeAll(ForkJoinTask task1, ForkJoinTask task2) :执行两个任务
  • static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合

RecursiveAction

这是一个递归的ForkJoinTask子类,不返回结果。Recursive意思是任务可以通过分治策略分成自己的子任务。

我们必须重写compute()方法,并将计算代码写在其中:

1
protected abstract void compute();

RecursiveTask

RecursiveAction一样,但是RecursiveTask有返回结果,结果类型由V指定。我们仍然需要重写compute()方法:

1
protected abstract V compute();

ForkJoinPool

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask的执行,为了执行ForkJoinTask,首先需要获取到ForkJoinPool的实例。

有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

  • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等
  • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:

1
public static ForkJoinPool commonPool();

这种方式创建的池不受shutdown()或者shutdownNow()方法的影响,但是他会在System.exit()时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()方法。该方式是静态的,可以自动被使用。

e.g.

在创建好ForkJoinPool实例之后,可以使用下面的方法执行任务:

  • T invoke(ForkJoinTask task):执行指定任务并返回结果,该方法是异步的,调用的线程会一直等待直到该方法返回结果,对于RecursiveAction任务来说,参数类型是Void.
  • void execute(ForkJoinTask<?> task):异步执行指定的任务,调用的线程一直等待知道任务完成才会继续执行。

另外,也可以通过ForkJoinTask自己拥有的方法fork()invoke()执行任务。在这种情况下,如果任务还没在ForkJoinPool中运行,那么commonPool()将会自动被使用。

值得注意的一点是:ForkJoinPool使用的是守护线程,当所有的用户线程被终止是它也会被终止,这意味着可以不必显示的关闭ForkPoolJoin(虽然这样也可以)。如果是common pool的情况下,调用shutdown没有任何效果,应为这个池总是可用的。

使用RecursiveAction

假设要对一个很大的数字数组进行变换,为了简单简单起见,转换只需要将数组中的每个元素乘以指定的数字。下面的代码用于转换任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.*;

public class ArrayTransform extends RecursiveAction {
int[] array;
int number;
int threshold = 100_000;
int start;
int end;

public ArrayTransform(int[] array, int number, int start, int end) {
this.array = array;
this.number = number;
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

invokeAll(subTask1, subTask2);
}
}

protected void computeDirectly() {
for (int i = start; i < end; i++) {
array[i] = array[i] * number;
}
}
}

可以看到,这是一个RecursiveAction的子类,我们重写了compute()方法。

数组和数字从它的构造函数传递。参数start和end指定要处理的数组中的元素的范围。如果数组的大小大于阈值,这有助于将数组拆分为子数组,否则直接对整个数组执行计算。

观察else中的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);

invokeAll(subTask1, subTask2);
}
}

这里,将数组分成两个部分,并分别创建他们的子任务,反过来,子任务也可以递归的进一步划分为更小的子任务,直到其大小小于直接调用computeDirectly();方法的的阈值。

然后,在main函数中创建ForkJoinPool执行任务:

1
2
3
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);

或者使用common pool执行任务:

1
2
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();

这里是全部的测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.*;
import java.util.concurrent.*;

public class ForkJoinRecursiveActionTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();

public static void main(String[] args) {

int number = 9;

System.out.println("数组中的初始元素: ");
print();
//ArrayTransform 是一个继承RecursiveAction的类,在覆写compute方法时改写成递归方法
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
//ForkJoinPool是一个线程池管理并运行众多ForkJoinTask任务。
ForkJoinPool pool = new ForkJoinPool();
//invoke()方法是组合了fork()和join(),它开始一个任务并等待结束返回结果。
pool.invoke(mainTask);
System.out.println("并行计算之后的元素:");
print();
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

static void print() {
for (int i = 0; i < 10; i++) {
System.out.print(array[i] + ", ");
}
System.out.println();
}
}

如您所见,使用随机生成的1,000万个元素数组进行测试。由于数组太大,我们在计算前后只打印前10个元素,看效果如何:

1
2
3
4
数组中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行计算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,

使用RecursiveTask

这个例子中,展示了如何使用带有返回值的任务,下面的任务计算在一个大数组中出现偶数的次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.*;

public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold = 100_000;
int start;
int end;

public ArrayCounter(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end);

invokeAll(subTask1, subTask2);


return subTask1.join() + subTask2.join();
}
}

protected Integer computeDirectly() {
Integer count = 0;

for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}

return count;
}
}

如你所见,这个类是RecursiveTask的子类并且重写了compute()方法,并且返回了一个整型的结果。

这里还使用了join()方法去合并子任务的结果:

1
return subTask1.join() + subTask2.join();

测试程序就和RecursiveAction的一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.*;
import java.util.concurrent.*;

public class ForkJoinRecursiveTaskTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();

public static void main(String[] args) {

ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
Integer evenNumberCount = pool.invoke(mainTask);

System.out.println("偶数的个数: " + evenNumberCount);
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

}

运行程序就会看到如下的结果:

1
偶数的个数: 5000045

并行性试验

这个例子展示并行性的级别如何影响计算时间:

ArrayCounter类让阈值可以通过构造器传入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.*;

public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold;
int start;
int end;

public ArrayCounter(int[] array, int start, int end, int threshold) {
this.array = array;
this.start = start;
this.end = end;
this.threshold = threshold;
}

protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;

ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);

invokeAll(subTask1, subTask2);


return subTask1.join() + subTask2.join();
}
}

protected Integer computeDirectly() {
Integer count = 0;

for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}

return count;
}
}

测试程序将并行度级别和阈值作为参数传递:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.*;
import java.util.concurrent.*;

public class ParallelismTest {
static final int SIZE = 10_000_000;

static int[] array = randomArray();

public static void main(String[] args) {
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);

long startTime = System.currentTimeMillis();

ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Integer evenNumberCount = pool.invoke(mainTask);

long endTime = System.currentTimeMillis();

System.out.println("偶数的个数: " + evenNumberCount);

long time = (endTime - startTime);
System.out.println("执行时间: " + time + " ms");
}

static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();

for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}

return array;
}

}

该程序允许您使用不同的并行度和阈值轻松测试性能。注意,它在最后打印执行时间。尝试用不同的参数多次运行这个程序,并观察执行时间。

结论

  • Fork/Join框架的设计简化了java语言的并行程序
  • ForkJoinPoolFork/Join框架的核心,它允许多个ForkJoinTask请求由少量实际线程执行,每个线程运行在单独的处理核心上
  • 既可以通过构造器也可以通过静态方法common pool去获取ForkJoinPool的实例
  • ForkJoinTask是一个抽象类,它表示的任务比普通线程更轻。通过覆盖其compute()方法实现计算逻辑
  • RecursiveAction是一个没有返回值的ForkJoinTask
  • RecursiveTask是一个有返回值的ForkJoinTask
  • ForkJoinPool与其它池的不同之处在于,它使用了工作窃取算法,该算法允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务
  • ForkJoinPool中的线程是守护线程,不必显式地关闭池
  • 执行一个ForkJoinTask既可以通过调用它自己的invoke()fork()方法,也可以提交任务给ForkJoinPool并调用它的invoke()或者execute()方法
  • 直接使用ForkJoinTask自身的方法执行任务,如果它还没运行在ForkJoinPool中那么将运行在common pool
  • ForkJoinTask中使用join()方法,可以合并子任务的结果
  • invoke()方法会等待子任务完成,但是execute()方法不会

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列(blockingqueue)导致线程阻塞。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

BlockingQueue的核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface BlockingQueue<E> extends Queue<E> {

//将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
boolean add(E e);

//将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
boolean offer(E e);

//将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

//将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
E take() throws InterruptedException;

//在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//获取队列中剩余的空间。
int remainingCapacity();

//从队列中移除指定的值。
boolean remove(Object o);

//判断队列中是否拥有该值。
public boolean contains(Object o);

//将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c);

//指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}

在深入之前先了解下下ReentrantLock 和 Condition:
重入锁ReentrantLock:
ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
主要方法:

  • lock()获得锁
  • lockInterruptibly()获得锁,但优先响应中断
  • tryLock()尝试获得锁,成功返回true,否则false,该方法不等待,立即返回
  • tryLock(long time,TimeUnit unit)在给定时间内尝试获得锁
  • unlock()释放锁

Condition:await()、signal()方法分别对应之前的Object的wait()和notify()

  • 和重入锁一起使用
  • await()是当前线程等待同时释放锁
  • awaitUninterruptibly()不会在等待过程中响应中断
  • signal()用于唤醒一个在等待的线程,还有对应的singalAll()方法

阻塞队列的成员

队列 有界性 数据结构
ArrayBlockingQueue bounded(有界) 加锁 arrayList
LinkedBlockingQueue optionally-bounded 加锁 linkedList
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap
SynchronousQueue bounded 加锁
LinkedTransferQueue unbounded 加锁 heap
LinkedBlockingDeque unbounded 无锁 heap

下面分别简单介绍一下:

  • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
  • LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
  • PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
  • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
  • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

ArrayBlockingQueue、LinkedBlockingQueue以及DelayQueue介绍

https://www.cnblogs.com/bjxq-cs88/p/9759571.html

九、线程安全的集合

首先要明白线程的工作原理,jvm有一个main memory,而每个线程有自己的working memory,一个线程对一个variable进行操作时,都要在自己的working memory里面建立一个copy,操作完之后再写入main memory。多个线程同时操作同一个variable,就可能会出现不可预知的结果。根据上面的解释,很容易想出相应的scenario。

而用synchronized的关键是建立一个monitor,这个monitor可以是要修改的variable也可以其他你认为合适的object比如method,然后通过给这个monitor加锁来实现线程安全,每个线程在获得这个锁之后,要执行完load到workingmemory -> use&assign -> store到mainmemory 的过程,才会释放它得到的锁。这样就实现了所谓的线程安全。

什么是线程安全?线程安全是怎么完成的(原理)? 线程安全就是说多线程访问同一代码,不会产生不确定的结果。编写线程安全的代码是低依靠线程同步。

1、早期线程安全的集合

1、Vector、ArrayList、LinkedList

Vector和ArrayList在使用上非常相似,都可用来表示一组数量可变的对象应用的集合,并且可以随机地访问其中的元素。

   Vector的方法都是同步的(Synchronized),是线程安全的(thread-safe),而ArrayList的方法不是,由于线程的同步必然要影响性能,因此,ArrayList的性能比Vector好。

ArrayList和LinkedList区别

   对于处理一列数据项,Java提供了两个类ArrayList和LinkedList,ArrayList的内部实现是基于内部数组Object[],所以从概念上讲,它更象数组,但LinkedList的内部实现是基于一组连接的记录,所以,它更像一个链表结构,所以,它们在性能上有很大的差别。

   从上面的分析可知,在ArrayList的前面或中间插入数据时,你必须将其后的所有数据相应的后移,这样必然要花费较多时间,所以,当你的操作是在一列数据的后面添加数据而不是在前面或中间,并且需要随机地访问其中的元素时,使用ArrayList会提供比较好的性能

   而访问链表中的某个元素时,就必须从链表的一端开始沿着连接方向一个一个元素地去查找,直到找到所需的元素为止,所以,当你的操作是在一列数据的前面或中间添加或删除数据,并且按照顺序访问其中的元素时,就应该使用LinkedList了。

   如果在编程中,1,2两种情形交替出现,这时,你可以考虑使用List这样的通用接口,而不用关心具体的实现,在具体的情形下,它的性能由具体的实现来保证。

2、HashTable,HashMap,HashSet

HashTable和HashMap采用相同的存储机制,二者的实现基本一致,不同的是:

1)、HashMap是非线程安全的,HashTable是线程安全的,内部的方法基本都是synchronized。

2)、HashTable不允许有null值的存在。

在HashTable中调用put方法时,如果key为null,直接抛出NullPointerException。其它细微的差别还有,比如初始化Entry数组的大小等等,但基本思想和HashMap一样。

HashSet:

1、HashSet基于HashMap实现,无容量限制。

2、HashSet是非线程安全的。

3、HashSet不保证有序。

HashMap:

1、HashMap采用数组方式存储key,value构成的Entry对象,无容量限制。

2、HashMap基于Key hash查找Entry对象存放到数组的位置,对于hash冲突采用链表的方式来解决。

3、HashMap在插入元素时可能会要扩大数组的容量,在扩大容量时须要重新计算hash,并复制对象到新的数组中。

4、HashMap是非线程安全的。

5、HashMap遍历使用的是Iterator

HashTable

1、HashTable是线程安全的。

2、HashTable中无论是Key,还是Value都不允许为null。

3、HashTable遍历使用的是Enumeration。

TreeSet,TreeMap

TreeSet:

1、TreeSet基于TreeMap实现,支持排序。

2、TreeSet是非线程安全的。

从对HashSet和TreeSet的描述来看,TreeSet和HashSet一样,也是完全基于Map来实现的,并且都不支持get(int)来获取指定位置的元素(需要遍历获取),另外TreeSet还提供了一些排序方面的支持。例如传入Comparator实现、descendingSet以及descendingIterator等。

TreeMap:

1、TreeMap是一个典型的基于红黑树的Map实现,因此它要求一定要有Key比较的方法,要么传入Comparator实现,要么key对象实现Comparable接口。

2、TreeMap是非线程安全的。

2、Collections包装方法

Vector和HashTable被弃用后,它们被ArrayList和HashMap代替,但它们不是线程安全的,所以Collections工具类中提供了相应的包装方法把它们包装成线程安全的集合

1
2
3
4
5
List<E> synArrayList = Collections.synchronizedList(new ArrayList<E>());

Set<E> synHashSet = Collections.synchronizedSet(new HashSet<E>());

Map<K,V> synHashMap = Collections.synchronizedMap(new HashMap<K,V>());

Collections针对每种集合都声明了一个线程安全的包装类,在原集合的基础上添加了锁对象,集合中的每个方法都通过这个锁对象实现同步

3、java.util.concurrent包中的集合

在java.util.concurrent包中,不但包含了我们本篇要说的线程安全的集合,还涉及到了多线程、CAS、线程锁等相关内容,可以说是完整覆盖了Java并发的知识栈。

对于Java开发人员来说,学好java.util.concurrent包下的内容,是一个必备的功课,也是逐渐提升自己的一个重要阶段。

1.ConcurrentHashMap

ConcurrentHashMap和HashTable都是线程安全的集合,它们的不同主要是加锁粒度上的不同。HashTable的加锁方法是给每个方法加上synchronized关键字,这样锁住的是整个Table对象。而ConcurrentHashMap是更细粒度的加锁
在JDK1.8之前,ConcurrentHashMap加的是分段锁,也就是Segment锁,每个Segment含有整个table的一部分,这样不同分段之间的并发操作就互不影响
JDK1.8对此做了进一步的改进,它取消了Segment字段,直接在table元素上加锁,实现对每一行进行加锁,进一步减小了并发冲突的概率

2.CopyOnWriteArrayList和CopyOnWriteArraySet

它们是加了写锁的ArrayList和ArraySet,锁住的是整个对象,但读操作可以并发执行

3.其他

除此之外还有ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue、ConcurrentLinkedDeque等,至于为什么没有ConcurrentArrayList,原因是无法设计一个通用的而且可以规避ArrayList的并发瓶颈的线程安全的集合类,只能锁住整个list,这用Collections里的包装类就能办到

Collection集合:

List:

1
CopyOnWriteArrayList

Set:

1
2
CopyOnWriteArraySet
ConcurrentSkipListSet

Queue:

1
2
3
4
5
6
7
8
9
10
BlockingQueue:
LinkedBlockingQueue
DelayQueue
PriorityBlockingQueue
ConcurrentLinkedQueue
TransferQueue:
LinkedTransferQueue
BlockingDeque:
LinkedBlockingDeque
ConcurrentLinkedDeque

Map集合:

Map:

1
2
3
4
ConcurrentMap:
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentNavigableMap

通过以上可以看出,java.util.concurrent包为每一类集合都提供了线程安全的实现。

线程安全有以下几种实现方式:

不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

不可变的类型:

  • final 关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。

对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。

1
2
3
4
5
6
7
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
1
2
3
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)

Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。

1
2
3
public V put(K key, V value) {
throw new UnsupportedOperationException();
}

互斥同步

synchronized 和 ReentrantLock。

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

1. CAS

乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

2. AtomicInteger

J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操作。

以下代码使用了 AtomicInteger 执行了自增的操作。

1
2
3
4
5
private AtomicInteger cnt = new AtomicInteger();

public void add() {
cnt.incrementAndGet();
}

以下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt() 。

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操作需要加的数值,这里为 1。通过 getIntVolatile(var1, var2) 得到旧的预期值,通过调用 compareAndSwapInt() 来进行 CAS 比较,如果该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。

可以看到 getAndAddInt() 在一个循环中进行,发生冲突的做法是不断的进行重试。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

3. ABA

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过。

J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。

无同步方案

要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

1. 栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

1
2
3
4
5
6
7
8
9
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
1
2
3
4
5
6
7
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
1
2
100
100

2. 线程本地存储(Thread Local Storage)

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。

对于以下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
1
1

为了理解 ThreadLocal,先看以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadLocalExample1 {
public static void main(String[] args) {
ThreadLocal threadLocal1 = new ThreadLocal();
ThreadLocal threadLocal2 = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = new Thread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}

它所对应的底层结构图为:


每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

1
2
3
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

get() 方法类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

在一些场景 (尤其是使用线程池) 下,由于 ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况,应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。

3. 可重入代码(Reentrant Code)

这种代码也叫做纯代码(Pure Code),可以在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。

十、锁优化

这里的锁优化主要是指 JVM 对 synchronized 的优化。

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

1
2
3
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}

String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作:

1
2
3
4
5
6
7
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访问到它,因此可以进行消除。

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。

轻量级锁

JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)。

以下是 HotSpot 虚拟机对象头的内存布局,这些数据被称为 Mark Word。其中 tag bits 对应了五个状态,这些状态在右侧的 state 表格中给出。除了 marked for gc 状态,其它四个状态已经在前面介绍过了。


下图左侧是一个线程的虚拟机栈,其中有一部分称为 Lock Record 的区域,这是在轻量级锁运行过程创建的,用于存放锁对象的 Mark Word。而右侧就是一个锁对象,包含了 Mark Word 和其它信息。


轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

当尝试获取一个锁对象时,如果锁对象标记为 0 01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。


如果 CAS 操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。

偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。

当锁对象第一次被线程获得的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。

当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。


十一、Java 内存模型

Java 内存模型试图屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

主内存与工作内存

处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。


所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。


内存间交互操作

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。


  • read:把一个变量的值从主内存传输到工作内存中
  • load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
  • use:把工作内存中一个变量的值传递给执行引擎
  • assign:把一个从执行引擎接收到的值赋给工作内存的变量
  • store:把工作内存的一个变量的值传送到主内存中
  • write:在 store 之后执行,把 store 得到的值放入主内存的变量中
  • lock:作用于主内存的变量
  • unlock

内存模型三大特性

1. 原子性

Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。

有一个错误认识就是,int 等原子性的类型在多线程环境中不会出现线程安全问题。前面的线程不安全示例代码中,cnt 属于 int 类型变量,1000 个线程对它进行自增操作之后,得到的值为 997 而不是 1000。

为了方便讨论,将内存间的交互操作简化为 3 个:load、assign、store。

下图演示了两个线程同时对 cnt 进行操作,load、assign、store 这一系列操作整体上看不具备原子性,那么在 T1 修改 cnt 并且还没有将修改后的值写入主内存,T2 依然可以读入旧值。可以看出,这两个线程虽然执行了两次自增运算,但是主内存中 cnt 的值最后为 1 而不是 2。因此对 int 类型读写操作满足原子性只是说明 load、assign、store 这些单个操作具备原子性。


AtomicInteger 能保证多个线程修改的原子性。


使用 AtomicInteger 重写之前线程不安全的代码之后得到以下线程安全实现:

1
2
3
4
5
6
7
8
9
10
11
public class AtomicExample {
private AtomicInteger cnt = new AtomicInteger();

public void add() {
cnt.incrementAndGet();
}

public int get() {
return cnt.get();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicExample example = new AtomicExample(); // 只修改这条语句
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1
1000

除了使用原子类之外,也可以使用 synchronized 互斥锁来保证操作的原子性。它对应的内存间交互操作为:lock 和 unlock,在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit。

1
2
3
4
5
6
7
8
9
10
11
public class AtomicSynchronizedExample {
private int cnt = 0;

public synchronized void add() {
cnt++;
}

public synchronized int get() {
return cnt;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicSynchronizedExample example = new AtomicSynchronizedExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1
1000

2. 可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

主要有三种实现可见性的方式:

  • volatile
  • synchronized,对一个变量执行 unlock 操作之前,必须把变量值同步回主内存。
  • final,被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。

对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,因为 volatile 并不能保证操作的原子性。

3. 有序性

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。

先行发生原则

上面提到了可以用 volatile 和 synchronized 来保证有序性。除此之外,JVM 还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。

1. 单一线程原则

Single Thread rule

在一个线程内,在程序前面的操作先行发生于后面的操作。


2. 管程锁定规则

Monitor Lock Rule

一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。


3. volatile 变量规则

Volatile Variable Rule

对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。


4. 线程启动规则

Thread Start Rule

Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。


5. 线程加入规则

Thread Join Rule

Thread 对象的结束先行发生于 join() 方法返回。


6. 线程中断规则

Thread Interruption Rule

对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 interrupted() 方法检测到是否有中断发生。

7. 对象终结规则

Finalizer Rule

一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。

8. 传递性

Transitivity

如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作 C。

十二、多线程开发良好的实践

  • 给线程起个有意义的名字,这样可以方便找 Bug。

  • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。

  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。

  • 使用 BlockingQueue 实现生产者消费者问题。

  • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。

  • 使用本地变量和不可变类来保证线程安全。

  • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务。

参考资料