在现实应用中,有些场景可能需要多个线程按照指定的规则共同完成一项任务。这时就需要多个线程互相协调,这个过程被称为线程通信。

线程通信主要分为 3 种方式,分别为共享内存、消息传递和管道流。

1、共享内存实现线程通信

一个进程下的多个线程共享该进程被分配的内存空间,内存空间中一些特定区域(主内存)的数据可以被该进程下的多个线程共同访问,这些数据被称为进程(程序)的公共状态。

一个进程下的多个线程之间可以通过读/写内存中的公共状态来实现隐式通信。例如,A、B 两个线程配合完成一项工作,B 线程需要基于 A 线程处理后的结果向下进行。此时,可以设计两个公共状态:

一个状态用来标记 A 线程是否已执行完毕;

另一个状态用来保存 A 线程的处理结果。

B 线程可以通过访问第一个公共状态来确认 A 线程是否已执行完毕,并在 A 线程执行完毕再访问第二个公共状态,取得 A 线程的处理结果。

基于共享内存的线程通信可能存在并发问题,因为 Java 内存模型规定,线程对公共状态的操作(读取、赋值等)必须在自己的栈空间中进行。因此,线程需要先从主内存中将公共状态的值复制到自己的栈空间中。后续如果读取,那么使用栈空间中的数据;后续如果修改,那么先修改自己的栈空间中副本的值,再将修改后的值写到主内存中。

线程访问共享数据的示意图如下图所示:

图 1 线程访问共享数据的示意图

从图 1 中可以看到,线程 2 的栈空间中保存的 num 变量的副本为 10,后续线程 1 对主内存中的变量进行更新,但是不会主动通知线程 2。在这种情况下,可能会出现并发问题,即资源不可见问题。

【实例】volatile 共享内存的应用。

public class Example {

public static void main(String[] args) {

// 创建保存共享数据的对象

SharedData sharedData = new SharedData();

// 启动一个线程修改 sharedData 对象的变量 flag,将变量 flag 的值改为 false

new Thread(new Runnable() {

@Override

public void run() {

String name = Thread.currentThread().getName();

System.out.println("线程" + name + "正在执行");

try {

Thread.sleep(3000);

} catch (Exception e) {

e.printStackTrace();

}

sharedData.setFlagFalse();

System.out.println("线程" + name + "更新后,flag 的值为" + sharedData.flag);

}

}).start();

// 确定主线程的副本是否会自动更新

while (sharedData.flag) {

// 当上面的线程将变量 flag 的值改为 false 之后

// 如果没有自动更新,就会一直在循环中执行

}

System.out.println("主线程运行终止");

}

}

class SharedData {

boolean flag = true;

// 将变量 flag 的值改为 false

public void setFlagFalse() {

this.flag = false;

}

}

运行结果为:

线程Thread-0正在执行

线程Thread-0更新后,flag的值为false

由此可知,虽然主线程和子线程 1 访问的都是 sharedData 对象的变量 flag,但是在子线程 1 对变量 flag 的值进行修改后,主线程并没有跳出循环,即主线程使用的是自己栈空间中保存的变量 flag 的副本,值始终是 true。

想要解决上述问题,可以使用 Java 提供的关键字 volatile。关键字 volatile 可以修饰字段(成员变量),即规定线程对该变量的访问均需要从共享内存中获取,对该变量的修改也必须同步刷新到共享内存中,以保证资源的可见性。

需要注意的是,过多地使用关键字 volatile 可能会降低程序的效率。

【实例】关键字volatile共享内存的应用。

public class Example {

public static void main(String[] args) {

// 创建保存共享数据的对象

SharedData2 sharedData = new SharedData2();

// 启动一个线程修改 sharedData 对象的变量 flag,将变量 flag 的值改为 false

new Thread(new Runnable() {

@Override

public void run() {

String name = Thread.currentThread().getName();

System.out.println("线程" + name + "正在执行");

try {

Thread.sleep(3000);

} catch (Exception e) {

e.printStackTrace();

}

sharedData.setFlagFalse();

System.out.println("线程" + name + "更新后,flag 的值为" + sharedData.flag);

}

}).start();

// 确定主线程的副本是否会自动更新

while (sharedData.flag) {

// 当上面的线程将变量 flag 的值改为 false 之后

// 如果没有自动更新,就会一直在循环中执行

}

System.out.println("主线程运行终止");

}

}

class SharedData2 {

// 使用关键字 volatile 修饰变量 flag

volatile boolean flag = true;

// 将变量 flag 的值改为 false

public void setFlagFalse() {

this.flag = false;

}

}

运行结果为:

线程Thread-0正在执行

主线程运行终止

线程Thread-0更新后,flag的值为false

2、消息传递实现线程通信

Java 模型中的多个线程在共享数据时,需要交替地占用临界资源来执行各自的方法,所以就需要线程通信。

在线程通信的消息传递过程中,最常用的就是等待通知(wait/notify)方式。等待通知方式就是将处于等待状态的线程由其他线程发出通知后重新获取 CPU 资源,继续执行之前没有执行完的任务。

Java 提供了如下 3 个方法来实现线程之间的消息传递。

wait():导致当前线程等待,释放同步监听器的锁定;直到其他线程调用该线程的同步监听器的 notify() 方法或 notifyAll() 方法来唤醒该线程。

notify():随机唤醒在此同步监听器上等待的其他单个线程(再次调用 wait() 方法,让当前线程等待,释放同步监听器,这样才可以执行被唤醒的线程)。

notifyAll():唤醒所有在此同步监听器上等待的线程(再次调用 wait() 方法,让当前线程等待,释放同步监听器,这样才可以执行被唤醒的线程)。

上述 3 个方法的调用者必须是同步代码块或同步方法中的同步监听器,否则会出现 IllegalMonitorStateException 异常。

这 3 个方法定义在 java.lang.Object 类中,属于 final 方法。

【实例】使用两个线程打印 1~10,线程 1 和线程 2 交替打印,实现线程通信。

public class Example {

public static void main(String[] args) {

Number number1 = new Number();

Thread t1 = new Thread(number1);

Thread t2 = new Thread(number1);

t1.setName("线程1");

t2.setName("线程2");

t1.start();

t2.start();

}

}

class Number implements Runnable {

private int number = 1;

@Override

public void run() {

while (true) {

synchronized (this) {

// 必须在同步代码块中调用 notify() 方法

notify();

if (number <= 10) {

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName() + "打印" + number);

number++;

} else {

break;

}

try {

wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

运行结果为:

线程1打印1

线程2打印2

线程1打印3

线程2打印4

线程1打印5

线程2打印6

线程1打印7

线程2打印8

线程1打印9

线程2打印10

3、管道流实现线程通信

管道流是一种较少使用的线程间通信方式。

和普通文件输入/输出流或网络输出/输出流不同,管道输入/输出流主要用于线程之间的数据传输,传输媒介为管道。

管道输入/输出流主要包括 4 种具体的实现,分别为 PipedOutputStream、PipedInputStream、PipedReader 和 PipedWriter,前两种面向字节,后两种面向字符。

Java 的管道的输入和输出实际上是使用循环缓存数组来实现的,默认为 1024,输入流从这个数组中读取数据,输出流从这个数组中写入数据。当数组已满时,输出流所在的线程就会被阻塞;当这个数组为空时,输入流所在的线程就会被阻塞。

【实例】使用管道流实现线程通信。

import java.io.IOException;

import java.io.PipedReader;

import java.io.PipedWriter;

public class Example {

public static void main(String[] args) throws IOException {

PipedWriter writer = new PipedWriter();

PipeedReader reader = new PipedReader();

writer.connect(reader);

Thread t1 = new Thread(() -> {

System.out.println("writer running");

try {

for (int i = 0; i < 5; i++) {

writer.write(i);

Thread.sleep(1000);

}

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

writer.close();

} catch (IOException e) {

e.printStackTrace();

}

}

System.out.println("writer ending");

});

Thread t2 = new Thread(() -> {

System.out.println("reader running");

int message = 0;

try {

while ((message = reader.read()) != -1) {

System.out.println("message = " + message + " , time --> " + System.currentTimeMillis());

}

} catch (Exception e) {

} finally {

try {

reader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

System.out.println("reader ending");

});

t1.start();

t2.start();

}

}

运行结果为:

writer running

reader running

writer running

reader running

message = 0 , time --> 1657428909901

message = 1 , time --> 1657428911917

message = 2 , time --> 1657428911917

message = 3 , time --> 1657428913928

message = 4 , time --> 1657428913929

writer ending

reader ending

sleep()方法和wait()方法的异同

sleep() 方法和 wait() 方法的相同点:一旦执行方法,就可以使当前线程进入阻塞状态。

sleep()方法和wait()方法的不同点如下:

声明两个方法的位置不同:Thread 类中声明的是 sleep() 方法,Object 类中声明的是 wait() 方法。

调用的要求不同:sleep() 方法可以在任何需要的场景下调用,wait() 方法必须在同步代码块或同步方法中调用。

是否释放同步监听器:如果两个方法都在同步代码块或同步方法中调用,那么 sleep() 方法不释放同步监听器,wait() 方法会释放同步监听器。


平衡##漫画下拉式
还需查看戴尔(Dell)更多信息?