我正在使用elasticsearch中加载的一些测试数据在本地计算机上测试ElasticSearch和Spark集成.
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
Run Code Online (Sandbox Code Playgroud)
代码运行正常并使用esRDD.first()成功返回正确的结果
但是,esRDD.collect()将生成异常:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html 所以我相应地添加了这一行
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
我应该做些什么来让它发挥作用吗?谢谢
更新:序列化设置问题已解决.通过使用
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
代替
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)
现在还有另一个此数据集中有1000条不同的记录
esRDD.count()
Run Code Online (Sandbox Code Playgroud)
但是,返回1000没问题 …
serialization elasticsearch apache-spark elasticsearch-hadoop
原标题:除了HDFS之外,还有什么其他DFS能够引发支持(并且被推荐)?
我很高兴地使用spark和elasticsearch(带有elasticsearch-hadoop驱动程序)和几个巨大的集群.
我不时会将整个数据集拉出来,处理每个文档,并将所有数据放入不同的Elasticsearch(ES)集群中(是的,数据迁移也是如此).
目前,无法将集群中的ES数据读入RDD,并使用spark + elasticsearch-hadoop将SparkContextRDD 写入不同的RDD ,因为这将涉及从RDD 交换.所以我想将RDD写入目标文件,然后再将它们读回到具有不同SparkContexts 的RDD中.
但是,问题出现了:我需要一个DFS(分布式文件系统)来共享整个spark集群中的大文件.最流行的解决方案是HDFS,但我会非常避免将Hadoop引入我的堆栈.是否还有其他推荐的DFS可以支持火花?
在下面更新
感谢@Daniel Darabos在下面的回答,我现在可以使用以下Scala代码从/向不同的ElasticSearch集群读取和写入数据:
val conf = new SparkConf().setAppName("Spark Migrating ES Data")
conf.set("es.nodes", "from.escluster.com")
val sc = new SparkContext(conf)
val allDataRDD = sc.esRDD("some/lovelydata")
val cfg = Map("es.nodes" -> "to.escluster.com")
allDataRDD.saveToEsWithMeta("clone/lovelydata", cfg)
Run Code Online (Sandbox Code Playgroud) microsoft-distributed-file-system hdfs elasticsearch apache-spark elasticsearch-hadoop
import org.elasticsearch.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer._;
import com.esotericsoftware.kryo.Kryo;
import org.elasticsearch.spark.rdd.EsSpark
sc.stop()
val conf = new SparkConf()
conf.set("es.index.auto.create","true")
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes","localhost")
val sc = new SparkContext(conf)
val getAllQuery = "{\"query\":{\"match_all\":{}}}"
val esRDDAll = sc.esRDD("test-index/typeA", getAllQuery)
//WORKS
esRDDAll.count
//WORKS
EsSpark.saveToEs(esRDDAll, "output-index/typeB")
val esRDDMap = esRDDAll.map(r => r)
//FAILS
esRDDMap.count
//FAILS
EsSpark.saveToEs(esRDDMap, "output-index/typeB")
Run Code Online (Sandbox Code Playgroud)
我得到的错误是:
WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 41, localhost): java.lang.ClassNotFoundException: $line594.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) …Run Code Online (Sandbox Code Playgroud) 我有从Cassandra读取的spark作业,处理/转换/过滤数据,并将结果写入Elasticsearch.我使用docker进行集成测试,而且我遇到了从spark写入Elasticsearch的麻烦.
依赖关系:
"joda-time" % "joda-time" % "2.9.4",
"javax.servlet" % "javax.servlet-api" % "3.1.0",
"org.elasticsearch" % "elasticsearch" % "2.3.2",
"org.scalatest" %% "scalatest" % "2.2.1",
"com.github.nscala-time" %% "nscala-time" % "2.10.0",
"cascading" % "cascading-hadoop" % "2.6.3",
"cascading" % "cascading-local" % "2.6.3",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch" % "elasticsearch-hadoop" % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark" %% "spark-catalyst" % "1.4.0" % "provided"
Run Code Online (Sandbox Code Playgroud)
在我的单元测试中,我可以使用TransportClient连接到elasticsearch来设置我的模板和索引
又名.这有效
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
.set("spark.cassandra.input.split.size_in_mb", "67108864")
.set("spark.cassandra.connection.host", cassandraHostString)
.set("es.nodes", elasticsearchHostString)
.set("es.port", "9200")
.set("http.publish_host", "")
sc = new …Run Code Online (Sandbox Code Playgroud) scala elasticsearch docker apache-spark elasticsearch-hadoop
我想知道是否有人有使用Elasticsearch for Hadoop库配置Kubernetes集群的经验。尝试从Spark写入Elasticsearch时,我遇到了节点发现超时的问题。由于ES 的elasticsearch-cloud-kubernetes插件可以处理发现,因此我可以启动Elasticsearch并运行,但是我不确定如何最好地配置Elasticsearch-hadoop来了解kubernetes集群中的节点(荚)。我尝试设置spark.es.nodes为es客户端服务,但这似乎不起作用。我也知道我可以启用es.nodes.wan.only,但是正如文档中所述,这将严重影响性能,这使它们无法在同一群集上运行的目的实现了。任何帮助,将不胜感激。
hadoop elasticsearch apache-spark kubernetes elasticsearch-hadoop
按照接受的答案在pyspark collect_set或GROUPBY collect_list,当你做一个collect_list特定列,在null此列值将被删除。我已经检查过了,这是真的。
但就我而言,我需要保留null列-如何实现此目的?
我没有找到有关此类collect_list功能变体的任何信息。
解释我为什么要空值的背景上下文:
我有一个数据框df如下:
cId | eId | amount | city
1 | 2 | 20.0 | Paris
1 | 2 | 30.0 | Seoul
1 | 3 | 10.0 | Phoenix
1 | 3 | 5.0 | null
Run Code Online (Sandbox Code Playgroud)
我想使用以下映射将其写入Elasticsearch索引:
"mappings": {
"doc": {
"properties": {
"eId": { "type": "keyword" },
"cId": { "type": "keyword" },
"transactions": {
"type": "nested",
"properties": {
"amount": { "type": …Run Code Online (Sandbox Code Playgroud) nested collect elasticsearch-mapping elasticsearch-hadoop pyspark-sql
我正在尝试elasticsearch使用elasticsearch-spark连接器进行查询,但我只想返回几个结果:
例如:
val conf = new SparkConf().set("es.nodes","localhost").set("es.index.auto.create", "true").setMaster("local")
val sparkContext = new SparkContext(conf)
val query = "{\"size\":1}"
println(sparkContext.esRDD("index_name/type", query).count())
Run Code Online (Sandbox Code Playgroud)
但是,这将返回索引中的所有文档。
我不知道如何使用 Spark 中的 python 将数据帧写入 Elasticsearch。我从这里开始遵循步骤。
这是我的代码:
# Read file
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='true') \
.load('/vagrant/data/input/input.csv', schema = customSchema)
df.registerTempTable("data")
# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")
es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
Run Code Online (Sandbox Code Playgroud)
上面的代码给出了
原因:net.razorvine.pickle.PickleException:构造 ClassDict 时预期参数为零(对于 pyspark.sql.types._create_row)
我还从以下位置启动了脚本:
spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py以确保elasticsearch-hadoop已加载
我不完全清楚es-hadoop是什么来自描述.
这仅仅是一个"连接器",它将数据从ES群集移动到HDFS以进行Hadoop分析吗?如果是这样,为什么不与HBase一起进行低延迟文本查询?
es-Hadoop与普通ES的安装方式不同吗?
请澄清一下.
谢谢.
我设计了一个简单的工作来从MySQL读取数据并将其保存在带有Spark的Elasticsearch中.
这是代码:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Run Code Online (Sandbox Code Playgroud)
您可以看到代码非常简单.它将数据读入DataFrame,选择一些列,然后 …
elasticsearch apache-spark elasticsearch-hadoop apache-spark-1.5
我的技术任务是将数据从GCS(Google Cloud Storage)同步到我们的Elasticsearch集群。
我们在 Google Dataproc 集群(启用自动扩展)上使用 Apache Spark 2.4 和 Elastic Hadoop 连接器。
在执行过程中,如果 Dataproc 集群缩小规模,停用节点上的所有任务都会丢失,并且该节点上处理的数据永远不会推送到弹性。
例如,当我保存到 GCS 或 HDFS 时,就不存在此问题。
即使节点退役,如何使这项任务具有弹性?
堆栈跟踪的摘录:
阶段 2.3 中丢失任务 50.0 (TID 427, xxxxxxx-sw-vrb7.c.xxxxxxx, 执行器 43): FetchFailed(BlockManagerId(30, xxxxxxx-w-23.c.xxxxxxx, 7337, None), shuffleId=0, mapId =26,reduceId=170,message=org.apache.spark.shuffle.FetchFailedException:无法连接到 xxxxxxx-w-23.c.xxxxxxx:7337
引起原因:java.net.UnknownHostException:xxxxxxx-w-23.c.xxxxxxx
阶段 2.3 中的任务 50.0 (TID 427) 失败,但该任务不会重新执行(要么是因为该任务因 shuffle 数据获取失败而失败,所以需要重新运行前一个阶段,要么是因为不同的副本任务已经成功)。
谢谢。弗雷德
elasticsearch apache-spark elasticsearch-hadoop google-cloud-dataproc
在es集群中,数据规模较大,我们使用spark来计算数据,但是采用的elasticsearch-hadoop是https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
我们必须读取索引的完整列。有什么帮助吗?
我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:
_id应提供Elasticsearch文档这是我到目前为止所尝试的。
saveJsonToEs()我尝试saveJsonToEs()像这样使用(序列化文档包含_id具有所需 Elasticsearch ID 的字段):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)
但elasticsearch-hadoop图书馆给出了这个例外:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
Run Code Online (Sandbox Code Playgroud)
如果我删除es.mapping.exclude但保留es.mapping.id并发送带有内部的 JSON …
apache-spark ×11
elasticsearch ×11
scala ×4
hadoop ×2
collect ×1
docker ×1
hbase ×1
hdfs ×1
kubernetes ×1
microsoft-distributed-file-system ×1
nested ×1
pyspark ×1
pyspark-sql ×1
rdd ×1