1. Обзор

Map, естественно, являются одним из самых распространенных стилей коллекции Java.

И, что важно, HashMap не является потокобезопасной реализацией, в то время как Hashtable обеспечивает безопасность потоков благодаря синхронизации операций.

Несмотря на то, что Hashtable безопасен для потоков, он не очень эффективен. Другая полностью синхронизированная Map, Collections.synchronizedMap, также не демонстрирует большой эффективности. Если мы хотим обеспечить безопасность потоков с высокой пропускной способностью в условиях высокого параллелизма, эти реализации не подходят.

Чтобы решить эту проблему, Java Collections Framework представила ConcurrentMap в Java.

Следующие обсуждения основаны на Java 1.8.

2. ConcurrentMap

ConcurrentMap является расширением интерфейса Map. Он призван предоставить структуру и руководство для решения проблемы согласования пропускной способности с безопасностью потоков.

Переопределяя несколько методов интерфейса по умолчанию, ConcurrentMap дает рекомендации для правильных реализаций, чтобы обеспечить потоковую безопасность и согласованные с памятью атомарные операции.

Несколько реализаций по умолчанию переопределяются, отключая поддержку нулевого значения ключа

  • getOrDefault
  • forEachдля каждого
  • replaceAll
  • computeIfAbsent
  • computeIfPresent
  • compute
  • merge

Следующие API также переопределяются для поддержки атомарности, без реализации интерфейса по умолчанию

  • putIfAbsent
  • remove
  • replace(key, oldValue, newValue)
  • replace(key, value)

Остальные действия напрямую наследуются и в основном соответствуют Map.

3. ConcurrentHashMap

ConcurrentHashMap является готовой реализацией ConcurrentMap.

Для повышения производительности он состоит из массива узлов в виде таблицы корзин (которые раньше были сегментами таблиц до Java), и главным образом использует операции CAS во время обновления.

Корзины инициализируются лениво, после первой вставки. Каждая корзина может быть независимо заблокировано путем блокировки самого первого узла в корзине. Операции чтения не блокируются, а количество обновлений сводится к минимуму.

Количество требуемых сегментов зависит от количества потоков, обращающихся к таблице, поэтому выполняемое обновление для сегмента будет составлять не более одного большую часть времени.

До Java 8 требуемое количество сегментов было связано с количеством потоков, обращающихся к таблице, поэтому выполняемое обновление для каждого сегмента не превышало бы большую часть времени.

Вот почему конструкторы, по сравнению с HashMap, предоставляют дополнительный аргумент concurrencyLevel для управления количеством предполагаемых потоков, которые нужно использовать.

public ConcurrentHashMap(
public ConcurrentHashMap(
 int initialCapacity, float loadFactor, int concurrencyLevel)

Два других аргумента: initialCapacity и loadFactor работали так же, как и HashMap.

Однако, поскольку Java, конструкторы присутствуют только для обратной совместимости, параметры могут влиять только на начальный размер карты.

3.1. Потоко-безопасность

ConcurrentMap гарантирует согласованность памяти при выполнении операций с ключами/значениями в многопоточной среде.

Действия в потоке предшествующие помещению объекта в ConcurrentMap в качестве ключа или значения выполняются перед действиями после доступа или удаления этого объекта в другом потоке.

Чтобы подтвердить, давайте посмотрим на случай несовместимости памяти:

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    List<Integer> sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map, 
  int executionTimes) throws InterruptedException {
    List<Integer> sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService = 
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test", 
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

Для каждого параллельного действия map.computeIfPresent HashMap не предоставляет согласованного представления о том, каким должно быть текущее целочисленное значение, что приводит к противоречивым и нежелательным результатам.

Что касается ConcurrentHashMap, мы можем получить последовательный и правильный результат:

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() 
  throws Exception {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    List<Integer> sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertEquals(0, wrongResultCount);
}

3.2. Нулловые ключи/значения

Большинство API, предоставляемых ConcurrentMap, не допускают нулевой ключ или значение, например:

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
    concurrentMap.put("test", null);
}

Однако для действий вычисления и слияния вычисленное значение может быть нулевым, что указывает на то, что отображение значения ключа удаляется, если присутствует, или остается отсутствующим, если ранее отсутствовало.

@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}

3.3. Поддержка Stream

Java 8 также обеспечивает поддержку Stream в ConcurrentHashMap.

В отличие от большинства потоковых методов, массовые последовательные и параллельные операции позволяют безопасно выполнять параллельное изменение. Не будет выброшено исключение ConcurrentModificationException, что также относится к его итераторам. Относящиеся к потокам, также добавлено несколько методов forEach, search и reduce для поддержки более сложных операций обхода и преобразования.

3.4. Производительность

Под капотом ConcurrentHashMap чем-то похож на HashMap, с доступом к данным и их обновлением на основе хеш-таблицы, хотя и более сложным.

И, конечно, ConcurrentHashMap должен обеспечивать гораздо лучшую производительность в большинстве параллельных случаев для извлечения и обновления данных.

Давайте напишем быстрый микробенчмарк для производительности get() и put() и сравним его с Hashtable и Collections.synchronizedMap, для которых обе операции выполняются 500 000 раз 4 в потоках.

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() 
  throws Exception {
    Map<String, Object> hashtable = new Hashtable<>();
    Map<String, Object> synchronizedHashMap = 
      Collections.synchronizedMap(new HashMap<>());
    Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime = 
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime = 
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map) 
  throws InterruptedException {
    ExecutorService executorService = 
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500_000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime) / 500_000;
}

Имейте в виду, что микро-бенчмарки рассматривают только один сценарий и не всегда являются хорошим отражением производительности в реальном мире.

Тем не менее, в системе OS X со средней системой разработки наблюдали средний результат выборки для последовательных прогонов в наносекундах:

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

В многопоточной среде, где ожидается, что несколько потоков получат доступ к общей карте, ConcurrentHashMap явно предпочтительнее.

Однако, когда карта доступна только для одного потока, HashMap может быть лучшим выбором из-за ее простоты и высокой производительности.

3.5. Подводные камни

Операции извлечения обычно не блокируются в ConcurrentHashMap и могут перекрываться с операциями обновления. Так что для повышения производительности они отражают только результаты самых последних завершенных операций обновления, как указано в официальном Javadoc.

Есть несколько других фактов, которые следует иметь в виду:

@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() 
  throws InterruptedException {
    Runnable collectMapSizes = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            mapSizes.add(concurrentMap.size());
        }
    };
    Runnable updateMapData = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            concurrentMap.put(String.valueOf(i), i);
        }
    };
    executorService.execute(updateMapData);
    executorService.execute(collectMapSizes);
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
    assertEquals(MAX_SIZE, concurrentMap.size());
}
  • Результаты статистических методов состояния, включая size, isEmpty и containsValue, обычно полезны, только когда карта не подвергается одновременным обновлениям в других потоках:

Если одновременные обновления находятся под строгим контролем, совокупный статус все еще будет надежным.

Хотя эти методы совокупного статуса не гарантируют точность в реальном времени, они могут быть адекватными для целей мониторинга или оценки.

Обратите внимание, что использование size() ConcurrentHashMap должно быть заменено на mappingCount(), поскольку последний метод возвращает длинный счет, хотя в глубине они основаны на одной и той же оценке.

  • hashCode важно помнить, что использование множества ключей с одинаковым hashCode() - верный способ снизить производительность любой хеш-таблицы.

Чтобы уменьшить влияние, когда ключи сравнимы, ConcurrentHashMap может использовать порядок сравнения между ключами, чтобы помочь разорвать связи. Тем не менее, мы должны избегать использования одного и того же хэш-кода, насколько это возможно.

public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel) {
 
    //...
    if (initialCapacity < concurrencyLevel) {
        initialCapacity = concurrencyLevel;
    }
    //...
}
  • итераторы предназначены для использования только в одном потоке, поскольку они обеспечивают слабую согласованность, а не обход быстрого доступа, и они никогда не вызовут исключение ConcurrentModificationException.
  • начальная емкость таблицы по умолчанию равна, и она корректируется в соответствии с указанным уровнем параллелизма
  • предостережение о функциях переотображения, хотя мы можем выполнять операции переотображения с помощью предоставленных методов вычисления и слияния, мы должны сохранять их быстрыми, короткими и простыми и сосредоточиться на текущем отображении, чтобы избежать неожиданной блокировки.
  • ключи в ConcurrentHashMap расположены не в отсортированном порядке, поэтому для случаев, когда требуется упорядочение, ConcurrentSkipListMap является подходящим выбором.

4. ConcurrentNavigableMap

В случаях, когда требуется упорядочение ключей, мы можем использовать ConcurrentSkipListMap, параллельную версию TreeMap.

В качестве дополнения к ConcurrentMap, ConcurrentNavigableMap по умолчанию поддерживает полное упорядочение своих ключей в порядке возрастания и обеспечивает одновременную навигацию. Методы, которые возвращают представления карты, переопределяются для совместимости параллелизма

  • subMap
  • headMap
  • tailMap
  • subMap
  • headMap
  • tailMap
  • descendingMap

keySet() просматривает итераторы и сплитераторы со слабой памятью

  • navigableKeySet
  • keySet
  • descendingKeySet

5. ConcurrentSkipListMap

Ранее мы рассмотрели интерфейс NavigableMap и его реализацию TreeMap. ConcurrentSkipListMap можно рассматривать как масштабируемую параллельную версию TreeMap.

На практике в Java нет одновременной реализации красно-черные дерева. В ConcurrentSkipListMap реализован параллельный вариант SkipLists, обеспечивающий ожидаемую среднюю стоимость времени входа в систему для операций containsKey, get, put и remove и их вариантов.

В дополнение к функциям TreeMaps безопасность операций вставки, удаления, обновления и доступа к ключам гарантируется. Вот сравнение с TreeMap при одновременной навигации

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
 
    assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
 
    assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
  NavigableMap<Integer, Integer> navigableMap, 
  int elementCount, 
  int concurrencyLevel) throws InterruptedException {
 
    for (int i = 0; i < elementCount * concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }
    
    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

6. Заключение

В этой статье мы в основном представили интерфейс ConcurrentMap и функции ConcurrentHashMap и рассказали о необходимости переупорядочения ключей для ConcurrentNavigableMap.