我试图通过使用以下方法过滤大型Cassandra表的一小部分:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
Run Code Online (Sandbox Code Playgroud)
我想映射cassandra表中作为分区键的一部分的'created'列中的行.
我的表键(表的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
Run Code Online (Sandbox Code Playgroud)
结果是错误:
[error] /home/ubuntu/scala/test/test.scala:61:没有足够的方法适用于方法:( imei:String,created:Long)test.TableKey in object TableKey.[error]已创建未指定的值参数.[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")[error] ^ [error]发现一个错误[error](编译:编译) )编译失败
它只与文档中的分区键中的一个对象一起使用.
为什么多分区密钥有问题? - 已回答.
编辑:我试图以正确的形式使用joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
Run Code Online (Sandbox Code Playgroud)
当我试图在Spark上运行时没有错误,但它永远停留在"[阶段0:>(0 + 2)/ 2]"
出了什么问题?
我试图通过使用以下方法过滤大型C*表的一小部分:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_)).joinWithCassandraTable("listener","snapshots_tspark")
println("Done Join")
//*******
//get only the snapshots and create rdd temp table
val jsons = snapshotsFiltered.map(_._2.getString("snapshot"))
val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
jsonSchemaRDD.registerTempTable("snapshots_json")
Run Code Online (Sandbox Code Playgroud)
附:
case class TableKey(created: Long) //(created, imei, when)--> created = partititon key | imei, when = clustering key
Run Code Online (Sandbox Code Playgroud)
cassandra表模式是:
CREATE TABLE listener.snapshots_tspark (
created timestamp,
imei text,
when timestamp,
snapshot text,
PRIMARY KEY (created, imei, when) ) WITH CLUSTERING ORDER BY (imei ASC, when ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = …Run Code Online (Sandbox Code Playgroud) cqlsh不允许嵌套查询,所以我不能选择导出数据到CSV ..我想选择的数据(约200,000行与一列)使用卡桑德拉出口:
echo "SELECT distinct imei FROM listener.snapshots;" > select.cql
bin/cqlsh -f select.cql > output.txt
它只是永远没有任何错误,并且文件没有增长.
如果我在最后一行使用strace,我会得到很多行:
select(0, NULL, NULL, NULL, {0, 2000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 4000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 8000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 16000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 32000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 1000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 2000}) = 0 (Timeout) …Run Code Online (Sandbox Code Playgroud) 当我尝试运行datastax spark-sql-thriftserver时,我收到这些错误:
dse spark-sql-thriftserver start \
--conf spark.cores.max=10 \
--conf spark.executor.memory=2g \
--hiveconf hive.server2.thrift.port=10001
Run Code Online (Sandbox Code Playgroud)
Spark命令:/opt/jdk1.8.0_112/jre//bin/java -cp
/etc/dse/spark/:/usr/share/dse/spark/jars/*:/etc/dse/hadoop2-client/ -Djava.library.path=/usr/share/dse/hadoop2-client/lib/native:/usr/share/dse/cassandra/lib/sigar-bin: -Dcassandra.logdir=/var/log/cassandra -XX:MaxHeapFreeRatio=50 -XX:MinHeapFreeRatio=20 -Dguice_include_stack_traces=OFF -Ddse.system_memory_in_mb=32174 -Dcassandra.config.loader=com.datastax.bdp.config.DseConfigurationLoader -Dlogback.configurationFile=/etc/dse/spark/logback-spark.xml -Dcassandra.logdir=/var/log/cassandra -Ddse.client.configuration.impl=com.datastax.bdp.transport.client.HadoopBasedClientConfiguration -Dderby.stream.error.method=com.datastax.bdp.derby.LogbackBridge.getLogger -Xmx1024M org.apache.spark.deploy.SparkSubmit --conf spark.executor.memory=2g --conf spark.cores.max=10 --class
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
spark-internal --hiveconf hive.server2.thrift.port=10001
======================================== WARN 2017-05-07 22:21:55 org.apache.spark.SparkContext: Use an existing SparkContext, some
configuration may not take effect. ERROR 2017-05-07 22:22:04
org.apache.spark.deploy.DseSparkSubmitBootstrapper: Failed to start or submit Spark application java.lang.NoSuchMethodError:
org.apache.hive.service.cli.operation.LogDivertAppender.setWriter(Ljava/io/Writer;)V at
org.apache.hive.service.cli.operation.LogDivertAppender.(LogDivertAppender.java:166) ~[spark-hive-thriftserver_2.11-2.0.2.6.jar:2.0.2.6] at
org.apache.hive.service.cli.operation.OperationManager.initOperationLogCapture(OperationManager.java:85) ~[spark-hive-thriftserver_2.11-2.0.2.6.jar:2.0.2.6] at
org.apache.hive.service.cli.operation.OperationManager.init(OperationManager.java:63) ~[spark-hive-thriftserver_2.11-2.0.2.6.jar:2.0.2.6] at
org.apache.spark.sql.hive.thriftserver.ReflectedCompositeService$$anonfun$initCompositeService$1.apply(SparkSQLCLIService.scala:79) ~[spark-hive-thriftserver_2.11-2.0.2.6.jar:2.0.2.6] at …
如果我有一个多DC集群,DC1和DC2,其中DC2仅用于故障转移.在客户端的驱动程序中,我使用域名定义联系点(foo1.net, foo2.net, and foo3.net).我有foo*指向DC1,如果我检测到任何DC1错误,我将使DNS路由foo*指向DC2.
这种方法似乎在纸上工作,但它实际上会起作用吗?这种方法有什么问题吗?
我想使用solr_query查找结果,其中地图类型的字段包含给定的键值对。
我正在尝试创建与此类似的查询:
SELECT * FROM my_table WHERE pathId= 5 AND solr_query='validity: [1970-01-01T00:01:00 TO *] ’ AND metadata[1] = '2' LIMIT 1 ALLOW FILTERING;
Run Code Online (Sandbox Code Playgroud)
要么
SELECT * FROM my_table where metadata['1'] = '2' AND solr_query=$${ "q": "pathid:5", "fq": "validity:[1970-01-01T00:01:00 TO *]";
Run Code Online (Sandbox Code Playgroud)
但是每次我得到ServerError: java.lang.IllegalArgumentException: Search queries must have only one index expression.错误。
我希望能够以某种方式在'where'子句- pathid validity和中查询这3个条件metadata。是否可以查询在内部包含给定键值对的映射solr_query,或者还有其他方法可以做到这一点?
我在字段上创建了搜索索引:
create SEARCH index on my_table with columns validity, pathId, metadata;
Run Code Online (Sandbox Code Playgroud) 我是Cassandra的新手 - 我一直在单个节点上搜索与Cassandra中的提交和崩溃恢复相关的信息.而且,希望有人能澄清细节.
我正在测试Cassandra - 所以,将它设置在一个节点上.我在datastax上使用stresstool插入数百万行.如果出现电气故障或系统关闭会发生什么?在Cassandra重新启动时,Cassandra内存中的所有数据都会被写入磁盘(我猜commitlog充当中介)吗?这个过程需要多长时间?
谢谢!
有没有人知道如何在使用amazon ec2 M3.Xlarge机器的集群中使用Datastax enterprise(使用opscenter)?当我尝试使用这些类型的实例(使用ssd)时,我收到以下错误:启动实例失败.指定的节点大小无效
如果我使用M1.Xlarge它可以正常工作.
我正在使用cassandra-connector将数据从cassandra分区读取到spark.我尝试了以下解决方案来读取partitions.I尝试通过尽可能多地创建rdds来并行化任务,但解决方案ONE和解决方案TWO都具有相同的性能.
在解决方案ONE中,我可以立即看到spark UI中的各个阶段.我试图在解决方案TWO中避免一个for循环.
在解决方案TWO中,阶段在相当长的时间之后出现.随着用户ID的数量增加,在阶段出现在解决方案TWO的火花UI中之前,时间显着增加.
Version
spark - 1.1
Dse - 4.6
cassandra-connector -1.1
Setup
3 - Nodes with spark cassandra
Each node has 1 core dedicated to this task.
512MB ram for the executor memory.
Run Code Online (Sandbox Code Playgroud)
我的cassandra表架构,
CREATE TABLE test (
user text,
userid bigint,
period timestamp,
ip text,
data blob,
PRIMARY KEY((user,userid,period),ip)
);
Run Code Online (Sandbox Code Playgroud)
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y))))
val userids = 1 to 10
for (userid <- userids){ …Run Code Online (Sandbox Code Playgroud) 我有一个场景,我将接收由我的火花流程序处理的流数据,并且每个间隔的输出将附加到我现有的cassandra表中.
目前我的火花流程序将生成一个数据框,我需要保存在我的cassandra表中.我目前面临的问题是当我使用下面的命令时,我无法将数据/行附加到我现有的cassandra表中
dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save()
Run Code Online (Sandbox Code Playgroud)
我已阅读以下链接http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/,他将mode ="append"传递给save方法但其抛出语法错误
此外,我还能够从以下链接了解我需要修复的位置 https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM
如何解决这个问题需要帮助.我正在scala中编写我的spark流媒体作业
cassandra datastax-enterprise apache-spark spark-streaming apache-spark-sql