阻塞队列(以 ArrayBlockingQueue 为例)笔记

一、什么是阻塞队列?

阻塞队列(BlockingQueue)是 Java 并发包中一种特殊的队列,它在普通队列的基础上增加了 阻塞功能

  • 当队列满时,生产者线程会阻塞等待(无法插入元素);
  • 当队列空时,取元素的线程会阻塞等待(无法获取元素)。

这种特性使其非常适合 生产者 - 消费者模型,无需手动处理线程同步,简化了多线程协作。

二、核心特性(以代码中的 ArrayBlockingQueue 为例)

  1. 有界队列

    ArrayBlockingQueue 是 有界队列(初始化时必须指定容量,如代码中 new ArrayBlockingQueue<>(10) 表示容量为 10),一旦队列满,生产者线程会被阻塞。

  2. 阻塞插入与取出

    • 插入元素put() 方法在队列满时会阻塞线程,直到队列有空闲位置;
    • 取出元素take() 方法在队列空时会阻塞线程,直到队列中有元素。

    (代码中:生产者用 put() 放数据,消费者用 take() 取数据,自动实现了 “生产 - 消费” 的同步)

三、代码中的生产者 - 消费者模型

  1. 生产者(Producter)

    循环生成随机整数,通过 q.put(...) 放入队列。若队列已满(达到 10 个元素),put() 会阻塞生产者线程,直到消费者取走元素后才继续放入。

  2. 消费者(Consumer)

    每 200ms 循环从队列中用 q.take(...) 取元素并打印。若队列空,take() 会阻塞消费者线程,直到生产者放入新元素后才继续取出。

  3. 自动协调

    无需额外加锁(如 synchronized 或 Lock),阻塞队列内部已实现线程安全,自动协调生产者和消费者的速度(生产快了就等消费,消费快了就等生产)。

四、常见阻塞队列及特点

阻塞队列类型 特点 适用场景
ArrayBlockingQueue 基于数组实现,有界,初始化需指定容量 固定大小的生产者 - 消费者模型
LinkedBlockingQueue 基于链表实现,默认无界(可指定容量) 任务队列(如线程池任务队列)
SynchronousQueue 无容量,放入元素后必须等待被取走 线程间直接传递数据
PriorityBlockingQueue 支持优先级排序,无界 按优先级处理任务

五、核心方法

阻塞队列的方法分为三类(以插入和取出为例):

操作类型 满队列时的行为 空队列时的行为 示例方法
阻塞式 阻塞等待 阻塞等待 put(e)take()
非阻塞式 返回 false 或抛出异常 返回 null 或抛出异常 add(e)remove()
超时式 超时后返回 false 超时后返回 null offer(e, timeout)poll(timeout)

代码中使用的 put() 和 take() 是 阻塞式方法,最适合需要等待的场景。

六、优点

  1. 线程安全:内部实现了同步机制(如 ReentrantLock),无需手动处理锁;
  2. 简化协作:自动处理生产者 - 消费者的速度匹配(满则等、空则等);
  3. 高效:比手动用锁实现的队列更简洁,性能更稳定。

总结

阻塞队列是多线程协作的重要工具,尤其适合生产者 - 消费者模型。ArrayBlockingQueue 作为有界队列,通过 put() 和 take() 的阻塞特性,轻松实现了生产与消费的同步,避免了线程安全问题和复杂的锁管理。


代码:

以下是阻塞队列(ArrayBlockingQueue 及其他常用实现)的代码示例,结合不同场景展示其用法:

示例 1:基础生产者 - 消费者模型(ArrayBlockingQueue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueDemo {
public static void main(String[] args) {
// 创建容量为3的有界阻塞队列
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

// 启动1个生产者线程
new Thread(new Producer(queue), "生产者").start();
// 启动1个消费者线程
new Thread(new Consumer(queue), "消费者").start();
}

// 生产者:向队列中放入数据
static class Producer implements Runnable {
private final ArrayBlockingQueue<Integer> queue;

public Producer(ArrayBlockingQueue<Integer> queue) {
this.queue = queue;
}

@Override
public void run() {
Random random = new Random();
while (true) {
int num = random.nextInt(100); // 生成随机数
try {
queue.put(num); // 队列满时阻塞
System.out.println(Thread.currentThread().getName() + " 放入:" + num + ",当前队列大小:" + queue.size());
Thread.sleep(500); // 模拟生产耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 消费者:从队列中取数据
static class Consumer implements Runnable {
private final ArrayBlockingQueue<Integer> queue;

public Consumer(ArrayBlockingQueue<Integer> queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
try {
int num = queue.take(); // 队列空时阻塞
System.out.println(Thread.currentThread().getName() + " 取出:" + num + ",当前队列大小:" + queue.size());
Thread.sleep(1000); // 模拟消费耗时(比生产慢)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

运行效果

  • 生产者每 500ms 放一个数据,消费者每 1000ms 取一个数据;
  • 当队列满(3 个元素)时,生产者会阻塞等待,直到消费者取走数据后再继续放入;
  • 输出类似:
1
2
3
4
5
6
7
生产者 放入:45,当前队列大小:1
消费者 取出:45,当前队列大小:0
生产者 放入:72,当前队列大小:1
生产者 放入:33,当前队列大小:2
消费者 取出:72,当前队列大小:1
生产者 放入:18,当前队列大小:2
...

示例 2:非阻塞与超时方法(ArrayBlockingQueue)

展示阻塞队列的三类方法(阻塞、非阻塞、超时):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueMethods {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

// 1. 阻塞式方法:put(满则等)、take(空则等)
queue.put("A");
queue.put("B"); // 队列已满,再put会阻塞(此处若再加put会卡住)
System.out.println("阻塞式放入后:" + queue); // [A, B]

String s1 = queue.take();
System.out.println("阻塞式取出:" + s1 + ",剩余:" + queue); // [B]

// 2. 非阻塞式方法:add(满则抛异常)、remove(空则抛异常)
boolean addOk = queue.add("C"); // 队列有空间,添加成功
System.out.println("add成功:" + addOk + ",队列:" + queue); // [B, C]

try {
queue.add("D"); // 队列已满,add会抛IllegalStateException
} catch (IllegalStateException e) {
System.out.println("add失败:队列已满");
}

// 3. 超时式方法:offer(满则等超时)、poll(空则等超时)
boolean offerOk = queue.offer("D", 1, TimeUnit.SECONDS); // 等待1秒,若仍满则返回false
System.out.println("offer超时后是否成功:" + offerOk); // false(1秒内队列未空)

String s2 = queue.poll(1, TimeUnit.SECONDS); // 等待1秒,取出B
System.out.println("poll超时取出:" + s2 + ",剩余:" + queue); // [C]
}
}

示例 3:无界队列(LinkedBlockingQueue)

LinkedBlockingQueue 默认无界(容量为 Integer.MAX_VALUE),适合任务数不确定的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 无界队列(默认容量极大,可视为无限)
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

// 快速放入1000个元素(不会阻塞,因为无界)
for (int i = 0; i < 1000; i++) {
queue.put("任务" + i);
}
System.out.println("放入1000个任务后,队列大小:" + queue.size()); // 1000

// 取出第一个任务
System.out.println("取出:" + queue.take()); // 任务0
}
}

示例 4:同步队列(SynchronousQueue)

SynchronousQueue 容量为 0,放入元素后必须等待被取走才能继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();

// 生产者线程:放入元素后会阻塞,直到被取走
new Thread(() -> {
try {
System.out.println("生产者:准备放入数据");
queue.put("同步数据"); // 必须等待消费者取走,否则一直阻塞
System.out.println("生产者:数据被取走,继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 消费者线程:1秒后取数据
new Thread(() -> {
try {
Thread.sleep(1000); // 延迟1秒
String data = queue.take(); // 取走数据,释放生产者
System.out.println("消费者:取出数据:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

运行效果

生产者放入数据后阻塞 1 秒,直到消费者取走数据才继续执行。

总结

阻塞队列通过内置的阻塞机制,简化了多线程协作:

  • ArrayBlockingQueue 适合固定容量场景,生产者 - 消费者速度匹配;
  • LinkedBlockingQueue 适合任务数不确定的场景;
  • SynchronousQueue 适合线程间直接传递数据(无缓冲);
  • 核心方法分为阻塞式(put/take)、非阻塞式(add/remove)、超时式(offer/poll),根据场景选择即可。