正确使用 Hazelcast EntryProcessor

wil*_*ing 1 java multithreading locking hazelcast hazelcast-imap

我们正在尝试找出在不使用悲观锁定的情况下使用 Hazelcast 的 IMap 的最佳方法。

EntryProcessor 似乎是正确的选择,但是我们需要应用两种不同类型的操作:当 containsKey 为假时“创建”,以及当 containsKey 为真时“更新”。

我如何利用 EntryProcessor 来支持这些逻辑检查?

如果两个线程同时命中 containsKey() 并且返回 false 给它们,我不希望它们都创建密钥。我希望第二个线程改为应​​用更新。

这是我们到目前为止所拥有的:

public void put(String key, Object value) {

    IMap<String, Object> map = getMap();

    if (!map.containsKey(key)) {
           // create key here
    } else {
        // update existing value here
        // ...
        map.executeOnKey(key, new TransactionEntryProcessor({my_new_value}));
    }
}

private static class MyEntryProcessor implements
        EntryProcessor<String, Object>, EntryBackupProcessor<String, Object>, Serializable {

    private static final long serialVersionUID = // blah blah

    private static final ThreadLocal<Object> entryToSet = new ThreadLocal<>();

    MyEntryProcessor(Object entryToSet) {
        MyEntryProcessor.entryToSet.set(entryToSet);
    }

    @Override
    public Object process(Map.Entry<String, Object> entry) {
        entry.setValue(entryToSet.get());
        return entry.getValue();
    }

    @Override
    public EntryBackupProcessor<String, Object> getBackupProcessor() {
        return MyEntryProcessor.this;
    }

    @Override
    public void processBackup(Map.Entry<String, Object> entry) {
        entry.setValue(entryToSet.get());
    }
}
Run Code Online (Sandbox Code Playgroud)

可以看到两个线程可以同时进入put方法并调用containsKey。第二个将覆盖第一个的结果。

小智 6

根据定义,EntryProcessor 是在条目本身上执行的处理逻辑,消除了对值进行序列化/反序列化的需要。在内部,EP 由分区线程执行,其中一个分区线程负责多个分区。当 EP 进入 HC 时,它由密钥所属分区的所有者线程选取。一旦处理完成,分区线程就准备好接受并执行其他任务(很可能是同一个密钥的同一个EP,由另一个线程提交)。因此,看起来似乎如此,但不应将 EP 用作悲观锁定的替代方案。

如果您坚持并且非常热衷于为此使用 EP,那么您可以尝试在process方法中放置一个空检查。像这样的东西:

    public Object process(Map.Entry<String, Object> entry) {
        if(null == entry.getValue()) {
            entry.setValue("value123");
        }
        return entry.getValue();
    }
Run Code Online (Sandbox Code Playgroud)

这样会发生两件事: 1. 另一个线程将等待分区线程再次可用 2. 由于该值已经存在,您不会覆盖任何内容

  • 扩展 AbstractEntryProcessor 为您处理备份案例 (2认同)