Java中的PriorityBlockingQueue的指南

评论 0 浏览 0 2017-02-06

1.绪论

在这篇文章中,我们将重点讨论PriorityBlockingQueue 类,并讨论一些实际的例子。

从假设我们已经知道什么是Queue开始,我们将首先演示PriorityBlockingQueue中的元素是如何按优先级排序的。

在这之后,我们将演示如何用这种队列来阻断线程。

最后,我们将展示在跨多线程处理数据时,如何共同使用这两个功能。

2.元素的优先级

与标准队列不同,你不能随便向PriorityBlockingQueue添加任何类型的元素。有两种选择:

  1. 添加实现Comparable的元素。
  2. 添加没有实现Comparable的元素,条件是你要提供一个Comparator

通过使用Comparator Comparable实现来比较元素,PriorityBlockingQueue将始终被排序。

其目的是以一种方式实现比较逻辑,即最高优先级的元素总是先排序。然后,当我们从队列中移除一个元素时,它将总是具有最高优先级的那个。

首先,让我们在一个线程中使用我们的队列,而不是在多个线程中使用它。通过这样做,可以很容易地证明元素在单元测试中是如何排序的。

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();
 
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);

queue.drainTo(polledElements);

assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

正如我们所看到的,尽管以随机的顺序将元素添加到队列中,但当我们开始轮询它们时,它们将被排序。这是因为Integer类实现了Comparable,它将反过来被用来确保我们以升序从队列中取出它们。

还值得注意的是,当两个元素被比较且相同时,并不能保证它们将被如何排序。

3.使用Queue阻塞

如果我们处理的是一个标准队列,我们会调用poll() 来检索元素。然而,如果队列是空的,调用poll() 就会返回

PriorityBlockingQueue 实现了 BlockingQueue 接口,它为我们提供了一些额外的方法,使我们能够在从空队列中移除时进行阻塞。让我们尝试使用 take() 方法,它应该能做到这一点:

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

new Thread(() -> {
  System.out.println("Polling...");

  try {
      Integer poll = queue.take();
      System.out.println("Polled: " + poll);
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
}).start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);

尽管使用sleep()是一种略显脆弱的演示方式,但当我们运行这段代码时,我们将看到。

Polling...
Adding to queue
Polled: 1

这证明了take()在一个项目被添加之前是被阻塞:

  1. 该线程将打印“Polling”,以证明它已经开始了。
  2. 然后,测试将暂停5秒钟左右,以证明线程在这时一定已经调用了take()
  3. 我们添加到队列中,应该或多或少地立即看到“Polled: 1”来证明take() 返回了一个元素,只要它是可用的

还值得一提的是,BlockingQueue接口也为我们提供了在添加到满队列时进行阻塞的方法。

然而,一个PriorityBlockingQueue是无界的。这意味着它永远不会满,因此总是可以添加新的元素。

4.同时使用阻塞和优先权

现在我们已经解释了PriorityBlockingQueue的两个关键概念,让我们一起使用它们。我们可以简单地扩展我们之前的例子,但这次要在队列中添加更多的元素。

Thread thread = new Thread(() -> {
    System.out.println("Polling...");
    while (true) {
        try {
            Integer poll = queue.take();
            System.out.println("Polled: " + poll);
        } 
        catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }
});

thread.start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");

queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));

同样,由于使用了sleep(),这个例子有点脆,它仍然向我们展示了一个有效的用例。我们现在有一个队列,它阻塞了,等待元素被添加。然后我们一次添加很多元素,然后显示它们将按照优先级顺序被处理。输出将看起来像这样。

Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7

5.总结

在本指南中,我们已经演示了如何使用PriorityBlockingQueue 来阻塞一个线程,直到一些项目被添加到其中,同时我们能够根据这些项目的优先级来处理它们。

这些例子的实现可以在GitHub上找到。这是一个基于Maven的项目,所以应该很容易按原样运行。

最后更新2023-02-06
0 个评论
标签