我使用datastax作为连接cassandra的客户端.我已经通过Java成功连接到cassandra集群/密钥空间/列系列.我正在尝试,对cassandra专栏家族thriugh java发出一些疑问.对我来说,它适用于简单的查询
ResultSet results = session.execute("select * from demodb.customer where id = 1");
Run Code Online (Sandbox Code Playgroud)
现在我想从用户获取id参数并将其传递给session.execute(); 声明.我该怎么办呢?
我正在尝试评估在我们的应用程序中的一个表中创建的墓碑数量.为此,我试图使用nodetool cfstats.我是这样做的:
create table demo.test(a int, b int, c int, primary key (a));
insert into demo.test(a, b, c) values(1,2,3);
Run Code Online (Sandbox Code Playgroud)
现在我正在制作与上面相同的插页.所以我希望创建3个墓碑.但是在为这个列家族运行cfstats时,我仍然看到没有创建墓碑.
nodetool cfstats demo.test
Average live cells per slice (last five minutes): 0.0
Average tombstones per slice (last five minutes): 0.0
Run Code Online (Sandbox Code Playgroud)
现在我尝试删除记录,但我仍然没有看到任何墓碑被创建.我在这里缺少什么东西?请建议.
BTW其他一些细节,*我们正在使用Java驱动程序的2.1.1版本*我们正在运行Cassandra 2.1.0
特别是我在看这个页面时说:
如果使用轻量级事务来写入分区中的行,则只应使用读取和写入操作的轻量级事务.
我很困惑使用LWT进行读取操作的样子.具体来说,这与每个查询的一致性(和serialConsistency)级别有何关联.
该说明对SERIAL读一致性提出了进一步的问题:
允许读取当前(可能未提交)的数据状态,而无需提出新的添加或更新.
这表明SERIAL用于读取不是"使用LWT".
但是之后
ONE)小于用于写作的serialConsistency,那该怎么办?SERIAL读取都被迫接受参与仲裁和交易算法的惩罚?如果我忽略这个建议并进行串行和非串行读/写.LWT以什么方式失败?
我得到批量写入请求,让我们说来自客户端的20个密钥.我可以在一个批处理中将它们写入C*,也可以以异步方式单独写入它们并等待将来完成它们.
批量写入似乎不是一个goo选项,因为我的插入率很高,如果键属于不同的分区,协调员将不得不做额外的工作.
有没有一种方法可以在datastax java驱动程序中使用它来组合可能属于同一分区的键,然后将它们分成小批量,然后在异步中进行不定期的未记录批量写入.通过这种方式,我可以减少对服务器的rpc调用,同时协调器必须在本地写入.我将使用令牌感知策略.
我已将整个代码库更改Thrift为CQL使用datastax java driver 1.0.1和cassandra 1.2.6..
随着节俭,我从一开始就经常超时,我无法继续......采用CQL,按照我的设计表,我获得了成功,减少了超时....
有了这个,我能够插入大量数据,这些数据与thrift无法合作......但经过一个阶段,数据文件夹大约3.5GB.我经常写入超时异常.即使我再次使用相同的早期工作用例,现在也会抛出超时异常.它的随机工作即使在新设置之后也不会再工作.
CASSADNRA服务器日志
这是cassandra服务器部分日志DEBUG模式,然后我收到错误:
客户例外是:
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54)
at com.datastax.driver.core.ResultSetFuture.extractCauseFromExecutionException(ResultSetFuture.java:214)
at com.datastax.driver.core.ResultSetFuture.getUninterruptibly(ResultSetFuture.java:169)
at com.datastax.driver.core.Session.execute(Session.java:107)
at com.datastax.driver.core.Session.execute(Session.java:76)
Run Code Online (Sandbox Code Playgroud)
基础设施: 16GB机器,8GB堆给cassandra,i7处理器..我使用SINGLE节点cassandra与yaml调整超时,其他一切都是默认的:
使用案例: 我正在运行一个用于存储Cassandra组合(我的项目术语)的用例....当前测试存储250个组合与100个并行线程..每个线程存储一个组合...真实情况我需要支持数十数百万,但需要不同的硬件和多节点集群...
在存储一个组合需要大约2秒,涉及:
100个并行线程并行存储100个组合.
我已经发现WRITE TIMEOUTS的行为是随机的,一段时间它会工作到200 000然后抛出超时,有时甚至不能用于10k组合.随机行为.
我最近开始在我们的Cassandra用例中使用Datastax Java驱动程序......我们将使用Datastax Java驱动程序读取/写入Cassandra ...
我成功地能够使用Datastax Java驱动程序创建Cassandra连接......但是我想知道,在生成环境中是否还有其他设置可以使用Datastax Java驱动程序在与Cassandra建立连接时获得更好的性能?
/**
* Creating Cassandra connection using Datastax driver
*
*/
private DatastaxConnection() {
try{
builder = Cluster.builder();
builder.addContactPoint("some-node");
// Can anybody explain me what does below piece of code do?
builder.poolingOptions().setCoreConnectionsPerHost(
HostDistance.LOCAL,
builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));
// And also what does below piece of code is doing?
cluster = builder
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.build();
StringBuilder s = new StringBuilder();
Set<Host> allHosts = cluster.getMetadata().getAllHosts();
for (Host h : allHosts) {
s.append("[");
s.append(h.getDatacenter());
s.append("-");
s.append(h.getRack());
s.append("-");
s.append(h.getAddress());
s.append("]"); …Run Code Online (Sandbox Code Playgroud) 我正在使用EmbeddedCassandraServerHelper单元测试.这是我的pom
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.10.8</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-extras -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>3.3.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
我正在引用文档
https://github.com/jsevellec/cassandra-unit/wiki/How-to-use-it-in-your-code
首先,我试过了
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private …Run Code Online (Sandbox Code Playgroud) 似乎没有任何直接的方法来了解cassandra中受影响的行以进行更新和删除语句.
例如,如果我有这样的查询:
DELETE FROM xyztable WHERE PKEY IN (1,2,3,4,5,6);
Run Code Online (Sandbox Code Playgroud)
当然,现在,因为我已经通过了6个密钥,很明显会有6行受到影响.
但是,就像在RDBMS世界中一样,有没有办法知道datastax-driver中更新/删除语句中受影响的行?
我读过cassandra在这里没有给出写操作的反馈.
除了我通过谷歌看不到关于这个主题的任何其他讨论.
如果那是不可能的,我可以确定使用上面给出的查询类型,它会删除全部还是不能删除全部?
确切的例外情况如下
com.datastax.driver.core.exceptions.CodecNotFoundException:找不到请求的操作的编解码器:[varchar < - > java.math.BigDecimal]
这些是我使用Spark 1.5 Datastax-cassandra 3.2.1 CDH 5.5.1的软件版本
我试图执行的代码是使用java api的Spark程序,它基本上从hdfs读取数据(csv)并将其加载到cassandra表中.我正在使用spark-cassandra-connector.最初我有很多关于google s guava库冲突的问题,我可以通过对guava库进行着色并构建一个包含所有依赖项的快照jar来解决这个问题.
但是我能够为某些文件加载数据但是对于某些文件我得到了Codec异常.当我研究这个问题时,我在同一个问题上得到了以下线程.
https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk
https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk
经过这些讨论之后,我理解的是它是我正在使用的cassandra-driver的错误版本.或者仍然存在与番石榴库相关的类路径问题,因为cassandra 3.0及更高版本使用guava 16.0.1,上面的讨论表明类路径中可能存在较低版本的番石榴.
这是pom.xml文件
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-clientutil</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>com.pointcross.shaded.google</shadedPattern>
</relocation>
</relocations>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions> …Run Code Online (Sandbox Code Playgroud) guava datastax-java-driver datastax cloudera-cdh spark-cassandra-connector
我试图通过调用session.executeAsync()代替session.execute()DB写入来加速我们的代码.
我们有数据库连接可能已关闭的用例,当前之前execute()在连接丢失时抛出异常(集群中没有主机可访问).我们可以捕获这些异常,并在其他地方重试或保存数据等...
有了executeAsync(),它看起来没有任何方法来实现这个用例 - ResultSetFuture需要访问返回的对象来检查结果,这将失去executeAsync()首先使用的目的...
有没有办法在executeAsync()调用的任何地方添加一个监听器(或类似的东西),它会异步通知一些其他代码,因为DB写入失败了?
这是相关的吗?Datastax 1.0.2 Java 1.7.40