抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

在多线程环境下,普通的集合类(如ArrayList、HashMap)不是线程安全的。本文介绍Java中常用的并发安全集合类,帮助你在多线程编程中选择合适的数据结构。

为什么需要并发安全的集合

普通集合在多线程环境下会出现问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 多线程操作普通HashMap,可能导致死循环、数据丢失
Map<String, Integer> map = new HashMap<>();

// 线程1
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
map.put("key" + i, i);
}
}).start();

// 线程2
new Thread(() -> {
for (int i = 10000; i < 20000; i++) {
map.put("key" + i, i);
}
}).start();

Java提供了两类解决方案:

  1. 同步包装器Collections.synchronizedXxx() 方法
  2. 并发集合类java.util.concurrent 包中的专用类

同步包装器简单但性能差(全局锁),并发集合类采用更细粒度的锁或无锁算法,性能更好。

ConcurrentHashMap

最常用的并发Map实现,是 HashMap 的线程安全替代品。

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 基本操作与HashMap相同
map.put("apple", 1);
map.get("apple");
map.remove("apple");

// 原子操作方法
map.putIfAbsent("banana", 2); // 不存在才放入
map.computeIfAbsent("orange", k -> 3); // 不存在才计算并放入
map.computeIfPresent("apple", (k, v) -> v + 1); // 存在才更新
map.merge("apple", 1, Integer::sum); // 合并值

与Hashtable的区别

特性 Hashtable ConcurrentHashMap
锁粒度 整个表(全局锁) 分段锁/CAS(JDK8+)
null值 不允许null键值 不允许null键值
迭代器 fail-fast 弱一致性
性能
推荐 已过时 推荐使用

JDK 7 vs JDK 8 实现变化

JDK 7:分段锁(Segment),默认16个段,最多支持16个线程并发写

1
2
3
4
5
6
7
┌────────┬────────┬────────┬────────┐
│Segment0│Segment1│Segment2│ ... │
├────────┼────────┼────────┼────────┤
│ Entry │ Entry │ Entry │ ... │
│ Entry │ Entry │ │ │
└────────┴────────┴────────┴────────┘
每个Segment是一个小HashMap,有独立的锁

JDK 8+:Node数组 + CAS + synchronized

1
2
3
4
5
6
7
8
9
10
11
12
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ ...
└─┬─┴───┴─┬─┴───┴───┴─┬─┴───┴───┘
│ │ │
▼ ▼ ▼
Node Node Node
│ │ │
▼ ▼ ▼
Node TreeNode Node


TreeNode
  • 锁粒度更细:只锁单个桶(bin)
  • 链表长度超过8转红黑树
  • 使用CAS操作减少锁竞争

常见使用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 计数器
ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();
counters.computeIfAbsent("pageView", k -> new LongAdder()).increment();

// 2. 缓存
ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
Object value = cache.computeIfAbsent(key, k -> loadFromDB(k));

// 3. 去重统计
ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap<>();
if (seen.putIfAbsent(item, Boolean.TRUE) == null) {
// 第一次看到这个item
}

CopyOnWriteArrayList

写时复制的List实现,适合读多写少的场景。

工作原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
读操作:直接读取当前数组,无锁
┌───┬───┬───┬───┐
读线程 ──▶│ A │ B │ C │ D │
└───┴───┴───┴───┘

写操作:复制新数组,修改后替换引用
┌───┬───┬───┬───┐
原数组 │ A │ B │ C │ D │
└───┴───┴───┴───┘
↓ 复制
┌───┬───┬───┬───┬───┐
新数组 │ A │ B │ C │ D │ E │ ← 写入新元素
└───┴───┴───┴───┴───┘
↓ 替换引用
array = newArray

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

// 添加元素(会复制整个数组)
list.add("apple");
list.addAll(Arrays.asList("banana", "orange"));

// 读取元素(无锁,直接访问)
String first = list.get(0);
for (String item : list) {
System.out.println(item);
}

// 遍历时可以安全地修改
for (String item : list) {
if (item.equals("apple")) {
list.remove(item); // 不会抛出ConcurrentModificationException
}
}

适用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 1. 监听器列表(读多写少)
CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();

public void addListener(EventListener listener) {
listeners.add(listener); // 写操作少
}

public void fireEvent(Event event) {
for (EventListener listener : listeners) { // 读操作多
listener.onEvent(event);
}
}

// 2. 配置列表(启动时写入,运行时只读)
CopyOnWriteArrayList<String> blacklist = new CopyOnWriteArrayList<>();
// 启动时加载
blacklist.addAll(loadBlacklistFromDB());
// 运行时只读
boolean blocked = blacklist.contains(userId);

注意事项

  • 写操作代价高:每次写都复制整个数组,O(n) 复杂度
  • 内存占用:写时会有两份数组
  • 弱一致性:迭代器看到的是创建时的快照

CopyOnWriteArraySet

基于 CopyOnWriteArrayList 实现的Set,特性相同。

1
2
3
4
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("apple");
set.add("apple"); // 重复元素不会添加
System.out.println(set.size()); // 1

ConcurrentLinkedQueue

无界非阻塞队列,基于CAS实现,适合高并发场景。

1
2
3
4
5
6
7
8
9
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

// 入队
queue.offer("task1");
queue.add("task2");

// 出队
String task = queue.poll(); // 返回并移除头部,空则返回null
String head = queue.peek(); // 只查看头部,不移除

与LinkedList的区别

特性 LinkedList ConcurrentLinkedQueue
线程安全
实现方式 双向链表 单向链表 + CAS
阻塞操作 不支持 不支持
null元素 允许 不允许

BlockingQueue 阻塞队列

阻塞队列在队列空时获取会阻塞,队列满时插入会阻塞,非常适合生产者-消费者模式。

核心方法

操作 抛异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

ArrayBlockingQueue

有界阻塞队列,基于数组实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建容量为100的队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// 生产者
new Thread(() -> {
try {
queue.put("product"); // 队列满时阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();

// 消费者
new Thread(() -> {
try {
String product = queue.take(); // 队列空时阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();

LinkedBlockingQueue

可选有界的阻塞队列,基于链表实现。

1
2
3
4
5
// 无界队列(实际上界是Integer.MAX_VALUE)
LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

// 有界队列
LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

ArrayBlockingQueue vs LinkedBlockingQueue

特性 ArrayBlockingQueue LinkedBlockingQueue
底层结构 数组 链表
是否有界 必须指定容量 可选,默认无界
单锁 双锁(头尾分离)
内存 预分配 动态分配
吞吐量 较低 较高

PriorityBlockingQueue

支持优先级的无界阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
// 按任务优先级排序
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(
11,
Comparator.comparingInt(Task::getPriority)
);

queue.put(new Task("low", 3));
queue.put(new Task("high", 1));
queue.put(new Task("medium", 2));

queue.take(); // 返回high(优先级1最高)

DelayQueue

延迟队列,元素只有到期后才能被取出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class DelayedTask implements Delayed {
private String name;
private long executeTime;

public DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = executeTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}

// 使用
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("task1", 5000)); // 5秒后执行
queue.put(new DelayedTask("task2", 1000)); // 1秒后执行

DelayedTask task = queue.take(); // 阻塞直到最近的任务到期

适用场景:

  • 定时任务调度
  • 缓存过期处理
  • 订单超时取消

SynchronousQueue

不存储元素的阻塞队列,每个put必须等待一个take。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
SynchronousQueue<String> queue = new SynchronousQueue<>();

// 生产者线程
new Thread(() -> {
try {
queue.put("data"); // 阻塞,直到有消费者take
System.out.println("已交付");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();

// 消费者线程
new Thread(() -> {
try {
Thread.sleep(1000);
String data = queue.take(); // 直接从生产者手中获取
System.out.println("收到: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();

适用于直接传递的场景,如 Executors.newCachedThreadPool() 内部使用。

ConcurrentSkipListMap

基于跳表实现的并发有序Map,是 TreeMap 的线程安全替代品。

跳表结构

1
2
3
4
5
6
7
8
Level 3:  1 ─────────────────────────────▶ 9
Level 2: 1 ────────▶ 4 ─────────────────▶ 9
Level 1: 1 ───▶ 3 ─▶ 4 ───▶ 6 ─────────▶ 9
Level 0: 1 ─▶ 2 ─▶ 3 ─▶ 4 ─▶ 5 ─▶ 6 ─▶ 7 ─▶ 9

查找6:从Level 3开始,1→9太大,降级
Level 2:1→4→9太大,降级
Level 1:1→3→4→6,找到!

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();

map.put(3, "three");
map.put(1, "one");
map.put(2, "two");

// 有序遍历
map.forEach((k, v) -> System.out.println(k + "=" + v));
// 输出:1=one, 2=two, 3=three

// 范围操作
SortedMap<Integer, String> subMap = map.subMap(1, 3); // [1, 3)
map.firstKey(); // 1
map.lastKey(); // 3
map.higherKey(2); // 3
map.lowerKey(2); // 1

ConcurrentSkipListMap vs ConcurrentHashMap

特性 ConcurrentHashMap ConcurrentSkipListMap
有序性 无序 按key有序
时间复杂度 O(1) O(log n)
范围查询 不支持 支持
内存占用 较少 较多
null 不允许 不允许

ConcurrentSkipListSet

基于 ConcurrentSkipListMap 实现的并发有序Set。

1
2
3
4
5
6
7
8
9
10
11
12
13
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(3);
set.add(1);
set.add(2);

// 有序遍历
for (Integer i : set) {
System.out.println(i); // 1, 2, 3
}

// 范围操作
set.headSet(2); // [1]
set.tailSet(2); // [2, 3]

如何选择并发集合

Map类

1
2
3
4
需要Map?
├── 需要有序吗?
│ ├── 是 → ConcurrentSkipListMap
│ └── 否 → ConcurrentHashMap(首选)

List类

1
2
3
4
需要List?
├── 读多写少?
│ ├── 是 → CopyOnWriteArrayList
│ └── 否 → Collections.synchronizedList() 或考虑其他方案

Queue类

1
2
3
4
5
6
7
8
9
10
需要Queue?
├── 需要阻塞?
│ ├── 是 → BlockingQueue系列
│ │ ├── 需要优先级? → PriorityBlockingQueue
│ │ ├── 需要延迟? → DelayQueue
│ │ ├── 直接传递? → SynchronousQueue
│ │ └── 普通队列?
│ │ ├── 有界 → ArrayBlockingQueue
│ │ └── 无界 → LinkedBlockingQueue
│ └── 否 → ConcurrentLinkedQueue

Set类

1
2
3
4
需要Set?
├── 需要有序吗?
│ ├── 是 → ConcurrentSkipListSet
│ └── 否 → ConcurrentHashMap.newKeySet() 或 CopyOnWriteArraySet

总结

集合类 特点 适用场景
ConcurrentHashMap 高性能并发Map 通用并发Map场景
CopyOnWriteArrayList 写时复制 读多写少的List
CopyOnWriteArraySet 写时复制 读多写少的Set
ConcurrentLinkedQueue 无锁队列 高并发非阻塞队列
ArrayBlockingQueue 有界阻塞 生产者-消费者(需限流)
LinkedBlockingQueue 可选有界阻塞 生产者-消费者(高吞吐)
PriorityBlockingQueue 优先级阻塞 任务调度
DelayQueue 延迟阻塞 定时任务、缓存过期
SynchronousQueue 直接传递 线程间直接交换
ConcurrentSkipListMap 有序并发Map 需要范围查询的并发Map
ConcurrentSkipListSet 有序并发Set 需要排序的并发Set

选择原则:

  • 优先使用并发集合而非同步包装器
  • 根据读写比例选择:读多写少用CopyOnWrite系列
  • 根据是否需要阻塞选择:生产者-消费者用BlockingQueue
  • 根据是否需要有序选择:需要有序用SkipList系列