Java 并发集合
引言
在多线程环境下,普通的Java集合类(如ArrayList、HashMap等)并不是线程安全的。当多个线程同时访问这些集合时,可能会导致数据不一致或者抛出ConcurrentModificationException
异常。为了解决这个问题,Java提供了并发集合框架,专门用于多线程环境下的集合操作。
并发集合位于java.util.concurrent
包中,它们被设计为线程安全的,可以在多线程环境下高效地工作,无需额外的同步措施。
为什么需要并发集合?
让我们先通过一个简单的例子来理解为什么需要并发集合:
import java.util.ArrayList;
import java.util.List;
public class UnsafeCollectionExample {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();
// 创建两个线程,同时向列表中添加元素
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("预期大小: 2000, 实际大小: " + list.size());
}
}
运行这段代码,你会发现输出的集合大小通常小于2000,或者程序可能会抛出异常。这是因为ArrayList不是线程安全的,多线程同时操作会导致问题。
Java 并发集合概述
Java并发集合框架提供了多种线程安全的集合实现,主要包括:
- ConcurrentHashMap: HashMap的线程安全版本
- CopyOnWriteArrayList: ArrayList的线程安全版本
- CopyOnWriteArraySet: 基于CopyOnWriteArrayList实现的线程安全Set
- ConcurrentLinkedQueue: 线程安全的无界队列
- ConcurrentLinkedDeque: 线程安全的双向队列
- ArrayBlockingQueue: 基于数组的有界阻塞队列
- LinkedBlockingQueue: 基于链表的可选有界阻塞队列
- PriorityBlockingQueue: 线程安全的优先级队列
- DelayQueue: 延迟元素的无界阻塞队列
- SynchronousQueue: 没有内部容量的阻塞队列
这些集合类针对不同的使用场景进行了优化,下面我们详细介绍几个常用的并发集合。
ConcurrentHashMap
ConcurrentHashMap
是HashMap
的线程安全版本,它允许多个线程同时进行读操作,并且支持一定数量的线程安全地执行写操作。
特点
- 线程安全,无需额外同步
- 不允许null键或值
- 迭代器是弱一致性的,不会抛出ConcurrentModificationException
- 性能比同步的HashMap更高
使用示例
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 创建ConcurrentHashMap
Map<String, Integer> map = new ConcurrentHashMap<>();
// 创建多个线程同时写入map
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("Key" + i, i);
}
});
Thread thread2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("Key" + i, i);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("Map大小: " + map.size()); // 应该输出2000
System.out.println("Key500的值: " + map.get("Key500"));
}
}
输出:
Map大小: 2000
Key500的值: 500
CopyOnWriteArrayList
CopyOnWriteArrayList
是ArrayList
的线程安全变体,它通过在每次修改操作时创建底层数组的新副本来实现线程安全。
特点
- 适用于读多写少的场景
- 写操作需要复制整个数组,性能较低
- 读操作不需要锁,性能高
- 迭代器支持快照语义,不会抛出ConcurrentModificationException
使用示例
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();
// 添加元素
list.add("Java");
list.add("Python");
list.add("C++");
// 创建一个线程用于遍历列表
Thread readThread = new Thread(() -> {
for (String language : list) {
System.out.println("读取: " + language);
try {
Thread.sleep(100); // 模拟读取操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 创建一个线程用于修改列表
Thread writeThread = new Thread(() -> {
list.add("JavaScript");
System.out.println("添加: JavaScript");
list.remove("Python");
System.out.println("移除: Python");
});
readThread.start();
Thread.sleep(50); // 确保读线程先开始
writeThread.start();
readThread.join();
writeThread.join();
System.out.println("最终列表: " + list);
}
}
输出可能如下:
读取: Java
添加: JavaScript
移除: Python
读取: Python
读取: C++
最终列表: [Java, C++, JavaScript]
注意,尽管Python已被移除,读线程仍然可以读取到它,这是因为迭代器使用的是快照语义。
BlockingQueue接口及其实现
阻塞队列(BlockingQueue
)是Java并发包中非常重要的数据结构,主要用于生产者-消费者场景。
主要特点
- 当队列满时,插入操作会阻塞
- 当队列空时,获取操作会阻塞
- 提供超时机制的操作方法
常用实现
- ArrayBlockingQueue: 基于数组的有界阻塞队列
- LinkedBlockingQueue: 基于链表的可选有界阻塞队列
- PriorityBlockingQueue: 支持优先级的无界阻塞队列
- DelayQueue: 延迟元素的无界阻塞队列
- SynchronousQueue: 没有内部容量的阻塞队列
生产者-消费者示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
public static void main(String[] args) {
// 创建容量为5的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("生产者生产: " + i);
queue.put(i); // 如果队列满了,这里会阻塞
Thread.sleep(100); // 生产速度
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
int value = queue.take(); // 如果队列空了,这里会阻塞
System.out.println("消费者消费: " + value);
Thread.sleep(300); // 消费速度
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
输出示例:
生产者生产: 1
消费者消费: 1
生产者生产: 2
生产者生产: 3
消费者消费: 2
生产者生产: 4
...
由于消费速度比生产慢,队列会逐渐填满,然后生产者会被阻塞直到有空间。
ConcurrentSkipListMap和ConcurrentSkipListSet
这两个类是java.util.concurrent
包中提供的可并发排序的集合实现。
特点
- 基于跳表(Skip List)数据结构实现
- 提供了线程安全的排序Map和Set
- 平均时间复杂度为O(log n)
- 不需要像
TreeMap
和TreeSet
那样使用显式锁
ConcurrentSkipListMap示例
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConcurrentSkipListMapExample {
public static void main(String[] args) throws InterruptedException {
// 创建ConcurrentSkipListMap
Map<Integer, String> map = new ConcurrentSkipListMap<>();
// 添加元素
map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");
// 遍历Map,会按照键的自然顺序排序
System.out.println("按键排序的结果:");
map.forEach((key, value) -> System.out.println(key + ": " + value));
}
}
输出:
按键排序的结果:
1: A
2: B
3: C
4: D
5: E
并发集合的性能考量
虽然并发集合提供了线程安全的保障,但它们在不同场景下的性能表现各不相同:
- ConcurrentHashMap: 读操作非常高效,写操作也相对高效
- CopyOnWriteArrayList: 读操作高效,但写操作开销大,适合读多写少的场景
- BlockingQueue: 用于生产者-消费者模式,不同实现在不同场景下性能各异
- ConcurrentSkipListMap: 适合需要并发且有序的场景,但比HashMap操作略慢
选择合适的并发集合时,应考虑:
- 是否需要线程安全
- 读写操作的比例
- 是否需要排序
- 是否需要阻塞功能
- 集合的大小和预期吞吐量
实际应用场景
1. 缓存系统
ConcurrentHashMap
常用于实现简单的缓存系统:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class SimpleCache<K, V> {
private final Map<K, V> cache = new ConcurrentHashMap<>();
public V get(K key) {
return cache.get(key);
}
public void put(K key, V value) {
cache.put(key, value);
}
public V remove(K key) {
return cache.remove(key);
}
public boolean contains(K key) {
return cache.containsKey(key);
}
public int size() {
return cache.size();
}
}
2. 任务调度系统
使用BlockingQueue
实现简单的任务调度:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskScheduler {
private final BlockingQueue<Runnable> taskQueue;
private final ExecutorService executor;
private volatile boolean running = true;
public TaskScheduler(int workerCount) {
this.taskQueue = new LinkedBlockingQueue<>();
this.executor = Executors.newFixedThreadPool(workerCount);
// 启动工作线程
for (int i = 0; i < workerCount; i++) {
executor.submit(() -> {
while (running) {
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
System.err.println("任务执行异常: " + e.getMessage());
}
}
});
}
}
public void scheduleTask(Runnable task) {
if (running) {
taskQueue.offer(task);
}
}
public void shutdown() {
running = false;
executor.shutdownNow();
}
}
3. 事件处理系统
使用CopyOnWriteArrayList
实现简单的事件监听器系统:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
// 事件接口
interface Event {
String getName();
}
// 事件监听器接口
interface EventListener {
void onEvent(Event event);
}
// 事件分发器
class EventDispatcher {
private final List<EventListener> listeners = new CopyOnWriteArrayList<>();
public void addListener(EventListener listener) {
listeners.add(listener);
}
public void removeListener(EventListener listener) {
listeners.remove(listener);
}
public void dispatchEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}
总结
Java并发集合框架提供了丰富的线程安全集合实现,它们在多线程环境下可以高效工作,无需额外的同步措施。选择合适的并发集合类可以显著提高多线程程序的性能和可靠性。
主要并发集合及其特点:
- ConcurrentHashMap: 适合高并发的哈希表操作
- CopyOnWriteArrayList: 适合读多写少的列表操作
- BlockingQueue家族: 适合生产者-消费者模式
- ConcurrentSkipListMap/Set: 适合需要排序的并发集合操作
在使用并发集合时,应根据具体场景选择合适的实现,并理解其性能特点和权衡因素。
练习
- 使用
ConcurrentHashMap
实现一个简单的计数器,统计不同单词在多线程环境下出现的次数。 - 实现一个使用
ArrayBlockingQueue
的生产者-消费者模型,模拟订单处理系统。 - 使用
CopyOnWriteArrayList
实现一个线程安全的观察者模式。 - 比较
HashMap
与ConcurrentHashMap
在多线程环境下的性能差异。 - 使用
DelayQueue
实现一个定时任务系统,任务可以在指定延迟后执行。
相关资源
- Java官方文档 - Concurrent Collections
- Java Concurrency in Practice - Brian Goetz等著
即使使用了并发集合,也需要注意避免复合操作可能导致的线程安全问题。例如,"检查后再执行"(check-then-act)操作需要额外的同步措施。