我得到批量写入请求,让我们说来自客户端的20个密钥.我可以在一个批处理中将它们写入C*,也可以以异步方式单独写入它们并等待将来完成它们.
批量写入似乎不是一个goo选项,因为我的插入率很高,如果键属于不同的分区,协调员将不得不做额外的工作.
有没有一种方法可以在datastax java驱动程序中使用它来组合可能属于同一分区的键,然后将它们分成小批量,然后在异步中进行不定期的未记录批量写入.通过这种方式,我可以减少对服务器的rpc调用,同时协调器必须在本地写入.我将使用令牌感知策略.
如果我没有错,可以连接到Cassandra集群,知道集群中至少有一个节点,然后可以发现其他节点.
假设我有三个节点(1,2和3),我连接到这些节点,如下所示:
Cluster.builder().addContactPoints("1,2,3".split(",")).build();
然后,如果节点3例如关闭,并且无法解析IP,则此行代码将IllegalArgumentException按照文档中的说明抛出:
@throws IllegalArgumentException if no IP address for at least one of {@code addresses} could be found
为什么有人想要这种行为?我的意思是,如果其中一个节点关闭,我希望应用程序能够运行,因为Cassandra仍然正常工作.
我检查了这个Cassandra Java驱动程序:有多少接触点是合理的? 但这并没有回答我的问题,因为它没有说出任何关于主机的信息,而无法达到.
我该怎么处理?也许这在另一个版本的java驱动程序中有所改变?我目前正在使用cassandra-driver-core-3.0.3
这是我想要做的.
我创建了两个DataStax企业集群节点,在此基础上我创建了一个java程序来获取一个表的计数(Cassandra数据库表).
这个程序是在eclipse中构建的,实际上是从一个Windows框中.
在从Windows运行此程序时,它在运行时失败并出现以下错误:
初始工作没有接受任何资源; 检查群集UI以确保工作人员已注册并具有足够的内存
已经在这些集群上成功编译和运行相同的代码而没有任何问题.可能是什么原因导致错误?
码:
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.cassandra.CassandraSQLContext;
import com.datastax.bdp.spark.DseSparkConfHelper;
public class SparkProject {
public static void main(String[] args) {
SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf()).setMaster("spark://10.63.24.14X:7077").setAppName("DatastaxTests").set("spark.cassandra.connection.host","10.63.24.14x").set("spark.executor.memory", "2048m").set("spark.driver.memory", "1024m").set("spark.local.ip","10.63.24.14X");
JavaSparkContext sc = new JavaSparkContext(conf);
CassandraSQLContext cassandraContext = new CassandraSQLContext(sc.sc());
SchemaRDD employees = cassandraContext.sql("SELECT * FROM portware_ants.orders");
//employees.registerTempTable("employees");
//SchemaRDD managers = cassandraContext.sql("SELECT symbol FROM employees");
System.out.println(employees.count());
sc.stop();
}
}
Run Code Online (Sandbox Code Playgroud) 我正在使用Cassandra Java驱动程序中的QueryBuilder来插入一些数据.
似乎没有简单的选项来设置我插入的行上的TTL.DataStax网站上的所有示例都没有显示TTL的示例.
我怀疑它与使用()有关,但再一次,代码中没有示例和最少的文档......
Row row = DataSession._getSession().execute("select count (*) from sivri_service.bronzelist").one();
int expected = row.getVarint("count").intValue();
Run Code Online (Sandbox Code Playgroud)
我试图从表中获取计数,但我似乎无法通过此异常:com.datastax.driver.core.exceptions.InvalidTypeException:列计数的类型为bigint
我正在尝试获取Cassandra列族中的键值对的数量.以下是我使用的代码.
PreparedStatement statement = client.session
.prepare("select count(*) from corpus.word_usage");
ResultSet results = client.session.execute(statement.bind());
Row row = results.one();
System.out.println(row.getVarint(0));
Run Code Online (Sandbox Code Playgroud)
但是,当我运行此代码时,我遇到了异常.
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidTypeException: Column count is of type bigint
at com.datastax.driver.core.ColumnDefinitions.checkType(ColumnDefinitions.java:291)
at com.datastax.driver.core.ArrayBackedRow.getVarint(ArrayBackedRow.java:185)
at SimpleClient.main(SimpleClient.java:57)
Run Code Online (Sandbox Code Playgroud)
根据datastax文档(http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/Row.html),getVarint应该返回一个BigInteger.那么为什么我在这里得到例外?我做错了什么?
我运行Cassandra docker容器:
docker pull cassandra
run --name cassandra -p 9042:9042 -p 9160:9160 -d cassandra
Run Code Online (Sandbox Code Playgroud)
netstat -tpln是:
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
LISTEN - tcp6 0 0 [::]:9160 [::]:*
LISTEN - tcp6 0 0 [::]:9042 [::]:*
Run Code Online (Sandbox Code Playgroud)
从本地cqlsh连接到C*是好的:
docker exec -it cassandra /bin/bash
#cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.1.1 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> show host
Connected to Test Cluster at 127.0.0.1:9042.
Run Code Online (Sandbox Code Playgroud)
我安装本地cqlsh: …
正如标题中所述,我想知道是否有必要激发提交*.jar?
我正在使用Datastax Enterprise Cassandra一段时间,但现在我也需要使用Spark.我观看了几乎所有来自DS320的视频:使用Apache Spark的DataStax Enterprise Analytics,并且没有任何关于从Java应用程序远程连接到spark的信息.
现在我有3个DSE运行节点.我可以从火花壳连接到Spark.但是在尝试从java代码连接Spark后2天我放弃了.
这是我的Java代码
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("AppName");
//sparkConf.set("spark.shuffle.blockTransferService", "nio");
//sparkConf.set("spark.driver.host", "*.*.*.*");
//sparkConf.set("spark.driver.port", "7007");
sparkConf.setMaster("spark://*.*.*.*:7077");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Run Code Online (Sandbox Code Playgroud)
连接结果
16/01/18 14:32:43 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:32:43 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) …Run Code Online (Sandbox Code Playgroud) 我正在使用Python cassandra驱动程序来连接和查询我们的Cassandra集群.
我想通过Pandas操纵我的数据,cassandra驱动程序的文档中有一个区域提到了这一点:https: //datastax.github.io/python-driver/api/cassandra/protocol.html
NumpyProtocolHander:将结果直接反序列化为NumPy数组.这有助于与分析工具包(如Pandas)的高效集成.
按照上面的说明并在Cassandra中进行SELECT查询,我可以看到输出(通过type()函数)为:
<class 'cassandra.cluster.ResultSet'>
Run Code Online (Sandbox Code Playgroud)
迭代结果,这就是打印出来的行:
{u'reversals_rejected': array([0, 0]), u'revenue': array([ 0, 10]), u'reversals_revenue': array([0, 0]), u'rejected': array([3, 1]), u'impressions_positive': array([3, 3]), u'site_user_id': array([226226, 354608], dtype=int32), u'error': array([0, 0]), u'impressions_negative': array([0, 0]), u'accepted': array([0, 2])}
Run Code Online (Sandbox Code Playgroud)
(我限制了查询结果,我正在使用更大量的数据 - 因此想要使用numpy和pandas).
我对熊猫的了解有限,我试图运行非常基本的功能:
rslt = cassandraSession.execute("SELECT accepted FROM table")
test = rslt[["accepted"]].head(1)
Run Code Online (Sandbox Code Playgroud)
这会输出以下错误:
Traceback (most recent call last):
File "/UserStats.py", line 27, in <module>
test = rslt[["accepted"]].head(1)
File "cassandra/cluster.py", line 3380, in cassandra.cluster.ResultSet.__getitem__ (cassandra/cluster.c:63998)
TypeError: list indices must be …Run Code Online (Sandbox Code Playgroud) 确切的例外情况如下
com.datastax.driver.core.exceptions.CodecNotFoundException:找不到请求的操作的编解码器:[varchar < - > java.math.BigDecimal]
这些是我使用Spark 1.5 Datastax-cassandra 3.2.1 CDH 5.5.1的软件版本
我试图执行的代码是使用java api的Spark程序,它基本上从hdfs读取数据(csv)并将其加载到cassandra表中.我正在使用spark-cassandra-connector.最初我有很多关于google s guava库冲突的问题,我可以通过对guava库进行着色并构建一个包含所有依赖项的快照jar来解决这个问题.
但是我能够为某些文件加载数据但是对于某些文件我得到了Codec异常.当我研究这个问题时,我在同一个问题上得到了以下线程.
https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk
https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk
经过这些讨论之后,我理解的是它是我正在使用的cassandra-driver的错误版本.或者仍然存在与番石榴库相关的类路径问题,因为cassandra 3.0及更高版本使用guava 16.0.1,上面的讨论表明类路径中可能存在较低版本的番石榴.
这是pom.xml文件
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-clientutil</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>com.pointcross.shaded.google</shadedPattern>
</relocation>
</relocations>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions> …Run Code Online (Sandbox Code Playgroud) guava datastax-java-driver datastax cloudera-cdh spark-cassandra-connector
datastax ×10
cassandra ×8
java ×6
apache-spark ×2
biginteger ×1
cloudera-cdh ×1
cqlsh ×1
docker ×1
guava ×1
insert ×1
numpy ×1
pandas ×1
python ×1
resultset ×1
ttl ×1