LinkedBlockingQueue 死循环

​ 上线前压测发现业务线程 hang 死在 LinkedBlockingQueue,同时另外一条线程在 LinkedBlockingQueue.forEach() 中死循环了。看来是触发了 JDK(1.8.0_261) 的 bug。

定位 Bug

Dump 线程栈找到死循环点

Dump 下线程,找到死循环的堆栈如下:

1
2
3
4
"b-thread-1@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.forEachRemaining(LinkedBlockingQueue.java:926)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void forEachRemaining(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
exhausted = true;
Node<E> p = current;
do {
E e = null;
q.fullyLock(); // 获取锁
try {
if (p == null)
p = q.head.next;
while (p != null) {
e = p.item; // 在这里死循环了
p = p.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (e != null)
action.accept(e);
} while (p != null);
}
}

分析死循环原因

​ LinkedBlockingQueue 是基于链表实现的阻塞队列,根据死循环点的代码,猜测是跟 HashMap 一样由于自身成环无法退出循环条件。开发环境随便模拟了一下多个线程读写的情况,果然出现了死循环的情况,打个断点看内存,真的是自身成环了。
接下来查找何处代码会导致自身循环,dequeue的实现逻辑会把出队的节点设置成自身成环 help gc

1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

​ 但是问题是调用 dequeue 的上层代码是加了锁的,forEachRemaining 也是加了锁的,产生并发的点是哪里呢?
​ 回过头再来看 forEachRemaining 的代码逻辑,这个方法是 JDK8 为了支持 Stream 新增的代码,其逻辑是先获取 fullyLock*,拿到节点的 item,然后释放锁,执行 *accept 逻辑处理节点内容,然后继续执行以上步骤直到节点都遍历完成。
乍一看代码逻辑严谨,但是有个变量 p 在释放锁后逸出,并且在重新获取锁后没有去判断这个引用指向的对象是否安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void forEachRemaining(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
exhausted = true;
Node<E> p = current; // 变量 p 逸出
do {
E e = null;
q.fullyLock();
try {
if (p == null)
p = q.head.next;
while (p != null) {
e = p.item;
p = p.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (e != null)
action.accept(e);
} while (p != null);
}
}
精准复现 Bug

根据 dequeueforEachRemaining 的代码分析,产生死循环的条件如下:

  • 队列中要 >= 3个元素
  • forEachRemaining 逻辑先 获取锁-> 释放锁,变量 p (即队列中第二个元素的引用) 逃逸出来,与此同时,另外一个线程马上取出队列中的所有元素。

满足以上条件时候,逃逸出锁的变量 p 就会自身成环,继续执行 forEachRemaining 就进入死循环了。
ps: 为什么必须要 >=3 个元素呢?因为 dequeue 的出队逻辑是把当前要出队的节点 item 置为 null,然后把它作为 head,原有的 head 节点自身成环,废弃掉。所以必须是成为过 head 节点,然后又被废弃的节点,同时还被 forEachRemaining 逃逸引用的节点才能满足死循环的条件。

精确复现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void testLBQInfiniteLoop() {
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
queue.add(1);queue.add(2);queue.add(3);
final Object notify = new Object();
new Thread(() -> {
synchronized (notify) {
try {
notify.wait();
System.out.println("Start to take.");
queue.take();queue.take();queue.take();
System.out.println("Take all.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
queue.stream().forEach(e -> {
synchronized (notify) {
notify.notifyAll();
System.out.println("Notify to start take.");
}
System.out.println("Time consuming operation." + e);
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
});
}

修复 Bug

​ 以上已经分析了 bug 的原因,需要处理的是 forEachRemaining 中锁外逸出的变量的不安全使用。直接看一下新版的 JDK 是否有修复这个 bug。
切换 JDK 到 -> 11.0.5,跑测试用例,用例正常结束,再来看相关逻辑的修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void forEachFrom(Consumer<? super E> action, Node<E> p) {
// Extract batches of elements while holding the lock; then
// run the action on the elements while not
final int batchSize = 64; // max number of elements per batch
Object[] es = null; // container for batch of elements
int n, len = 0;
do {
fullyLock();
try {
if (es == null) {
if (p == null) p = head.next;
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == batchSize)
break;
es = new Object[len];
}
for (n = 0; p != null && n < len; p = succ(p))
if ((es[n] = p.item) != null)
n++;
} finally {
fullyUnlock();
}
for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") E e = (E) es[i];
action.accept(e);
}
} while (n > 0 && p != null);
}

​ 首先,已经修改成批量取出到数组(batchSize = 64)进行 accept 逻辑,避免高并发下频繁获取,释放锁造成的竞争,但是同时变量 p 依然是在锁是否后逸出的啊,为什么用例不会产生死循环了呢?

再来看 succ的逻辑:

1
2
3
4
5
Node<E> succ(Node<E> p) {
if (p == (p = p.next))
p = head.next;
return p;
}

原来是在这里,当发现 p 在释放锁期间成环,从头部重新获取队列头元素。搞定,bug 修复。