我尝试使用Spark和Cassandra Spark Connector将流数据保存到Cassandra中.
我做了类似以下的事情:
创建一个模型类:
public class ContentModel {
String id;
String available_at; //may be null
public ContentModel(String id, String available_at){
this.id=id;
this.available_at=available_at,
}
}
Run Code Online (Sandbox Code Playgroud)
将流内容映射到模型:
JavaDStream<ContentModel> contentsToModel = myStream.map(new Function<String, ContentModel>() {
@Override
public ContentModel call(String content) throws Exception {
String[] parts = content.split(",");
return new ContentModel(parts[0], parts[1]);
}
});
Run Code Online (Sandbox Code Playgroud)
保存:
CassandraStreamingJavaUtil.javaFunctions(contentsToModel).writerBuilder("data", "contents", CassandraJavaUtil.mapToRow(ContentModel.class)).saveToCassandra();
Run Code Online (Sandbox Code Playgroud)
如果某些值是null
我得到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to struct.ValueRepr.
Run Code Online (Sandbox Code Playgroud)
有没有办法使用Spark Cassandra Connector存储空值?
我必须按照本教程使用maven为我的Apache Spark应用程序创建一个超级jar.
我已经在pom中设置了所有Spark Dependencies <scope>provided</scope>
.这非常有效,但现在当我在本地运行应用程序时,我收到错误的Spark依赖项错误.
目前我不得不provided
从pom中删除标签.
如何在构建应用程序以供发布时,如何提供提供的spark依赖项?
我使用Intellij作为IDE来开发应用程序.
尝试使用google-cloud节点模块将数据(作为流)插入BigQuery时,我收到ECONNRESET错误.
var writeToBigQuery = function(message,t){
var table = dataset.table(t);
var options ={ "ignoreUnknownValues":true}
table.insert(message,options,insertHandler);
}
var insertHandler = function(err, insertErrors, apiResponse) {
//api error?
if(err){
//notify
console.log("fail to write data: "+ JSON.stringify(err))
return;
}
//insert error?
if(insertErrors.length > 0){
//notify
console.log("fail to write data: "+ JSON.stringify(insertErrors))
return;
}
//row saved successfully!
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试用一些jsons写一个文件.我知道我可以使用bigquery工作,但我需要在将它写入bigquery之前修改一些json的值.
可以是我的本地连接问题吗?
例如,我有一份文件
{
_id: 1,
list:[
{
key: "a",
"value":"ssss"
},
{
key: "b",
"value":"ssss"
},
{
key: "c",
"value":"ssss"
},
]
}
Run Code Online (Sandbox Code Playgroud)
我需要从列表中删除带"c"键的元素.
使用mongo控制台命令,我这样做,并删除正确的密钥
db.test.update({{_id: 1},{"$pull" : {"list" : { "key" : "c"}}},false,false)
Run Code Online (Sandbox Code Playgroud)
有一种方法可以用吗啡吗?
我有两个容器,一个链接到另一个.如下......
docker run -i -t --name container1 ubuntu:trusty
docker run -i -t --name container2 --link container1:aliasc1 ubuntu:trusty /bin/bash
Run Code Online (Sandbox Code Playgroud)
有一个简单的方法来获得的IP container1
从container2
使用环节的别名aliasc1
?
我使用docker-compose编排容器
我正在尝试使用apache-beam创建一个流管道,从google pub/sub读取句子并将这些单词写入Bigquery表.
我正在使用0.6.0
apache-beam版本.
按照这些例子,我做了这个:
public class StreamingWordExtract {
/**
* A DoFn that tokenizes lines of text into individual words.
*/
static class ExtractWords extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
String[] words = ((String) c.element()).split("[^a-zA-Z']+");
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/**
* A DoFn that uppercases a word.
*/
static class Uppercase extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().toUpperCase());
}
} …
Run Code Online (Sandbox Code Playgroud) apache-spark ×2
java ×2
apache-beam ×1
cassandra ×1
docker ×1
list ×1
maven ×1
morphia ×1
node.js ×1
updates ×1