Java 的 ConcurrentSkipListMap 指南

评论 0 浏览 0 2017-04-28

一、概述

在这篇简短的文章中,我们将查看 ConcurrentSkipListMap 来自 java.util.concurrent 包的类。

这种构造允许我们以无锁的方式创建线程安全的逻辑。当我们想要制作数据的不可变快照而其他线程仍在将数据插入地图时,它是解决问题的理想选择。

我们将解决一个问题,即使用该构造对事件流进行排序并获取最近 60 秒内到达的事件的快照

2. 流排序逻辑

假设我们有一个不断来自多个线程的事件流。我们需要能够获取最近 60 秒的事件,以及超过 60 秒的事件。

首先,让我们定义事件数据的结构:

public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // standard constructors/getters
}

我们希望使用 eventTime 字段对事件进行排序。要使用 ConcurrentSkipListMap 实现这一点,我们需要在创建它的实例时将 Comparator 传递给它的构造函数:

ConcurrentSkipListMap<ZonedDateTime, String> events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

我们将使用时间戳比较所有到达的事件。我们正在使用 comparingLong() 方法并传递可以从 ZonedDateTime 获取时间戳的提取函数。

当我们的事件到达时,我们只需要使用 put() 方法将它们添加到地图中。请注意,此方法不需要任何显式同步:

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

ConcurrentSkipListMap 将使用在构造函数中传递给它的 Comparator 处理下面这些事件的排序。

ConcurrentSkipListMap 最显著的优点是可以以无锁方式为其数据制作不可变快照的方法。要获取过去一分钟内到达的所有事件,我们可以使用 tailMap() 方法并传递我们想要获取元素的时间:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

它将返回过去一分钟的所有事件。这将是一个不可变的快照,最重要的是其他写入线程可以向 ConcurrentSkipListMap 添加新事件,而无需进行显式锁定。

现在,我们可以通过使用 headMap() 方法获取从现在起一分钟后到达的所有事件:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

这将返回所有早于一分钟的事件的不可变快照。以上所有方法都属于 EventWindowSort 类,我们将在下一节中使用它。

3.测试排序流逻辑

一旦我们使用 ConcurrentSkipListMap 实现了我们的排序逻辑,我们现在可以通过创建两个写入线程来测试它,每个线程将发送一百个事件:

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

每个线程都在调用 acceptEvent() 方法,发送具有 eventTime 从现在到“现在减去一百秒”的事件。

同时,我们可以调用 getEventsFromLastMinute() 方法,该方法将返回一分钟窗口内事件的快照:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

eventsFromLastMinute 中的事件数在每次测试运行中都会有所不同,具体取决于生产者线程将事件发送到 EventWindowSort 的速度。 我们可以断言返回的快照中没有一个事件超过一分钟:

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventsOlderThanOneMinute, 0);

并且在一分钟窗口内的快照中有超过零个事件:

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);

我们的 getEventsFromLastMinute() 使用下面的 tailMap()

现在让我们测试 getEventsOlderThatOneMinute() ,它使用 ConcurrentSkipListMap 中的 headMap() 方法:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsOlderThatOneMinute();

这次我们得到了超过一分钟的事件快照。我们可以断言此类事件的发生次数多于零:

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventsOlderThanOneMinute > 0);

接下来,没有一个事件是在最后一分钟内发生的:

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventYoungerThanOneMinute, 0);

需要注意的最重要的一点是,我们可以在其他线程仍在向 ConcurrentSkipListMap 添加新值时拍摄数据快照。

4. 结论

在本快速教程中,我们了解了 ConcurrentSkipListMap 的基础知识,以及一些实际示例

我们利用 ConcurrentSkipListMap 的高性能来实现一种非阻塞算法,该算法可以为我们提供不可变的数据快照,即使多个线程同时更新映射也是如此。

所有这些示例和代码片段的实现都可以在 GitHub项目找到,这是一个 Maven 项目,所以它应该很容易导入和运行。

最后更新2023-01-09
0 个评论
标签