Ani*_*ain 4 caching gridgain ignite
我正在使用 Ignite 1.7.0 并正在测试 Apache Ignite 的 write Behind 功能。提出这个问题的动机是为了更好地了解在 Apache Ignite 中启用 write Behind 功能时幕后发生的情况。
我有一个 Ignite 客户端程序,它将在测试缓存中插入 20 个条目(称为“test_cache”)。
Ignite 服务器运行在同一台计算机上,但运行在不同的 JVM 上。
Ignite 缓存具有以下配置设置:
所有其他属性均设置为默认值。
除此之外,还有一个为缓存配置的缓存存储,代码如下:
package com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
Run Code Online (Sandbox Code Playgroud)
我故意在 writeAll() 方法中调用 Thread.sleep() 方法,以模拟缓慢的数据库写入。
Ignite 客户端将数据加载到缓存中的代码如下:
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
Run Code Online (Sandbox Code Playgroud)
执行过程如下:
首先启动 Ignite 服务器。
加载数据的Ignite Client在服务器之后启动。
由于 writeAll() 方法定义了 60 秒的睡眠时间,因此 Ignite 客户端在写入第 20 个条目时会卡住。
另外,我可以在服务器日志中看到,两个线程调用了 writeAll() 方法,其中 Flush 线程已收到 15 个要写入存储的条目,系统线程已收到 1 个要写入存储的条目。Ignite服务器日志如下:
将[15]条目调用的所有方法写入线程flusher-0-#66%test_grid%
将 [ 1 ] 个条目调用的所有方法写入线程 sys-#22%test_grid%
我可以理解,Ignite Client put 被卡在写入 20 个条目上,因为 Write Behind 缓存已满,并且所有 Flush 线程也忙于写入数据。
以下是我需要清楚认识的几点:
为什么客户端在插入第 20 个条目时被阻止,它应该在插入第 14 个条目时被阻止(基于 13 个条目的最大缓存大小)
为什么只用 15 个条目调用 Flush 线程,而不是全部 19 个条目,因为我没有设置批量大小,它默认为 512。
使用 writeAll() 方法调用的系统线程是否与处理来自 Ignite 客户端以放置第 20 个条目的请求的线程相同。
考虑到我的缓存启用了后写功能,并且写入顺序模式为 PRIMARY_SYNC (默认),并且缓存中没有备份,因此对缓存的任何 put 调用都应被阻止,直到主节点能够提交写入。这是否也意味着能够将该条目放入 Write Behind 缓存中。
如果在服务器中存储条目,Ignite Server 是否会制作该条目的两份副本,一份用于存储,一份用于后写缓存。或者是否使用相同条目的参考。
感谢您耐心阅读问题。如果问题太长,我深表歉意,但内容对于向相关受众阐述情况至关重要。
后写存储在引擎盖下具有反压控制。这意味着如果系统无法处理所有异步操作,则可以将异步操作即时转换为同步。
当底层 write-behind 缓存的大小超过临界大小(flushSize * 1.5)时,将使用正在执行写操作的线程而不是lusherThread。
这就是您在日志中看到这些线程的原因:
考虑到我的缓存启用了后写功能,并且写入顺序模式为 PRIMARY_SYNC (默认),并且缓存中没有备份,因此对缓存的任何 put 调用都应被阻止,直到主节点能够提交写入。这是否也意味着能够将该条目放入 Write Behind 缓存中。
是的,它确实。
如果在服务器中存储条目,Ignite Server 是否会制作该条目的两份副本,一份用于存储,一份用于后写缓存。或者是否使用相同条目的参考。
应使用相同条目的参考。
让我们逐步考虑这个场景:
客户端线程已上传 14 个条目。GridCacheWriteBehindStore
检测到底层缓存中的条目数量超过刷新大小并发送信号以唤醒刷新器线程。请参见GridCacheWriteBehindStore#updateCache()
ConcurrentLinkedHashMap
lusher 线程唤醒并尝试通过 .write-behind 缓存(即 )获取数据write-behind-cache.entrySet().iterator()
。该迭代器提供弱一致的遍历,即不能保证它反映构造后的任何修改。重要的是客户端线程并行放置新条目。
客户端线程放置最后一个值[key=Key:20, val=Value: 20]
。同时,flusher线程被Thread.sleep()
inwriteAll()
方法阻塞。
GridCacheWriteBehindStore
检测到当前 write-behind 缓存大小超过临界大小(flush size * 1.5),因此应使用背压机制。
GridCacheWriteBehindStore
调用flushSingleValue()
方法以从 write-behind 缓存中刷新最旧的值(当然,该值不应由刷新线程之前获取)。
flushSingleValue()
方法在客户端线程的上下文中调用。
之后,刷新器线程唤醒并处理剩余的条目。
我希望这对理解 write-behind store 的实现有所帮助。
谢谢!