开发技能Java线程并发无界队列(Unbounded Queue)详解

在 Java 并发编程中,无界队列是指理论上容量没有上限的阻塞队列。它不会因为达到容量而阻塞生产者(任务提交者),可以无限制地添加元素。

1. 常见实现类

实现类是否无界说明
LinkedBlockingQueue可选默认构造函数创建无界队列(容量 = Integer.MAX_VALUE
LinkedTransferQueue无界无容量限制
PriorityBlockingQueue无界基于优先级堆,无容量上限
DelayQueue无界元素需实现 Delayed 接口
SynchronousQueue有界(容量 0)不属于无界队列
ArrayBlockingQueue有界必须指定容量

注意LinkedBlockingQueue 如果不指定容量,则使用 Integer.MAX_VALUE(约 21 亿),在实际内存耗尽之前可以认为是“无界”的。

2. 在线程池中的行为(以 ThreadPoolExecutor 为例)

线程池的核心参数:

  • corePoolSize:核心线程数

  • maximumPoolSize:最大线程数

  • workQueue:任务队列

当使用无界队列时

  • 如果当前运行的线程数 < corePoolSize,直接创建新线程执行任务。

  • 如果当前线程数 >= corePoolSize,新任务会永远进入队列等待,而不会创建超出 corePoolSize 的线程。

  • maximumPoolSize 参数失效,因为队列永远不会满,所以不会触发扩容到 maximumPoolSize

  • 拒绝策略(如 AbortPolicy永远不会被触发,除非队列真的满了(但无界队列几乎不可能满)。

示例:Executors.newFixedThreadPool(10)


public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这里使用的是无界队列,所以即使任务提交速度远超处理速度,队列也会不断膨胀。

3. 无界队列的风险

  • 内存溢出(OOM):如果任务提交速度持续高于处理速度,队列会无限增长,最终耗尽堆内存,导致 OutOfMemoryError: Java heap space

  • 响应延迟:任务在队列中积压,等待时间越来越长,可能无法满足实时性要求。

  • 系统不可预测:没有背压(backpressure)机制,上游无法感知下游的负载,系统容易崩溃。

4. 适用场景(谨慎使用)

无界队列只适用于:

  • 任务提交速度稳定且远低于处理速度,队列永远不会积压。

  • 内存足够大,且可以容忍极端情况下的积压。

  • 业务上允许任务无限排队(例如批量离线处理)。

对于生产环境,强烈建议使用有界队列,并配合合理的拒绝策略。

5. 有界队列 vs 无界队列

特性有界队列无界队列
容量上限明确指定(如 100, 1000)理论无限(实际受内存限制)
触发扩容队列满时创建新线程(直到 maximumPoolSize永远不会满,永远不会扩容到 > corePoolSize
拒绝策略当队列满且线程数已达最大时触发几乎从不触发
内存安全可控,防止 OOM高风险,可能导致 OOM
适用场景大部分生产环境极特殊场景,或学习测试

6. 最佳实践

  • 避免使用 Executors.newFixedThreadPool 和 newSingleThreadExecutor,因为它们默认使用无界队列。推荐直接使用 ThreadPoolExecutor 构造方法,并指定有界队列。

  • 常见有界队列选择

    • ArrayBlockingQueue:固定大小数组,性能好,公平性可配置。

    • LinkedBlockingQueue:可指定容量,链表结构。

    • SynchronousQueue:容量为 0,直接交给线程处理,适用于 newCachedThreadPool(但该队列不是无界,而是特殊的传递队列)。

  • 设置合理的拒绝策略

    • AbortPolicy(默认):抛异常。

    • CallerRunsPolicy:让调用者线程执行任务,提供反压。

    • DiscardPolicy / DiscardOldestPolicy:静默丢弃。

  • 监控队列大小:通过 ThreadPoolExecutor.getQueue().size() 监控积压情况,设置告警。

7. 代码示例:使用有界队列的安全线程池


int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 60L;
// 有界队列,容量 100
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
    workQueue, handler
);

总结

  • 无界队列:容量无上限,会导致 maximumPoolSize 失效,容易引发内存溢出。

  • 不要在生产环境随意使用无界队列,尤其是与固定大小线程池组合。

  • 始终优先考虑有界队列 + 明确的拒绝策略,以构建弹性、可靠的多线程系统。

Built with LogoFlowershow