案例

Java并发的时候,看到了一个关于生产者消费者的案例

一个producer类和一个consumer类,实现了Runnable接口,作为线程执行的任务

一个Queue类,实现了queue的put()和take操作,模拟了生产和消费2个动作

实现方式使用了ReentrantLock+Condition

这个示例代码的话,是可以实现多线程之间并发生产和消费的,多个线程共同操作一个Queue对象,可以实现队列满时生产等待,队列空时消费等待,且无并发问题。

我当时产生的疑问是为什么使用了ReentrantLock,还要使用Condition

这个问题下一个章节再说,先贴下示例代码

Producer

public class Producer implements Runnable {

    Queue queue = null;

    public Producer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();

        try {
            // 隔10秒轮询生产一次
            while (true) {
                System.out.println("Producer");
                TimeUnit.SECONDS.sleep(10);
                queue.put(new Random().nextInt(100),threadName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

public class Consumer implements Runnable {
    Queue queue = null;

    public Consumer(Queue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {

            // 隔3秒轮询消费一次
            while (true) {
                System.out.println("Customer");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("取到的值-" + queue.take());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Queue

public class Queue {
    private int[] arr = new int[5];
    private int size = 0;
    private int putIndex = 0;  // 生产位置
    private int takeIndex = 0; // 消费位置
    private ReentrantLock lock = new ReentrantLock();
    private Condition pCondition = lock.newCondition();
    private Condition cCondition = lock.newCondition();

    public boolean isEmpty() {
        return size==0;
    }

    public boolean isFull() {
        return size==5;
    }

    public void put(Integer value, String name) throws InterruptedException {
        log(name + " ▶▶▶ 尝试获取锁...");
        lock.lock();
        try {
            log(name + " ✅ 成功获取锁 | 当前锁状态: " + lock);
            while (isFull()) {
                log(name + " ⏸️ 队列已满,进入等待 (pCondition.await())...");
                pCondition.await(); // 自动释放锁!
                log(name + " 🔄 被唤醒,重新获取锁 | 当前锁状态: " + lock);
            }
            arr[putIndex] = value;
            putIndex = (putIndex + 1) % arr.length;
            size++;
            // 生产逻辑...
            log(name + " 🔔 生产完成,唤醒消费者 (cCondition.signalAll())");
            cCondition.signalAll();
        } finally {
            lock.unlock();
            log(name + " ⏹️ 释放锁 | 当前锁状态: " + lock);
        }
    }

    public int take() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        log(threadName + " ▶▶▶ 尝试获取锁...");
        lock.lock();
        try {
            log(threadName + " ✅ 成功获取锁 | 当前锁状态: " + lock);
            while (isEmpty()) {
                log(threadName + " ⏸️ 队列为空,进入等待 (cCondition.await())...");
                cCondition.await(); // 自动释放锁!
                log(threadName + " 🔄 被唤醒,重新获取锁 | 当前锁状态: " + lock);
            }
            int value = arr[takeIndex];
            arr[takeIndex] = 0;  // 可选:清理数据以便调试
            takeIndex = (takeIndex + 1) % arr.length;
            size--;
            log(threadName + " 🔔 消费完成,唤醒生产者 (pCondition.signalAll())");
            pCondition.signalAll();
            return value;
        } finally {
            lock.unlock();
            log(threadName + " ⏹️ 释放锁 | 当前锁状态: " + lock);
        }
    }

    private void log(String message) {
        System.out.printf("[%s] %s%n", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME), message);
    }
}

分析

代码测试

首先写了个测试类,测试了一下生产消费逻辑, 2个生产者,2个消费者,共4个线程,操作同一个queue,实现生产消费逻辑

public static void main(String[] args) {
        // 两个生产者,两个消费者
        Queue queue = new Queue();
        Thread producer1 = new Thread(new Producer(queue));
        producer1.setName("Producer1");
        producer1.start();
        Thread producer2 = new Thread(new Producer(queue));
        producer2.setName("Producer2");
        producer2.start();
        Thread Consumer1 = new Thread(new Consumer(queue));
        Consumer1.setName("Consumer1");
        Consumer1.start();

        Thread Consumer2 = new Thread(new Consumer(queue));
        Consumer2.setName("Consumer2");
        Consumer2.start();
}

控制台部分输出内容如下

输出内容展示了尝试获取锁—->获取锁成功—->等待—->释放锁—->被唤醒—->重新获取锁—->生产/消费完成并唤醒等待的线程—->释放锁的这么一个过程

[16:14:07.504] Consumer1 ▶▶▶ 尝试获取锁...
[16:14:07.504] Consumer2 ▶▶▶ 尝试获取锁...
[16:14:07.516] Consumer1  成功获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Consumer1]
[16:14:07.516] Consumer1  队列为空进入等待 (cCondition.await())...
[16:14:07.516] Consumer2  成功获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Consumer2]
[16:14:07.516] Consumer2  队列为空进入等待 (cCondition.await())...
[16:14:14.5] Producer2 ▶▶▶ 尝试获取锁...
[16:14:14.501] Producer2  成功获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Producer2]
[16:14:14.502] Producer2 🔔 生产完成唤醒消费者 (cCondition.signalAll())
[16:14:14.5] Producer1 ▶▶▶ 尝试获取锁...
[16:14:14.502] Consumer1 🔄 被唤醒重新获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Consumer1]
[16:14:14.502] Producer2  释放锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Unlocked]
Producer
[16:14:14.503] Consumer1 🔔 消费完成唤醒生产者 (pCondition.signalAll())
[16:14:14.504] Consumer2 🔄 被唤醒重新获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Consumer2]
[16:14:14.504] Consumer1  释放锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Unlocked]
[16:14:14.504] Consumer2  队列为空进入等待 (cCondition.await())...
取到的值-68
Customer
[16:14:14.504] Producer1  成功获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Producer1]
[16:14:14.505] Producer1 🔔 生产完成唤醒消费者 (cCondition.signalAll())
[16:14:14.505] Producer1  释放锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Unlocked]
Producer
[16:14:14.505] Consumer2 🔄 被唤醒重新获取锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Locked by thread Consumer2]
[16:14:14.506] Consumer2 🔔 消费完成唤醒生产者 (pCondition.signalAll())
[16:14:14.506] Consumer2  释放锁 | 当前锁状态: java.util.concurrent.locks.ReentrantLock@363b2ca2[Unlocked]
取到的值-83

疑问

一开始由于对这快内容不是很了解,我产生了2个疑问,

  • 线程只要获取锁之后,其他线程都得等待获取锁,多个线程只用竞争一把锁,为什么要使用condition呢。
  • 线程在获取锁之后,只能等待锁执行完释放才能重新竞争到锁,加了condition等待唤醒有什么用呢

带着这2个疑问,我去问了deepseek和grok以及chatgpt,靠ai给思路,然后去看了代码,确认了使用condition的合理性和必要性

问题1

先看看grok怎么回答问题1的

只使用lock

单独使用 Lock 通常会通过忙等待(busy-waiting)或简单的条件检查来模拟生产者-消费者的同步

Lock lock = new ReentrantLock();
Queue<Integer> queue = new LinkedList<>();
int capacity = 10;

public void put(int item) {
    lock.lock();
    try {
        while (queue.size() >= capacity) {
            lock.unlock(); // 临时释放锁
            Thread.yield(); // 让出 CPU
            lock.lock();   // 重新获取锁
        }
        queue.add(item);
    } finally {
        lock.unlock();
    }
}

public void take() {
    lock.lock();
    try {
        while (queue.isEmpty()) {
            lock.unlock();
            Thread.yield();
            lock.lock();
        }
        queue.poll();
    } finally {
        lock.unlock();
    }
}

只使用lock,会频繁判断条件,写出如上的加锁/释放锁的代码,以及要让出cpu,不够优雅

结合lock+condition

使用 Lock 和 Condition,可以通过条件变量精确控制线程的等待和唤醒

Lock lock = new ReentrantLock();
Condition pCondition = lock.newCondition();
Condition cCondition = lock.newCondition();
Queue<Integer> queue = new LinkedList<>();
int capacity = 10;

public void produce(int item) {
    lock.lock();
    try {
        while (queue.size() >= capacity) {
            pCondition.await(); // 等待“非满”条件
        }
        queue.add(item);
        cCondition.signal(); // 唤醒等待“非空”的消费者
    } finally {
        lock.unlock();
    }
}

public void consume() {
    lock.lock();
    try {
        while (queue.isEmpty()) {
            cCondition.await(); // 等待“非空”条件
        }
        queue.poll();
        pCondition.signal(); // 唤醒等待“非满”的生产者
    } finally {
        lock.unlock();
    }
}

lock+condition,可以精准控制线程的等待和唤醒

使用了2个condition,queue满了pCondition就等待,暂停生产,queue空了cCondition就等待,暂停消费

生产之后, 释放信号,唤醒cCondition去消费,消费之后,唤醒pCondition去生产

2种实现方式优缺点比较

特性只使用 Lock使用 Lock + Condition
线程通知无法精确通知,依赖轮询精确通知,通过 signal() 唤醒
效率忙等待或频繁锁切换,效率低线程挂起,无忙等待,效率高
锁竞争高,可能反复释放和获取锁低,唤醒后队列式获取锁
代码复杂度高,手动实现等待逻辑低,API 简洁且功能强大
条件区分无法区分不同条件支持多个 Condition 对象
中断处理手动检查和处理内置支持,抛出异常

通过以上的优缺点比较,可以看到单独使用lock,有以下缺点

  • 缺乏精确的线程通知机制
  • 效率低下,忙等待或锁切换开销大。
  • 更高的锁竞争和延迟。
  • 代码复杂且不够优雅。
  • 无法区分不同等待条件。
  • 中断处理困难。

在实际的生产场景种

假设一个高并发生产者-消费者系统:

  • 只使用 Lock
    • 缓冲区满时,生产者线程不断轮询,浪费 CPU。
    • 消费者被唤醒后可能发现缓冲区仍空(其他消费者抢先消费),需要再次等待。
    • 系统吞吐量低,延迟高。
  • 使用 Lock + Condition
    • 生产者等待 pCondition,消费者等待 cCondition,互不干扰。
    • 生产者放入数据后只唤醒消费者,消费者移除数据后只唤醒生产者。
    • 系统高效运行,资源利用率高。

问题2

再看看问题2的回答

获取condition使用的是lock的api,返回的是定义在aqs中的conditionobject

private ReentrantLock lock = new ReentrantLock();
private Condition pCondition = lock.newCondition();
private Condition cCondition = lock.newCondition();

image-20250227172213797

调用pCondition.await(); 实际是调用了java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()

我将aqs的方法代码,喂给了grok,让grok帮我分析方法实现的细节

/**
 * 实现可中断的条件等待。
 * <ol>
 * <li> 如果当前线程被中断,则抛出 InterruptedException 异常。
 * <li> 保存由 {@link #getState} 返回的锁状态。
 * <li> 使用保存的状态作为参数调用 {@link #release} 方法,
 *      如果失败则抛出 IllegalMonitorStateException 异常。
 * <li> 阻塞,直到被唤醒或被中断。
 * <li> 通过调用 {@link #acquire} 的特定版本,并传入保存的状态作为参数来重新获取锁。
 * <li> 如果在第 4 步阻塞时被中断,则抛出 InterruptedException 异常。
 * </ol>
 */
  

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
           //释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

grok帮我总结出了如下重点

锁的释放
 fullyRelease(node) 中释放锁
发生在等待之前确保线程在挂起时不持有锁
等待锁的地方
 while (!isOnSyncQueue(node)) 循环中通过 LockSupport.park(this) 挂起线程
等待条件是 node 被转移到同步队列 signal() 触发或被中断
获取锁后的恢复
 acquireQueued(node, savedState) 中重新竞争并获取锁
使用保存的 savedState 恢复锁的状态例如重入次数)。
获取锁后处理可能的清理和中断逻辑


锁的流程如下
  
持有锁
  
检查中断  中断抛异常退出
  
创建等待节点
  
释放锁 (fullyRelease)
  
循环等待 (LockSupport.park)
  
被唤醒/中断  检查中断状态
  
重新获取锁 (acquireQueued)
  
清理取消节点
  
处理中断
  
返回 (持有锁)

也就是说在这行代码 int savedState = fullyRelease(node);会释放掉锁

详细看一下这个方法,这个方法是aqs提供的java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease

/**

* 使用当前状态值调用 release 方法,并返回保存的状态。
* 若操作失败,则取消节点并抛出异常。
* @param node 等待中的条件节点
* @return 先前的同步状态
*/

/**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

会在if条件里执行java.util.concurrent.locks.AbstractQueuedSynchronizer#release方法,尝试释放锁

/**  
 * 以独占模式释放锁。若 {@link #tryRelease} 返回 true,则唤醒一个或多个线程。  
 * 此方法可用于实现 {@link Lock#unlock}。  
 *  
 * @param arg 释放参数,该值会传递给 {@link #tryRelease},但不会被额外解释,可用于表示任意含义。  
 * @return {@link #tryRelease} 方法的返回值。  
 */


/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    @ReservedStackAccess
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

最终执行java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease进行释放,由子类定义具体的释放逻辑,这个场景下最终调用java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease进行的释放逻辑

这么看下来,执行java.util.concurrent.locks.Condition#await()会将持有的锁释放掉,释放掉锁之后,其他等待获取锁的线程可以尝试获取锁,拿到锁之后尝试进行生产或者消费

假如执行到java.util.concurrent.locks.Condition#signalAll,那么会发通知等待在pCondition或cCondition上的线程,唤醒等待,尝试获取锁,在put()或take()finally里,最终会释放锁,对应的condition被唤醒,尝试重新获取锁。

只有这样在await会释放锁,lock+condition的组合才会有意义。

总结

lock+condition结合,可以实现精准控制,准确通知线程,降低锁竞争和资源消耗,实现并发的生产消费模型

其他

以前有问题,只能靠搜索引擎给答案,现在有了ai之后,可以向ai提问,ai可以给你做详细的解释,也不用自己苦哈哈的看和理解了。

遇到不懂的,ai能给你做详细解释,开发的学习成本确实降低了

但是ai浪潮席卷下,硅基如何干碎碳基,可能很快就会来临了。

现在就是ai人类化,人类ai化,真的抽象