在多线程环境下,普通的集合类(如ArrayList、HashMap)不是线程安全的。本文介绍Java中常用的并发安全集合类,帮助你在多线程编程中选择合适的数据结构。
为什么需要并发安全的集合 普通集合在多线程环境下会出现问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Map<String, Integer> map = new HashMap <>(); new Thread (() -> { for (int i = 0 ; i < 10000 ; i++) { map.put("key" + i, i); } }).start(); new Thread (() -> { for (int i = 10000 ; i < 20000 ; i++) { map.put("key" + i, i); } }).start();
Java提供了两类解决方案:
同步包装器 :Collections.synchronizedXxx() 方法
并发集合类 :java.util.concurrent 包中的专用类
同步包装器简单但性能差(全局锁),并发集合类采用更细粒度的锁或无锁算法,性能更好。
ConcurrentHashMap 最常用的并发Map实现,是 HashMap 的线程安全替代品。
基本用法 1 2 3 4 5 6 7 8 9 10 11 12 ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap <>(); map.put("apple" , 1 ); map.get("apple" ); map.remove("apple" ); map.putIfAbsent("banana" , 2 ); map.computeIfAbsent("orange" , k -> 3 ); map.computeIfPresent("apple" , (k, v) -> v + 1 ); map.merge("apple" , 1 , Integer::sum);
与Hashtable的区别
特性
Hashtable
ConcurrentHashMap
锁粒度
整个表(全局锁)
分段锁/CAS(JDK8+)
null值
不允许null键值
不允许null键值
迭代器
fail-fast
弱一致性
性能
差
好
推荐
已过时
推荐使用
JDK 7 vs JDK 8 实现变化 JDK 7 :分段锁(Segment),默认16个段,最多支持16个线程并发写
1 2 3 4 5 6 7 ┌────────┬────────┬────────┬────────┐ │Segment0│Segment1│Segment2│ ... │ ├────────┼────────┼────────┼────────┤ │ Entry │ Entry │ Entry │ ... │ │ Entry │ Entry │ │ │ └────────┴────────┴────────┴────────┘ 每个Segment是一个小HashMap,有独立的锁
JDK 8+ :Node数组 + CAS + synchronized
1 2 3 4 5 6 7 8 9 10 11 12 ┌───┬───┬───┬───┬───┬───┬───┬───┐ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ ... └─┬─┴───┴─┬─┴───┴───┴─┬─┴───┴───┘ │ │ │ ▼ ▼ ▼ Node Node Node │ │ │ ▼ ▼ ▼ Node TreeNode Node │ ▼ TreeNode
锁粒度更细:只锁单个桶(bin)
链表长度超过8转红黑树
使用CAS操作减少锁竞争
常见使用场景 1 2 3 4 5 6 7 8 9 10 11 12 13 ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap <>(); counters.computeIfAbsent("pageView" , k -> new LongAdder ()).increment(); ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap <>(); Object value = cache.computeIfAbsent(key, k -> loadFromDB(k));ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap <>(); if (seen.putIfAbsent(item, Boolean.TRUE) == null ) { }
CopyOnWriteArrayList 写时复制的List实现,适合读多写少 的场景。
工作原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 读操作:直接读取当前数组,无锁 ┌───┬───┬───┬───┐ 读线程 ──▶│ A │ B │ C │ D │ └───┴───┴───┴───┘ 写操作:复制新数组,修改后替换引用 ┌───┬───┬───┬───┐ 原数组 │ A │ B │ C │ D │ └───┴───┴───┴───┘ ↓ 复制 ┌───┬───┬───┬───┬───┐ 新数组 │ A │ B │ C │ D │ E │ ← 写入新元素 └───┴───┴───┴───┴───┘ ↓ 替换引用 array = newArray
基本用法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList <>(); list.add("apple" ); list.addAll(Arrays.asList("banana" , "orange" )); String first = list.get(0 );for (String item : list) { System.out.println(item); } for (String item : list) { if (item.equals("apple" )) { list.remove(item); } }
适用场景 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList <>(); public void addListener (EventListener listener) { listeners.add(listener); } public void fireEvent (Event event) { for (EventListener listener : listeners) { listener.onEvent(event); } } CopyOnWriteArrayList<String> blacklist = new CopyOnWriteArrayList <>(); blacklist.addAll(loadBlacklistFromDB()); boolean blocked = blacklist.contains(userId);
注意事项
写操作代价高 :每次写都复制整个数组,O(n) 复杂度
内存占用 :写时会有两份数组
弱一致性 :迭代器看到的是创建时的快照
CopyOnWriteArraySet 基于 CopyOnWriteArrayList 实现的Set,特性相同。
1 2 3 4 CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet <>(); set.add("apple" ); set.add("apple" ); System.out.println(set.size());
ConcurrentLinkedQueue 无界非阻塞队列,基于CAS实现,适合高并发场景。
1 2 3 4 5 6 7 8 9 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue <>(); queue.offer("task1" ); queue.add("task2" ); String task = queue.poll(); String head = queue.peek();
与LinkedList的区别
特性
LinkedList
ConcurrentLinkedQueue
线程安全
否
是
实现方式
双向链表
单向链表 + CAS
阻塞操作
不支持
不支持
null元素
允许
不允许
BlockingQueue 阻塞队列 阻塞队列在队列空时获取会阻塞,队列满时插入会阻塞,非常适合生产者-消费者模式。
核心方法
操作
抛异常
返回特殊值
阻塞
超时
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
检查
element()
peek()
-
-
ArrayBlockingQueue 有界阻塞队列,基于数组实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ArrayBlockingQueue<String> queue = new ArrayBlockingQueue <>(100 ); new Thread (() -> { try { queue.put("product" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); new Thread (() -> { try { String product = queue.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start();
LinkedBlockingQueue 可选有界的阻塞队列,基于链表实现。
1 2 3 4 5 LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue <>(); LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue <>(1000 );
ArrayBlockingQueue vs LinkedBlockingQueue
特性
ArrayBlockingQueue
LinkedBlockingQueue
底层结构
数组
链表
是否有界
必须指定容量
可选,默认无界
锁
单锁
双锁(头尾分离)
内存
预分配
动态分配
吞吐量
较低
较高
PriorityBlockingQueue 支持优先级的无界阻塞队列。
1 2 3 4 5 6 7 8 9 10 11 PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue <>( 11 , Comparator.comparingInt(Task::getPriority) ); queue.put(new Task ("low" , 3 )); queue.put(new Task ("high" , 1 )); queue.put(new Task ("medium" , 2 )); queue.take();
DelayQueue 延迟队列,元素只有到期后才能被取出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class DelayedTask implements Delayed { private String name; private long executeTime; public DelayedTask (String name, long delayMs) { this .name = name; this .executeTime = System.currentTimeMillis() + delayMs; } @Override public long getDelay (TimeUnit unit) { long diff = executeTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { return Long.compare(this .executeTime, ((DelayedTask) o).executeTime); } } DelayQueue<DelayedTask> queue = new DelayQueue <>(); queue.put(new DelayedTask ("task1" , 5000 )); queue.put(new DelayedTask ("task2" , 1000 )); DelayedTask task = queue.take();
适用场景:
SynchronousQueue 不存储元素的阻塞队列,每个put必须等待一个take。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 SynchronousQueue<String> queue = new SynchronousQueue <>(); new Thread (() -> { try { queue.put("data" ); System.out.println("已交付" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); new Thread (() -> { try { Thread.sleep(1000 ); String data = queue.take(); System.out.println("收到: " + data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start();
适用于直接传递的场景,如 Executors.newCachedThreadPool() 内部使用。
ConcurrentSkipListMap 基于跳表实现的并发有序Map,是 TreeMap 的线程安全替代品。
跳表结构 1 2 3 4 5 6 7 8 Level 3: 1 ─────────────────────────────▶ 9 Level 2: 1 ────────▶ 4 ─────────────────▶ 9 Level 1: 1 ───▶ 3 ─▶ 4 ───▶ 6 ─────────▶ 9 Level 0: 1 ─▶ 2 ─▶ 3 ─▶ 4 ─▶ 5 ─▶ 6 ─▶ 7 ─▶ 9 查找6:从Level 3开始,1→9太大,降级 Level 2:1→4→9太大,降级 Level 1:1→3→4→6,找到!
基本用法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap <>(); map.put(3 , "three" ); map.put(1 , "one" ); map.put(2 , "two" ); map.forEach((k, v) -> System.out.println(k + "=" + v)); SortedMap<Integer, String> subMap = map.subMap(1 , 3 ); map.firstKey(); map.lastKey(); map.higherKey(2 ); map.lowerKey(2 );
ConcurrentSkipListMap vs ConcurrentHashMap
特性
ConcurrentHashMap
ConcurrentSkipListMap
有序性
无序
按key有序
时间复杂度
O(1)
O(log n)
范围查询
不支持
支持
内存占用
较少
较多
null
不允许
不允许
ConcurrentSkipListSet 基于 ConcurrentSkipListMap 实现的并发有序Set。
1 2 3 4 5 6 7 8 9 10 11 12 13 ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet <>(); set.add(3 ); set.add(1 ); set.add(2 ); for (Integer i : set) { System.out.println(i); } set.headSet(2 ); set.tailSet(2 );
如何选择并发集合 Map类 1 2 3 4 需要Map? ├── 需要有序吗? │ ├── 是 → ConcurrentSkipListMap │ └── 否 → ConcurrentHashMap(首选)
List类 1 2 3 4 需要List? ├── 读多写少? │ ├── 是 → CopyOnWriteArrayList │ └── 否 → Collections.synchronizedList() 或考虑其他方案
Queue类 1 2 3 4 5 6 7 8 9 10 需要Queue? ├── 需要阻塞? │ ├── 是 → BlockingQueue系列 │ │ ├── 需要优先级? → PriorityBlockingQueue │ │ ├── 需要延迟? → DelayQueue │ │ ├── 直接传递? → SynchronousQueue │ │ └── 普通队列? │ │ ├── 有界 → ArrayBlockingQueue │ │ └── 无界 → LinkedBlockingQueue │ └── 否 → ConcurrentLinkedQueue
Set类 1 2 3 4 需要Set? ├── 需要有序吗? │ ├── 是 → ConcurrentSkipListSet │ └── 否 → ConcurrentHashMap.newKeySet() 或 CopyOnWriteArraySet
总结
集合类
特点
适用场景
ConcurrentHashMap
高性能并发Map
通用并发Map场景
CopyOnWriteArrayList
写时复制
读多写少的List
CopyOnWriteArraySet
写时复制
读多写少的Set
ConcurrentLinkedQueue
无锁队列
高并发非阻塞队列
ArrayBlockingQueue
有界阻塞
生产者-消费者(需限流)
LinkedBlockingQueue
可选有界阻塞
生产者-消费者(高吞吐)
PriorityBlockingQueue
优先级阻塞
任务调度
DelayQueue
延迟阻塞
定时任务、缓存过期
SynchronousQueue
直接传递
线程间直接交换
ConcurrentSkipListMap
有序并发Map
需要范围查询的并发Map
ConcurrentSkipListSet
有序并发Set
需要排序的并发Set
选择原则:
优先使用并发集合 而非同步包装器
根据读写比例 选择:读多写少用CopyOnWrite系列
根据是否需要阻塞 选择:生产者-消费者用BlockingQueue
根据是否需要有序 选择:需要有序用SkipList系列