跳到主要内容

Java 并发集合

引言

在多线程环境下,普通的Java集合类(如ArrayList、HashMap等)并不是线程安全的。当多个线程同时访问这些集合时,可能会导致数据不一致或者抛出ConcurrentModificationException异常。为了解决这个问题,Java提供了并发集合框架,专门用于多线程环境下的集合操作。

并发集合位于java.util.concurrent包中,它们被设计为线程安全的,可以在多线程环境下高效地工作,无需额外的同步措施。

为什么需要并发集合?

让我们先通过一个简单的例子来理解为什么需要并发集合:

java
import java.util.ArrayList;
import java.util.List;

public class UnsafeCollectionExample {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();

// 创建两个线程,同时向列表中添加元素
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});

Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
});

thread1.start();
thread2.start();

thread1.join();
thread2.join();

System.out.println("预期大小: 2000, 实际大小: " + list.size());
}
}

运行这段代码,你会发现输出的集合大小通常小于2000,或者程序可能会抛出异常。这是因为ArrayList不是线程安全的,多线程同时操作会导致问题。

Java 并发集合概述

Java并发集合框架提供了多种线程安全的集合实现,主要包括:

  1. ConcurrentHashMap: HashMap的线程安全版本
  2. CopyOnWriteArrayList: ArrayList的线程安全版本
  3. CopyOnWriteArraySet: 基于CopyOnWriteArrayList实现的线程安全Set
  4. ConcurrentLinkedQueue: 线程安全的无界队列
  5. ConcurrentLinkedDeque: 线程安全的双向队列
  6. ArrayBlockingQueue: 基于数组的有界阻塞队列
  7. LinkedBlockingQueue: 基于链表的可选有界阻塞队列
  8. PriorityBlockingQueue: 线程安全的优先级队列
  9. DelayQueue: 延迟元素的无界阻塞队列
  10. SynchronousQueue: 没有内部容量的阻塞队列

这些集合类针对不同的使用场景进行了优化,下面我们详细介绍几个常用的并发集合。

ConcurrentHashMap

ConcurrentHashMapHashMap的线程安全版本,它允许多个线程同时进行读操作,并且支持一定数量的线程安全地执行写操作。

特点

  1. 线程安全,无需额外同步
  2. 不允许null键或值
  3. 迭代器是弱一致性的,不会抛出ConcurrentModificationException
  4. 性能比同步的HashMap更高

使用示例

java
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 创建ConcurrentHashMap
Map<String, Integer> map = new ConcurrentHashMap<>();

// 创建多个线程同时写入map
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
map.put("Key" + i, i);
}
});

Thread thread2 = new Thread(() -> {
for (int i = 1000; i < 2000; i++) {
map.put("Key" + i, i);
}
});

thread1.start();
thread2.start();

thread1.join();
thread2.join();

System.out.println("Map大小: " + map.size()); // 应该输出2000
System.out.println("Key500的值: " + map.get("Key500"));
}
}

输出:

Map大小: 2000
Key500的值: 500

CopyOnWriteArrayList

CopyOnWriteArrayListArrayList的线程安全变体,它通过在每次修改操作时创建底层数组的新副本来实现线程安全。

特点

  1. 适用于读多写少的场景
  2. 写操作需要复制整个数组,性能较低
  3. 读操作不需要锁,性能高
  4. 迭代器支持快照语义,不会抛出ConcurrentModificationException

使用示例

java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();

// 添加元素
list.add("Java");
list.add("Python");
list.add("C++");

// 创建一个线程用于遍历列表
Thread readThread = new Thread(() -> {
for (String language : list) {
System.out.println("读取: " + language);
try {
Thread.sleep(100); // 模拟读取操作耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

// 创建一个线程用于修改列表
Thread writeThread = new Thread(() -> {
list.add("JavaScript");
System.out.println("添加: JavaScript");
list.remove("Python");
System.out.println("移除: Python");
});

readThread.start();
Thread.sleep(50); // 确保读线程先开始
writeThread.start();

readThread.join();
writeThread.join();

System.out.println("最终列表: " + list);
}
}

输出可能如下:

读取: Java
添加: JavaScript
移除: Python
读取: Python
读取: C++
最终列表: [Java, C++, JavaScript]

注意,尽管Python已被移除,读线程仍然可以读取到它,这是因为迭代器使用的是快照语义。

BlockingQueue接口及其实现

阻塞队列(BlockingQueue)是Java并发包中非常重要的数据结构,主要用于生产者-消费者场景。

主要特点

  1. 当队列满时,插入操作会阻塞
  2. 当队列空时,获取操作会阻塞
  3. 提供超时机制的操作方法

常用实现

  • ArrayBlockingQueue: 基于数组的有界阻塞队列
  • LinkedBlockingQueue: 基于链表的可选有界阻塞队列
  • PriorityBlockingQueue: 支持优先级的无界阻塞队列
  • DelayQueue: 延迟元素的无界阻塞队列
  • SynchronousQueue: 没有内部容量的阻塞队列

生产者-消费者示例

java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
public static void main(String[] args) {
// 创建容量为5的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("生产者生产: " + i);
queue.put(i); // 如果队列满了,这里会阻塞
Thread.sleep(100); // 生产速度
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
int value = queue.take(); // 如果队列空了,这里会阻塞
System.out.println("消费者消费: " + value);
Thread.sleep(300); // 消费速度
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

producer.start();
consumer.start();
}
}

输出示例:

生产者生产: 1
消费者消费: 1
生产者生产: 2
生产者生产: 3
消费者消费: 2
生产者生产: 4
...

由于消费速度比生产慢,队列会逐渐填满,然后生产者会被阻塞直到有空间。

ConcurrentSkipListMap和ConcurrentSkipListSet

这两个类是java.util.concurrent包中提供的可并发排序的集合实现。

特点

  1. 基于跳表(Skip List)数据结构实现
  2. 提供了线程安全的排序Map和Set
  3. 平均时间复杂度为O(log n)
  4. 不需要像TreeMapTreeSet那样使用显式锁

ConcurrentSkipListMap示例

java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentSkipListMapExample {
public static void main(String[] args) throws InterruptedException {
// 创建ConcurrentSkipListMap
Map<Integer, String> map = new ConcurrentSkipListMap<>();

// 添加元素
map.put(3, "C");
map.put(1, "A");
map.put(2, "B");
map.put(5, "E");
map.put(4, "D");

// 遍历Map,会按照键的自然顺序排序
System.out.println("按键排序的结果:");
map.forEach((key, value) -> System.out.println(key + ": " + value));
}
}

输出:

按键排序的结果:
1: A
2: B
3: C
4: D
5: E

并发集合的性能考量

虽然并发集合提供了线程安全的保障,但它们在不同场景下的性能表现各不相同:

  1. ConcurrentHashMap: 读操作非常高效,写操作也相对高效
  2. CopyOnWriteArrayList: 读操作高效,但写操作开销大,适合读多写少的场景
  3. BlockingQueue: 用于生产者-消费者模式,不同实现在不同场景下性能各异
  4. ConcurrentSkipListMap: 适合需要并发且有序的场景,但比HashMap操作略慢
选择建议

选择合适的并发集合时,应考虑:

  1. 是否需要线程安全
  2. 读写操作的比例
  3. 是否需要排序
  4. 是否需要阻塞功能
  5. 集合的大小和预期吞吐量

实际应用场景

1. 缓存系统

ConcurrentHashMap常用于实现简单的缓存系统:

java
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class SimpleCache<K, V> {
private final Map<K, V> cache = new ConcurrentHashMap<>();

public V get(K key) {
return cache.get(key);
}

public void put(K key, V value) {
cache.put(key, value);
}

public V remove(K key) {
return cache.remove(key);
}

public boolean contains(K key) {
return cache.containsKey(key);
}

public int size() {
return cache.size();
}
}

2. 任务调度系统

使用BlockingQueue实现简单的任务调度:

java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskScheduler {
private final BlockingQueue<Runnable> taskQueue;
private final ExecutorService executor;
private volatile boolean running = true;

public TaskScheduler(int workerCount) {
this.taskQueue = new LinkedBlockingQueue<>();
this.executor = Executors.newFixedThreadPool(workerCount);

// 启动工作线程
for (int i = 0; i < workerCount; i++) {
executor.submit(() -> {
while (running) {
try {
Runnable task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
System.err.println("任务执行异常: " + e.getMessage());
}
}
});
}
}

public void scheduleTask(Runnable task) {
if (running) {
taskQueue.offer(task);
}
}

public void shutdown() {
running = false;
executor.shutdownNow();
}
}

3. 事件处理系统

使用CopyOnWriteArrayList实现简单的事件监听器系统:

java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// 事件接口
interface Event {
String getName();
}

// 事件监听器接口
interface EventListener {
void onEvent(Event event);
}

// 事件分发器
class EventDispatcher {
private final List<EventListener> listeners = new CopyOnWriteArrayList<>();

public void addListener(EventListener listener) {
listeners.add(listener);
}

public void removeListener(EventListener listener) {
listeners.remove(listener);
}

public void dispatchEvent(Event event) {
for (EventListener listener : listeners) {
listener.onEvent(event);
}
}
}

总结

Java并发集合框架提供了丰富的线程安全集合实现,它们在多线程环境下可以高效工作,无需额外的同步措施。选择合适的并发集合类可以显著提高多线程程序的性能和可靠性。

主要并发集合及其特点:

  1. ConcurrentHashMap: 适合高并发的哈希表操作
  2. CopyOnWriteArrayList: 适合读多写少的列表操作
  3. BlockingQueue家族: 适合生产者-消费者模式
  4. ConcurrentSkipListMap/Set: 适合需要排序的并发集合操作

在使用并发集合时,应根据具体场景选择合适的实现,并理解其性能特点和权衡因素。

练习

  1. 使用ConcurrentHashMap实现一个简单的计数器,统计不同单词在多线程环境下出现的次数。
  2. 实现一个使用ArrayBlockingQueue的生产者-消费者模型,模拟订单处理系统。
  3. 使用CopyOnWriteArrayList实现一个线程安全的观察者模式。
  4. 比较HashMapConcurrentHashMap在多线程环境下的性能差异。
  5. 使用DelayQueue实现一个定时任务系统,任务可以在指定延迟后执行。

相关资源

注意事项

即使使用了并发集合,也需要注意避免复合操作可能导致的线程安全问题。例如,"检查后再执行"(check-then-act)操作需要额外的同步措施。