ForkJoinPool.java中pool方法源码如下:
final ForkJoinTask<?> poll() {
int b, k, cap; ForkJoinTask<?>[] a;
while ((a = array) != null && (cap = a.length) > 0 &&
top - (b = base) > 0) {
ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.getAcquire(a, k = (cap - 1) & b);
if (base == b++) {
if (t == null)
Thread.yield(); // await index advance
else if (QA.compareAndSet(a, k, t, null)) {
BASE.setOpaque(this, b);
return t;
}
}
}
return null;
}
poll 方法,用于从当前线程的工作队列中获取并返回一个任务。该方法的核心逻辑是从任务数组中取出一个任务,并确保线程安全地移除该任务。在特定条件下,Thread.yield()忙等待导致线程不断让出 CPU 资源,而无法继续执行其他任务。
1. 方法签名和参数
final ForkJoinTask<?> poll()
poll:这是一个WorkQueue类中的方法,用于从当前工作队列中获取并返回一个任务。- 返回值:如果成功获取到任务,则返回该任务;否则返回
null。
2. 主要逻辑分析
2.1 检查任务数组
while ((a = array) != null && (cap = a.length) > 0 &&
top - (b = base) > 0) {
array:这是当前工作队列的任务数组,存储了尚未处理的任务。cap = a.length:任务数组的容量。top - b > 0:检查任务数组中是否有未处理的任务。top表示任务数组中的任务数量,base表示已经处理的任务数量。如果top - base > 0,说明队列中有未处理的任务。
2.2 获取任务
ForkJoinTask<?> t = (ForkJoinTask<?>) QA.getAcquire(a, k = (cap - 1) & b);
k = (cap - 1) & b:计算任务数组中的索引k,使用位运算将base映射到数组的有效索引范围内。QA.getAcquire(a, k):从任务数组中获取任务t。QA是ForkJoinTask的静态字段,表示任务数组的访问器(Accessor),getAcquire是一种内存屏障操作,确保读取任务时的可见性。
2.3 检查并更新 base
if (base == b++) {
base == b:确保在获取任务后,base没有被其他线程修改。b++是后置递增操作,意味着在比较后base会被递增,表示已经处理了一个任务。
2.4 处理任务为空的情况
if (t == null)
Thread.yield(); // await index advance
t == null:如果从任务数组中获取的任务t为null,说明任务已经被其他线程移除或尚未准备好。Thread.yield():在这种情况下,线程会调用Thread.yield(),主动让出 CPU 资源,等待其他线程完成任务的插入或移除操作,从而使base或top发生变化。
2.5 使用 CAS 移除任务
else if (QA.compareAndSet(a, k, t, null)) {
BASE.setOpaque(this, b);
return t;
}
QA.compareAndSet(a, k, t, null):使用 CAS(Compare-And-Swap)操作将任务t从任务数组中移除。这确保了多线程环境下的线程安全。BASE.setOpaque(this, b):更新当前工作队列的base,表示已经处理了一个任务。return t:返回窃取到的任务t,结束poll方法。
2.6 返回 null
return null;
- 如果任务数组为空或没有可处理的任务,
poll方法返回null,表示当前工作队列中没有任务可以处理。
3. 为什么线程会一直执行 Thread.yield()?
线程会一直执行 Thread.yield() 的主要原因是在某些情况下,base 和 top 之间的差值大于 0,但实际从任务数组中获取的任务 t 为 null。这可能是由于以下几种情况:
3.1 任务已被其他线程移除
- 当多个线程同时访问同一个任务数组时,可能会发生竞争条件。例如,一个线程刚刚检查了
top - base > 0,但在它尝试获取任务t之前,另一个线程已经通过 CAS 操作移除了该任务。因此,当前线程获取到的任务t为null。 - 在这种情况下,当前线程会调用
Thread.yield(),主动让出 CPU 资源,等待其他线程完成任务的插入或移除操作,从而使base或top发生变化。
3.2 任务尚未准备好
- 在某些情况下,任务可能还没有完全准备好,或者任务数组中的某个位置暂时没有任务。例如,任务可能正在被其他线程插入,但尚未完成插入操作(oom可能导致插入线程异常,进而导致状态异常)。
- 在这种情况下,
poll方法会不断尝试获取任务,但由于任务尚未准备好,t仍然为null,导致线程不断调用Thread.yield()。
3.3 任务数组的竞争
ForkJoinPool中的工作队列是共享的,多个线程可能会同时访问同一个任务数组。当多个线程同时尝试从同一个任务数组中获取任务时,可能会发生竞争,导致某些线程频繁获取到null。- 这种竞争会导致线程不断调用
Thread.yield(),试图让出 CPU 资源,等待其他线程完成任务的插入或移除操作。
3.4 死锁或活锁
- 在极端情况下,如果多个线程之间存在复杂的依赖关系,可能会导致死锁或活锁。例如,线程 A 等待线程 B 完成任务插入,而线程 B 又等待线程 A 完成任务移除。这种情况下,线程可能会陷入无限循环,不断调用
Thread.yield(),但实际上没有任何进展。
4. 如何避免线程频繁调用 Thread.yield()?
为了避免线程频繁调用 Thread.yield(),可以考虑以下优化策略:
4.1 减少任务数组的竞争
- 增加任务队列的数量:通过增加
ForkJoinPool的并行度,可以减少每个线程共享同一个任务数组的情况,从而减少竞争。 - 使用独立的任务队列:尽量让每个线程有自己的任务队列,减少多个线程同时访问同一个任务数组的机会。
4.2 优化任务插入和移除的顺序
- 批量插入任务:如果可能,尽量批量插入任务,而不是逐个插入。这样可以减少任务数组的访问频率,降低竞争的可能性。
- 批量移除任务:类似地,尽量批量移除任务,减少 CAS 操作的次数。
4.3 使用更高效的同步机制
- 使用无锁数据结构:考虑使用更高效的无锁数据结构(如
ConcurrentLinkedQueue或Deque)来管理任务队列,减少 CAS 操作的开销。 - 减少 CAS 操作的频率:尽量减少 CAS 操作的频率,避免频繁的失败重试。
4.4 调整 Thread.yield() 的调用频率
- 引入自旋等待:可以在
Thread.yield()之前引入短暂的自旋等待(spin-wait),给其他线程更多的时间完成任务的插入或移除操作。例如,可以使用LockSupport.parkNanos()来实现微秒级别的等待。 - 动态调整
Thread.yield()的调用频率:可以根据当前任务队列的状态动态调整Thread.yield()的调用频率。例如,如果多次连续获取到null,可以增加Thread.yield()的调用频率,反之则减少。
5. 总结
poll方法的作用:poll方法用于从当前线程的工作队列中获取并返回一个任务。它通过 CAS 操作确保线程安全地移除任务,并在任务为空时调用Thread.yield()让出 CPU 资源。Thread.yield()的原因:线程会一直执行Thread.yield()的原因是任务数组中可能存在竞争条件,导致多个线程同时访问同一个任务数组,或者任务尚未准备好。此外,任务数组的竞争、死锁或活锁也可能导致线程频繁调用Thread.yield()。- 优化建议:为了减少线程频繁调用
Thread.yield(),可以考虑减少任务数组的竞争、优化任务插入和移除的顺序、使用更高效的同步机制,以及调整Thread.yield()的调用频率。
