
在java并行流中对共享可变状态(如外部列表)进行操作时,由于多线程并发访问,可能导致不可预测的行为,例如`list.size()`的非预期变化。本文将深入探讨并行流中状态操作引发的竞态条件,并提供使用并发锁等机制进行有效控制的方法,以确保数据一致性和程序正确性。
理解Java并行流与状态操作
Java 8引入的Stream API极大地简化了集合操作。并行流(Parallel Stream)是Stream API的一个强大特性,它允许我们将流操作并行化,从而利用多核处理器的优势来提高处理速度。然而,并行流的强大能力也伴随着对并发编程的挑战。
当流操作是“无状态的”(stateless)时,即每个元素的操作独立于其他元素,并且不修改任何外部共享状态时,并行流能很好地工作。但如果流操作是“有状态的”(stateful),例如在lambda表达式中访问或修改一个外部变量(如一个List),那么就可能引入并发问题。
考虑以下示例代码,它尝试在一个并行流中根据条件向一个外部List添加元素:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ParallelStreamStatefulExample {
static void statefulParallelLambdaSetProblem() {
Set s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List list = new ArrayList<>();
int sum = s.parallelStream().mapToInt(e -> {
// 问题:list.size() 在管道操作执行期间可能发生变化
// mapToInt 的 lambda 表达式依赖于此值,因此它是“有状态的”
if (list.size() <= 3) {
list.add(e);
return e;
} else {
return 0;
}
}).sum();
System.out.println("计算结果 sum: " + sum);
System.out.println("最终 list: " + list);
System.out.println("最终 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetProblem();
}
} 在上述代码中,list.size()和list.add(e)都在并行流的lambda表达式中被访问和修改。由于并行流会使用多个线程同时处理数据,这些对共享list的操作会交错执行,导致不可预测的结果。
立即学习“Java免费学习笔记(深入)”;
竞态条件:list.size()变化之谜
当多个线程同时访问和修改同一个共享资源,并且至少有一个操作是写入操作时,如果这些操作的最终结果取决于线程执行的时序,就称之为发生了“竞态条件”(Race Condition)。在上述示例中,list.size()的非预期变化正是竞态条件的一个典型表现。
具体来说,当一个线程执行if (list.size()
这种线程执行顺序的不确定性,加上对非线程安全的ArrayList的并发修改,使得list.size()的值在不同的执行时刻和不同的线程看来可能不同,最终导致:
- list.size()
- list中实际添加的元素可能超过3个,甚至可能因为ArrayList的非线程安全特性而抛出ConcurrentModificationException或导致内部数据结构损坏。
- 每次运行程序,sum的值和list中的内容都可能不同。
规避竞态条件:并发控制机制
为了解决并行流中状态操作引发的竞态条件,我们需要引入并发控制机制,确保对共享资源的访问是同步的(Synchronized)和原子性的(Atomic)。Java提供了多种并发工具,其中最常用的是synchronized关键字和java.util.concurrent.locks包下的锁。
使用 synchronized 关键字
synchronized关键字可以用于方法或代码块,确保在任何给定时刻只有一个线程可以执行被同步的代码。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ParallelStreamStatefulExampleSynchronized {
static void statefulParallelLambdaSetSynchronized() {
Set s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List list = new ArrayList<>();
// 使用一个专门的锁对象,或者直接同步在list对象上(如果list本身不是线程安全的,需要谨慎)
// 这里为了清晰,使用一个独立的锁对象
final Object lock = new Object();
int sum = s.parallelStream().mapToInt(e -> {
int result = 0;
synchronized (lock) { // 同步访问 list.size() 和 list.add()
if (list.size() <= 3) {
list.add(e);
result = e;
}
}
return result;
}).sum();
System.out.println("同步后的 sum: " + sum);
System.out.println("同步后的 list: " + list);
System.out.println("同步后的 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetSynchronized();
}
} 通过将if (list.size()
使用 java.util.concurrent.locks.ReentrantLock
ReentrantLock提供了比synchronized更灵活的锁定机制,例如可以尝试获取锁、定时获取锁等。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ParallelStreamStatefulExampleReentrantLock {
static void statefulParallelLambdaSetReentrantLock() {
Set s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List list = new ArrayList<>();
final Lock listLock = new ReentrantLock(); // 创建一个可重入锁
int sum = s.parallelStream().mapToInt(e -> {
int result = 0;
listLock.lock(); // 获取锁
try {
if (list.size() <= 3) {
list.add(e);
result = e;
}
} finally {
listLock.unlock(); // 确保在finally块中释放锁
}
return result;
}).sum();
System.out.println("ReentrantLock 同步后的 sum: " + sum);
System.out.println("ReentrantLock 同步后的 list: " + list);
System.out.println("ReentrantLock 同步后的 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetReentrantLock();
}
} 使用ReentrantLock时,需要手动调用lock()获取锁和unlock()释放锁,并且通常建议将unlock()放在finally块中,以确保在发生异常时也能正确释放锁。
注意事项与最佳实践
- 避免状态操作: 最好的解决方案是尽可能避免在并行流中执行有状态的操作。如果需要收集结果,考虑使用Collectors提供的并发收集器,如Collectors.toConcurrentMap()、Collectors.groupingByConcurrent()等,它们内部已经处理了并发问题。
- 性能开销: 引入锁机制会带来额外的性能开销,因为它会序列化对共享资源的访问,这可能抵消并行流带来的部分性能优势。如果同步块非常大或者竞争激烈,并行流的性能甚至可能低于串行流。
- 线程安全集合: 如果需要向集合中添加元素,可以考虑使用线程安全的集合类,如java.util.concurrent.CopyOnWriteArrayList或java.util.concurrent.ConcurrentLinkedQueue,但它们有各自的适用场景和性能特点。
- 原子操作: 对于简单的计数器或布尔标志,可以使用java.util.concurrent.atomic包下的原子类(如AtomicInteger、AtomicLong)来避免使用锁,它们提供了无锁的原子操作,性能通常更好。
- 串行流的确定性: 即使是串行流,如果源数据(如HashSet)的迭代顺序不确定,那么每次运行得到的最终结果(例如sum和list的内容)也可能不同,但这与并行流中的竞态条件是不同的概念。串行流不会有list.size()在单次操作中“意外”变化的竞态问题。
总结
Java并行流是提高程序性能的强大工具,但它要求开发者对并发编程有深入的理解。在并行流中使用有状态操作,特别是对共享可变状态进行读写时,极易引发竞态条件,导致程序行为不可预测。通过理解竞态条件的本质,并合理运用synchronized关键字或java.util.concurrent.locks包下的锁机制,我们可以有效地控制并发访问,确保数据的一致性和程序的正确性。然而,最好的实践是尽量设计无状态的流操作,或利用Java并发API提供的线程安全结构,以最小化锁的开销,充分发挥并行流的优势。










