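I have to store about 250 numerical values per second per client, which is roughly 900k numbers per hour. It probably won't be a full day of recording (more likely 5-10 hours a day), but I will partition the data by client ID and by the date the reading was taken. The maximum row length comes to about 22-23M, which is still manageable. In any case, my schema looks like this: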
CREATE TABLE measurement (
clientid text,
date text,
event_time timestamp,
value int,
PRIMARY KEY ((clientid,date), event_time)
);
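To make the sizing above concrete, here is a minimal sketch of how the `date` component of the partition key could be derived and how many rows one `(clientid, date)` partition would receive. The class and method names are hypothetical, and bucketing by UTC calendar day is an assumption; the question does not say which time zone defines a "date".

```java
import java.time.Instant;
import java.time.ZoneOffset;

class PartitionSizing {
    // Hypothetical helper: derives the `date` partition-key text from an event time.
    // Assumes UTC calendar days; the result is ISO formatted (yyyy-MM-dd).
    static String dateBucket(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC).toLocalDate().toString();
    }

    // Rows written to one (clientid, date) partition for a given recording length.
    static long rowsPerPartition(int valuesPerSecond, int hoursRecorded) {
        return (long) valuesPerSecond * 3600L * hoursRecorded;
    }

    public static void main(String[] args) {
        System.out.println(dateBucket(Instant.parse("2014-06-01T23:59:59Z"))); // 2014-06-01
        System.out.println(rowsPerPartition(250, 1));   // 900000
        System.out.println(rowsPerPartition(250, 24));  // 21600000
    }
}
```

At exactly 250 values per second, a full 24 hours gives 21.6M rows in one partition, which is in the same range as the 22-23M upper bound quoted above; a 5-10 hour recording stays between 4.5M and 9M.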
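The keyspace has a replication factor of 2, just for testing; the snitch is GossipingPropertyFileSnitch and the replication strategy is NetworkTopologyStrategy. I know that a replication factor of 3 is closer to a production standard.
Next, I created a small cluster on the company servers: three bare-metal virtualized machines, each with 2 CPUs x 2 cores, 16GB of RAM, and plenty of disk space. I am on a gigabit LAN with them. According to nodetool, the cluster is up and running.
Here is the code I am using to test my setup: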
Cluster cluster = Cluster.builder()
.addContactPoint("192.168.1.100")
.addContactPoint("192.168.1.102")
.build();
Session session = cluster.connect();
DateTime time = DateTime.now();
BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);
try {
ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts
String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
PreparedStatement preparedStatement = session.prepare(insertQuery);
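BatchStatement …
I need to do batch writes to Cassandra using the Datastax Java driver. This is my first attempt at using batches with the Datastax Java driver, so I have some confusion.
Below is my code, in which I try to create a Statement object, add it to a Batch, and set the ConsistencyLevel to QUORUM.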
Session session = null;
Cluster cluster = null;
// we build cluster and session object here and we use DowngradingConsistencyRetryPolicy as well
// cluster = builder.withSocketOptions(socketOpts).withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
public void insertMetadata(List<AddressMetadata> listAddress) {
// what is the purpose of unloggedBatch here?
Batch batch = QueryBuilder.unloggedBatch();
try {
for (AddressMetadata data : listAddress) {
Statement insert = insertInto("test_table").values(
new String[] { "address", "name", "last_modified_date", "client_id" },
new Object[] { data.getAddress(), data.getName(), data.getLastModifiedDate(), 1 });
// is this the …
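Since the code above sets the ConsistencyLevel to QUORUM, it may help to see how many replicas that implies. This is a minimal sketch, assuming a single data center and the usual rule that a quorum is half the replication factor, rounded down, plus one. The class and method names are hypothetical, and the replication factor of the keyspace behind `test_table` is not stated in the question.

```java
class QuorumMath {
    // Replicas that must acknowledge a QUORUM operation for a given replication factor.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1; // integer division rounds down
    }

    // Replicas that can be unavailable while QUORUM can still be satisfied.
    static int tolerated(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        for (int rf = 1; rf <= 3; rf++) {
            System.out.println("RF=" + rf
                    + " quorum=" + quorum(rf)
                    + " tolerated=" + tolerated(rf));
        }
    }
}
```

With a replication factor of 2, QUORUM needs both replicas, so a single unavailable replica already prevents a QUORUM write from succeeding at that level; with a replication factor of 3, one replica can be down.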