I have a Spark job that reads from Cassandra, processes/transforms/filters the data, and writes the results to Elasticsearch. I use Docker for integration testing, and I am having trouble writing from Spark to Elasticsearch.
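For reference, with these libraries the read/transform/filter/write pipeline would look roughly like the sketch below; the keyspace, table, column, and index names ("my_keyspace", "my_table", "id", "value", "my_index/my_type") are placeholders, not my actual schema:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._   // adds sc.cassandraTable(...)
import org.elasticsearch.spark._        // adds rdd.saveToEs(...)

// Minimal sketch: read rows from Cassandra, filter/transform them,
// and index the results into Elasticsearch. All names are hypothetical.
val cassandraHostString = "127.0.0.1"     // address of the Cassandra container
val elasticsearchHostString = "127.0.0.1" // address of the Elasticsearch container

val conf = new SparkConf().setAppName("reindex_sketch").setMaster("local")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
val sc = new SparkContext(conf)

sc.cassandraTable("my_keyspace", "my_table")
  .filter(row => row.getString("value").nonEmpty)     // example filter step
  .map(row => Map(
    "id" -> row.getString("id"),
    "value" -> row.getString("value")))               // example transform step
  .saveToEs("my_index/my_type")                       // "index/type" resource string

sc.stop()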
Dependencies:
"joda-time" % "joda-time" % "2.9.4",
"javax.servlet" % "javax.servlet-api" % "3.1.0",
"org.elasticsearch" % "elasticsearch" % "2.3.2",
"org.scalatest" %% "scalatest" % "2.2.1",
"com.github.nscala-time" %% "nscala-time" % "2.10.0",
"cascading" % "cascading-hadoop" % "2.6.3",
"cascading" % "cascading-local" % "2.6.3",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch" % "elasticsearch-hadoop" % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark" %% "spark-catalyst" % "1.4.0" % "provided"
In my unit tests I can connect to Elasticsearch using a TransportClient to set up my templates and indices. In other words, this works.
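The template and index setup that works presumably looks something like this sketch (Elasticsearch 2.3 TransportClient API, using the 9300 transport port rather than the 9200 HTTP port; the template and index names are placeholders, and elasticsearchHostString is the same host variable used below):

import java.net.InetAddress
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.InetSocketTransportAddress

// Sketch: connect over the transport protocol, then install a template and an index.
// "test_template", "test_*" and "test_index" are hypothetical names.
val client = TransportClient.builder().build()
  .addTransportAddress(new InetSocketTransportAddress(
    InetAddress.getByName(elasticsearchHostString), 9300))

client.admin().indices()
  .preparePutTemplate("test_template")
  .setTemplate("test_*")                 // index pattern the template applies to
  .get()

client.admin().indices().prepareCreate("test_index").get()
client.close()

The Spark side, by contrast, is configured like this: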
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
.set("spark.cassandra.input.split.size_in_mb", "67108864")
.set("spark.cassandra.connection.host", cassandraHostString)
.set("es.nodes", elasticsearchHostString)
.set("es.port", "9200")
.set("http.publish_host", "")
sc = new …

Tags: scala, elasticsearch, docker, apache-spark, elasticsearch-hadoop