java并发:很多作家,一个读者

Jan*_*ing 17 java concurrency

我需要在我的软件中收集一些统计信息,我正在努力使其快速正确,这对我来说并不容易(对我而言!)

首先我的代码到目前为止有两个类,一个StatsService和一个StatsHarvester

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1l;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}
Run Code Online (Sandbox Code Playgroud)

这是我的第二个类,一个不时收集统计数据并将它们写入数据库的收集器.

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}
Run Code Online (Sandbox Code Playgroud)

在运行时,它将有大约30个并发运行的线程,每个线程调用notify(key)大约100次.只有一个StatsHarvester正在打电话statsService.getStats()

所以我有很多作家,只有一位读者.拥有准确的统计数据会很好,但我不在乎是否有一些记录在高并发性上丢失了.

读者应每5分钟或任何合理的时间运行.

写作应该尽可能快.阅读应该很快,但如果它每5分钟锁定约300毫秒,那很好.

我已经阅读了很多文档(实践中的Java并发,有效的Java等),但我有强烈的感觉,我需要你的建议才能做到正确.

我希望我说清楚我的问题,并且足够短以获得有价值的帮助.


编辑

感谢大家的详细和有用的答案.正如我所料,有不止一种方法可以做到这一点.

我测试了大部分建议(我理解的那些)并将测试项目上传到谷歌代码以供进一步参考(maven项目)

http://code.google.com/p/javastats/

我已经测试了我的StatsService的不同实现

  • HashMapStatsService(HMSS)
  • ConcurrentHashMapStatsService(CHMSS)
  • LinkedQueueStatsService(LQSS)
  • GoogleStatsService(GSS)
  • ExecutorConcurrentHashMapStatsService(ECHMSS)
  • ExecutorHashMapStatsService(EHMSS)

我用x每个调用通知y时间的线程数测试它们,结果以毫秒为单位

         10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Summe: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Summe: 508
HMSS      1        8        45       8        52       233      11       103      449       Summe: 910
EHMSS     1        5        24       7        31       113      8        67       235       Summe: 491
CHMSS     1        2        9        3        11       40       7        26       72        Summe: 171
LQSS      0        3        11       3        16       56       6        27       144       Summe: 266
Run Code Online (Sandbox Code Playgroud)

此刻我想我将使用ConcurrentHashMap,因为它提供了良好的性能,同时它很容易理解.

感谢您的输入!Janning

Joh*_*int 16

正如杰克所说,你可以使用包含ConcurrentHashMap和AtomicLong的java.util.concurrent库.如果没有其他原因,您可以将AtomicLong置于其中,您可以增加该值.由于AtomicLong是线程安全的,因此您可以在不担心并发问题的情况下增加变量.

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}
Run Code Online (Sandbox Code Playgroud)

这应该既快又线程安全

编辑:稍微重构,因此最多只有两次查找.

  • 根据javadoc,您不需要第一次调用stats.get或第一次空检查.putIfAbsent将返回先前的映射值(在这种情况下应该递增),如果没有先前的值,则返回null,在这种情况下,插入了提供的AtomicLong(1). (3认同)

Jac*_*ack 8

你为什么不用java.util.concurrent.ConcurrentHashMap<K, V>?它在内部处理所有内容,避免地图上无用的锁定并为您节省大量工作:您不必关心get和put上的同步.

从文档:

一个哈希表,支持检索的完全并发性和可更新的预期并发性.该类遵循与Hashtable相同的功能规范,并包括与Hashtable的每个方法相对应的方法版本.但是,即使所有操作都是线程安全的,检索操作也不需要锁定,并且不支持以阻止所有访问的方式锁定整个表.

您可以指定其并发级别:

更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数(缺省值16)引导,该参数用作内部大小调整的提示.该表在内部进行分区,以尝试允许指定数量的并发更新而不会发生争用.因为散列表中的放置基本上是随机的,所以实际的并发性会有所不同.理想情况下,您应该选择一个值来容纳与同时修改表一样多的线程.使用比您需要的更高的值会浪费空间和时间,而显着更低的值可能导致线程争用.但是,在一个数量级内过高估计和低估通常不会产生明显的影响.当知道只有一个线程会修改而其他所有线程只能读取时,值为1是合适的.此外,调整此哈希表或任何其他类型的哈希表是一个相对较慢的操作,因此,在可能的情况下,最好在构造函数中提供预期表大小的估计值.

正如评论中所建议的那样仔细阅读ConcurrentHashMap的文档,特别是当它说明原子操作或非原子操作时.

为了保证原子性,你应该考虑哪些操作是原子的,从ConcurrentMap接口你会知道:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)
Run Code Online (Sandbox Code Playgroud)

可以安全使用.

  • 此外,AtomicLong/AtomicInteger应被视为地图值,这将消除丢失增量的问题(可能除了初始种群之外,除非密钥都是预先填充的或者包含了对mattIfAbsent的调用,如matt b所指出的那样) ). (4认同)
  • 为了确保这一点,你需要循环直到`replace(key,currentValue,currentValue + 1)`返回true. (2认同)

Chr*_*ail 5

我建议看一下Java的util.concurrent库.我认为你可以更清洁地实现这个解决方案.我认为你根本不需要地图.我建议使用ConcurrentLinkedQueue实现它.每个"生产者"都可以自由地写入此队列而不必担心其他人.它可以将一个对象放在队列中,并提供其统计数据.

收割机可以消耗队列,不断地拉出数据并处理它.然后它可以存储它需要它.