ConcurrentHashMap<String, Ref> vs AtomicReference<Map<String, Ref>>

sle*_*esh 1 java java.util.concurrent

什么是更好的选择,为什么?该地图用作临时存储。它将项目保留一段时间,然后刷新到 db。

这是我用原子参考实现的虚拟代码:

public class Service {
    private final AtomicReference<Map<String, Entity>> storedEntities = new AtomicReference<>(new HashMap<>());
    private final AtomicReference<Map<String, Entity>> newEntities = new AtomicReference<>(new HashMap<>());

    private final Dao dao;

    public Service(Dao dao) {
        this.dao = dao;
    }

    @Transactional
    @Async
    public CompletableFuture<Void> save() {
        Map<String, Entity> map = newEntities.getAndSet(new HashMap<>());
        return dao.saveAsync(map.values());
    }

    @Transactional(readOnly = true)
    @Async
    public CompletableFuture<Map<String, Entity>> readAll() {
        return dao.getAllAsync().thenApply(map -> {
            storedEntities.set(map);
            return map;
        });
    }

    @Scheduled(cron = "${cron}")
    public void refreshNow() {
        save();
        readAll();
    }

    public void addNewentity(Entity entity) {
        newEntities.getAndUpdate(map -> {
            map.put(entity.getHash(), entity);
            return map;
        });
    }

    public AtomicReference<List<Entity>> getStoredEntities() {
        return storedEntities.get().values();
    }

    public AtomicReference<List<Entity>> getNewEntities() {
        return newEntities.get().values();
    }
}
Run Code Online (Sandbox Code Playgroud)

正如我所说,我只需要将数据保留一段时间,然后通过 cron 将其刷新到 db。我对什么是更好的方法感兴趣 - AR vs CHM?

Har*_*ole 6

I'll start off by making the assumption that you require shared access to some resource, the temporary storage map, across multiple threads.

The short answer:

Use the ConcurrentHashMap.

The long(er) answer:

If your expectation is that through using an AtomicReference, you'll receive a consistent or thread-safe view of the map, or that your operations will be performed atomically, you're most likely to be incorrect - however, since you have not provided an example of your usage, I cannot say so with full certainty.

Whilst not negligible, performance shouldn't be the primary issue of your concern - rather, you should ensure the programs' ability to perform correctly and as intended, and then seek to improve upon its performance.

If you have multiple threads reading from and / or writing to the temporary storage map, it's important that the internal state of the map remains consistent and correct; you can achieve this through implementing your operations in a manner that is atomic and thread-safe, and to that extent, both a ConcurrentHashMap or map wrapped by SynchronizedMap will achieve this. However, each of the aforementioned implement their means of ensuring this at different granularities, and as a result of the specialised (optimistic) approach taken by the ConcurrentHashMap, as opposed to the naive (pessimistic) approach of the wrapped map, the ConcurrentHashMap is less susceptible to resource contention, and is likely to be the more performant of the two.

Going Forward

I've provided an implementation of the three mechanisms discussed in this post below; have an explore of the code below, in addition to the Java API documentation.

import java.io.PrintStream;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrencyExample {

    interface TemporaryStorage<K, V> {

        V compute(K key, BiFunction<? super K, ? super V, ? extends V> remapper);

        V put(K key, V value);

        V get(K key);

        void clear();

        @FunctionalInterface
        interface UnitTest<K, V> {

            void test(TemporaryStorage<K, V> store, K key);

        }

    }

    static class ConcurrentHashMapTS<K, V> implements TemporaryStorage<K, V> {

        private final Map<K, V> map = new ConcurrentHashMap<>();

        @Override
        public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remapper) {
            return map.compute(key, remapper);
        }

        @Override
        public V put(K key, V value) {
            return map.put(key, value);
        }

        @Override
        public V get(K key) {
            return map.get(key);
        }

        @Override
        public void clear() {
            map.clear();
        }
    }

    static class AtomicReferenceHashMapTS<K, V> implements TemporaryStorage<K, V> {

        private final AtomicReference<Map<K, V>> map = new AtomicReference<>(new HashMap<>());

        @Override
        public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remapper) {
            return map.get().compute(key, remapper);
        }

        @Override
        public V put(K key, V value) {
            return map.get().put(key, value);
        }

        @Override
        public V get(K key) {
            return map.get().get(key);
        }

        @Override
        public void clear() {
            map.get().clear();
        }
    }

    static class MonitorLockedHashMapTS<K, V> implements TemporaryStorage<K, V> {

        private final Map<K, V> map = new HashMap<>();
        private final Object mutex = new Object(); // could use the map as the mutex

        @Override
        public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remapper) {
            synchronized (mutex) {
                return map.compute(key, remapper);
            }
        }

        @Override
        public V put(K key, V value) {
            synchronized (mutex) {
                return map.put(key, value);
            }
        }

        @Override
        public V get(K key) {
            synchronized (mutex) {
                return map.get(key);
            }
        }

        @Override
        public void clear() {
            synchronized (mutex) {
                map.clear();
            }
        }
    }

    static class WrappedHashMapTS<K, V> implements TemporaryStorage<K, V> {

        private final Map<K, V> map = Collections.synchronizedMap(new HashMap<>());

        @Override
        public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remapper) {
            return map.compute(key, remapper);
        }

        @Override
        public V put(K key, V value) {
            return map.put(key, value);
        }

        @Override
        public V get(K key) {
            return map.get(key);
        }

        @Override
        public void clear() {
            map.clear();
        }
    }

    static class AtomicUnitTest implements TemporaryStorage.UnitTest<Integer, Integer> {

        @Override
        public void test(TemporaryStorage<Integer, Integer> store, Integer key) {
            store.compute(key, (k, v) -> (v == null ? 0 : v) + 1);
        }
    }

    static class UnsafeUnitTest implements TemporaryStorage.UnitTest<Integer, Integer> {

        @Override
        public void test(TemporaryStorage<Integer, Integer> store, Integer key) {
            Integer value = store.get(key);
            store.put(key, (value == null ? 0 : value) + 1);
        }
    }

    public static class TestRunner {

        public static void main(String... args) throws InterruptedException {
            final int iterations = 1_000;
            final List<Integer> keys = IntStream.rangeClosed(1, iterations).boxed().collect(Collectors.toList());

            final int expected = iterations;

            for (int batch = 1; batch <= 5; batch++) {

                System.out.println(String.format("--- START BATCH %d ---", batch));

                test(System.out, new ConcurrentHashMapTS<>(), new AtomicUnitTest(), keys, expected, iterations);
                test(System.out, new ConcurrentHashMapTS<>(), new UnsafeUnitTest(), keys, expected, iterations);

                test(System.out, new AtomicReferenceHashMapTS<>(), new AtomicUnitTest(), keys, expected, iterations);
                test(System.out, new AtomicReferenceHashMapTS<>(), new UnsafeUnitTest(), keys, expected, iterations);

                test(System.out, new MonitorLockedHashMapTS<>(), new AtomicUnitTest(), keys, expected, iterations);
                test(System.out, new MonitorLockedHashMapTS<>(), new UnsafeUnitTest(), keys, expected, iterations);

                test(System.out, new WrappedHashMapTS<>(), new AtomicUnitTest(), keys, expected, iterations);
                test(System.out, new WrappedHashMapTS<>(), new UnsafeUnitTest(), keys, expected, iterations);

                System.out.println(String.format("--- END   BATCH %d ---", batch));

                System.out.println();

            }
        }

        private static <K, V> void test(PrintStream printer, TemporaryStorage<K, V> store, TemporaryStorage.UnitTest<K, V> work, List<K> keys, V expected, int iterations) throws InterruptedException {
            test(printer, store, work, keys, expected, iterations, Runtime.getRuntime().availableProcessors() * 4);
        }

        private static <K, V> void test(PrintStream printer, TemporaryStorage<K, V> store, TemporaryStorage.UnitTest<K, V> work, List<K> keys, V expected, int iterations, int parallelism) throws InterruptedException {
            final ExecutorService workers = Executors.newFixedThreadPool(parallelism);
            final long start = System.currentTimeMillis();

            for (K key : keys) {
                for (int iteration = 1; iteration <= iterations; iteration++) {

                    workers.execute(() -> {
                        try {
                            work.test(store, key);
                        } catch (Exception e) {
                            //e.printStackTrace(); //thrown by the AtomicReference<Map<K, V>> implementation
                        }
                    });

                }
            }

            workers.shutdown();
            workers.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

            final long finish = System.currentTimeMillis();

            final DecimalFormat formatter = new DecimalFormat("###,###");

            final long correct = keys.stream().filter(key -> expected.equals(store.get(key))).count();

            printer.println(String.format("Store '%s' performed %s iterations of %s across %s threads in %sms. Accuracy: %d / %d (%4.2f percent)", store.getClass().getSimpleName(), formatter.format(iterations), work.getClass().getSimpleName(), formatter.format(parallelism), formatter.format(finish - start), correct, keys.size(), ((double) correct / keys.size()) * 100));
        }

    }

}
Run Code Online (Sandbox Code Playgroud)