Java SynchronousQueue的指南

评论 0 浏览 0 2017-04-24

1.概述

在这篇文章中,我们将研究来自java.util.concurrent 包的SynchronousQueue

简单地说,这种实现方式使我们能够以线程安全的方式在线程之间交换信息。

2.API概述

SynchronousQueue 仅有两个支持的操作。take() put(),而且这两个操作都是阻塞的

例如,当我们想向队列添加一个元素时,我们需要调用put() 方法。该方法将被阻塞,直到其他某个线程调用take() 方法,表示它已经准备好获取一个元素。

尽管SynchronousQueue有一个队列的接口,但我们应该把它看作是两个线程之间的单个元素的交换点,其中一个线程正在传递一个元素,而另一个线程正在获取该元素。

3.使用共享变量实现切换

为了了解为什么SynchronousQueue 可以如此有用,我们将使用两个线程之间的共享变量实现一个逻辑,接下来,我们将使用SynchronousQueue重写该逻辑,使我们的代码更简单,更易读。

假设我们有两个线程 – 一个生产者和一个消费者 – 当生产者在设置一个共享变量的值时,我们要向消费者线程发出信号。接下来,消费者线程将从共享变量中获取一个值。

我们将使用CountDownLatch来协调这两个线程,以防止出现消费者访问一个尚未设置的共享变量的值的情况。

我们将定义一个共享状态变量和一个CountDownLatch,它将被用于协调处理。

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

生产者将保存一个随机的整数到sharedState 变量中,并在countDownLatch上执行countDown() 方法,向消费者发出信号,表示它可以从sharedState:中获取一个值。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

消费者将使用await() 方法来等待countDownLatch。当生产者发出信号说该变量已被设置,消费者将从sharedState:中获取它。

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

最后但并非最不重要的是,让我们开始我们的计划。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

它将产生以下输出结果。

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

我们可以看到,为了实现在两个线程之间交换一个元素这样一个简单的功能,需要很多代码。在下一节中,我们将尝试让它变得更好。

4.使用SynchronousQueue实现交接

现在让我们实现与上一节相同的功能,但使用SynchronousQueue。它有双重效果,因为我们可以用它来交换线程间的状态,并协调该动作,这样我们就不需要使用除SynchronousQueue.以外的任何东西了。

首先,我们将定义一个队列。

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

生产者将调用一个put()方法,该方法将被阻塞,直到其他线程从队列中获取一个元素。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

消费者将简单地使用take()方法来检索该元素。

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

下一步,我们将开始我们的程序。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

它将产生以下输出结果。

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

我们可以看到,一个同步队列被用作线程之间的交换点,这比之前的例子要好得多,也更容易理解,因为之前的例子将共享状态与CountDownLatch.一起使用。

5.总结

在这个快速教程中,我们看了SynchronousQueue结构。我们创建了一个使用共享状态在两个线程之间交换数据的程序,然后重写了该程序以利用SynchronousQueue 结构。这作为一个交换点,协调了生产者和消费者线程。

所有这些例子和代码片段的实现都可以在GitHub项目中找到--这是一个Maven项目,所以应该很容易导入和运行,就像现在一样。

最后更新2023-02-22
0 个评论
标签