标签: elasticsearch-hadoop

ElasticSearch to Spark RDD

我正在使用elasticsearch中加载的一些测试数据在本地计算机上测试ElasticSearch和Spark集成.

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
Run Code Online (Sandbox Code Playgroud)

代码运行正常并使用esRDD.first()成功返回正确的结果

但是,esRDD.collect()将生成异常:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html 所以我相应地添加了这一行

conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)

我应该做些什么来让它发挥作用吗?谢谢


更新:序列化设置问题已解决.通过使用

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)

代替

conf.set("spark.serializer", classOf[KryoSerializer].getName)
Run Code Online (Sandbox Code Playgroud)

现在还有另一个此数据集中有1000条不同的记录

esRDD.count()
Run Code Online (Sandbox Code Playgroud)

但是,返回1000没问题 …

serialization elasticsearch apache-spark elasticsearch-hadoop

10
推荐指数
1
解决办法
3645
查看次数

如何使用spark和elasticsearch-hadoop从/向不同的ElasticSearch集群读写?

原标题:除了HDFS之外,还有什么其他DFS能够引发支持(并且被推荐)?

我很高兴地使用spark和elasticsearch(带有elasticsearch-hadoop驱动程序)和几个巨大的集群.

我不时会将整个数据集拉出来,处理每个文档,并将所有数据放入不同的Elasticsearch(ES)集群中(是的,数据迁移也是如此).

目前,无法将集群中的ES数据读入RDD,并使用spark + elasticsearch-hadoop将SparkContextRDD 写入不同的RDD ,因为这将涉及从RDD 交换.所以我想将RDD写入目标文件,然后再将它们读回到具有不同SparkContexts 的RDD中.

但是,问题出现了:我需要一个DFS(分布式文件系统)来共享整个spark集群中的大文件.最流行的解决方案是HDFS,但我会非常避免将Hadoop引入我的堆栈.是否还有其他推荐的DFS可以支持火花?

在下面更新

感谢@Daniel Darabos在下面的回答,我现在可以使用以下Scala代码从/向不同的ElasticSearch集群读取和写入数据:

val conf = new SparkConf().setAppName("Spark Migrating ES Data")
conf.set("es.nodes", "from.escluster.com")

val sc = new SparkContext(conf)

val allDataRDD = sc.esRDD("some/lovelydata")

val cfg = Map("es.nodes" -> "to.escluster.com")
allDataRDD.saveToEsWithMeta("clone/lovelydata", cfg)
Run Code Online (Sandbox Code Playgroud)

microsoft-distributed-file-system hdfs elasticsearch apache-spark elasticsearch-hadoop

6
推荐指数
1
解决办法
1907
查看次数

无法通过Elasticsearch-hadoop库在多重火花节点上的RDD上应用映射

import org.elasticsearch.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer._;
import com.esotericsoftware.kryo.Kryo;
import org.elasticsearch.spark.rdd.EsSpark 

sc.stop()

val conf = new SparkConf()
conf.set("es.index.auto.create","true")
conf.set("spark.serializer", classOf[KryoSerializer].getName)

conf.set("es.nodes","localhost")
val sc = new SparkContext(conf)

val getAllQuery = "{\"query\":{\"match_all\":{}}}"
val esRDDAll = sc.esRDD("test-index/typeA", getAllQuery)

//WORKS
esRDDAll.count

//WORKS
EsSpark.saveToEs(esRDDAll, "output-index/typeB")

val esRDDMap = esRDDAll.map(r => r)

//FAILS
esRDDMap.count

//FAILS
EsSpark.saveToEs(esRDDMap, "output-index/typeB")
Run Code Online (Sandbox Code Playgroud)

我得到的错误是:

WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 41, localhost): java.lang.ClassNotFoundException: $line594.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) …
Run Code Online (Sandbox Code Playgroud)

scala elasticsearch apache-spark rdd elasticsearch-hadoop

5
推荐指数
1
解决办法
321
查看次数

Elasticsearch-Hadoop库无法连接到docker容器

我有从Cassandra读取的spark作业,处理/转换/过滤数据,并将结果写入Elasticsearch.我使用docker进行集成测试,而且我遇到了从spark写入Elasticsearch的麻烦.

依赖关系:

"joda-time"              % "joda-time"          % "2.9.4",
"javax.servlet"          %  "javax.servlet-api" % "3.1.0",
"org.elasticsearch"      %  "elasticsearch"     % "2.3.2",
"org.scalatest"          %% "scalatest"         % "2.2.1",
"com.github.nscala-time" %% "nscala-time"       % "2.10.0",
"cascading"              %   "cascading-hadoop" % "2.6.3",
"cascading"              %   "cascading-local"  % "2.6.3",
"com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch"      %  "elasticsearch-hadoop"      % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark"       %% "spark-catalyst"            % "1.4.0" % "provided"
Run Code Online (Sandbox Code Playgroud)

在我的单元测试中,我可以使用TransportClient连接到elasticsearch来设置我的模板和索引

又名.这有效

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.input.split.size_in_mb", "67108864")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  .set("http.publish_host", "")
sc = new …
Run Code Online (Sandbox Code Playgroud)

scala elasticsearch docker apache-spark elasticsearch-hadoop

5
推荐指数
1
解决办法
576
查看次数

在Kubernetes上部署Elasticsearch for Apache Spark

我想知道是否有人有使用Elasticsearch for Hadoop库配置Kubernetes集群的经验。尝试从Spark写入Elasticsearch时,我遇到了节点发现超时的问题。由于ES 的elasticsearch-cloud-kubernetes插件可以处理发现,因此我可以启动Elasticsearch并运行,但是我不确定如何最好地配置Elasticsearch-hadoop来了解kubernetes集群中的节点(荚)。我尝试设置spark.es.nodes为es客户端服务,但这似乎不起作用。我也知道我可以启用es.nodes.wan.only,但是正如文档中所述,这将严重影响性能,这使它们无法在同一群集上运行的目的实现了。任何帮助,将不胜感激。

hadoop elasticsearch apache-spark kubernetes elasticsearch-hadoop

5
推荐指数
1
解决办法
210
查看次数

Pypsark-使用collect_list时保留空值

按照接受的答案pyspark collect_set或GROUPBY collect_list,当你做一个collect_list特定列,在null此列值将被删除。我已经检查过了,这是真的。

但就我而言,我需要保留null列-如何实现此目的?

我没有找到有关此类collect_list功能变体的任何信息。


解释我为什么要空值的背景上下文:

我有一个数据框df如下:

cId   |  eId  |  amount  |  city
1     |  2    |   20.0   |  Paris
1     |  2    |   30.0   |  Seoul
1     |  3    |   10.0   |  Phoenix
1     |  3    |   5.0    |  null
Run Code Online (Sandbox Code Playgroud)

我想使用以下映射将其写入Elasticsearch索引:

"mappings": {
    "doc": {
        "properties": {
            "eId": { "type": "keyword" },
            "cId": { "type": "keyword" },
            "transactions": {
                "type": "nested", 
                "properties": {
                    "amount": { "type": …
Run Code Online (Sandbox Code Playgroud)

nested collect elasticsearch-mapping elasticsearch-hadoop pyspark-sql

5
推荐指数
1
解决办法
1740
查看次数

查询中忽略 elasticsearch-spark 连接器大小限制参数

我正在尝试elasticsearch使用elasticsearch-spark连接器进行查询,但我只想返回几个结果:

例如:

val conf = new SparkConf().set("es.nodes","localhost").set("es.index.auto.create", "true").setMaster("local")
val sparkContext = new SparkContext(conf)
val query = "{\"size\":1}"
println(sparkContext.esRDD("index_name/type", query).count())
Run Code Online (Sandbox Code Playgroud)

但是,这将返回索引中的所有文档。

scala elasticsearch apache-spark elasticsearch-hadoop

5
推荐指数
1
解决办法
1224
查看次数

Python Spark Dataframe 到 Elasticsearch

我不知道如何使用 Spark 中的 python 将数据帧写入 Elasticsearch。我从这里开始遵循步骤。

这是我的代码:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
Run Code Online (Sandbox Code Playgroud)

上面的代码给出了

原因:net.razorvine.pickle.PickleException:构造 ClassDict 时预期参数为零(对于 pyspark.sql.types._create_row)

我还从以下位置启动了脚本: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py以确保elasticsearch-hadoop已加载

elasticsearch apache-spark pyspark elasticsearch-hadoop

5
推荐指数
1
解决办法
3664
查看次数

什么是ElasticSearch-Hadoop(es-hadoop)及其对HBase的实时Web应用程序的好处?

我不完全清楚es-hadoop是什么来自描述.

这仅仅是一个"连接器",它将数据从ES群集移动到HDFS以进行Hadoop分析吗?如果是这样,为什么不与HBase一起进行低延迟文本查询?

es-Hadoop与普通ES的安装方式不同吗?

请澄清一下.

谢谢.

hadoop hbase elasticsearch elasticsearch-hadoop

4
推荐指数
1
解决办法
4215
查看次数

将Spark Dataframe保存到Elasticsearch中 - 无法处理类型异常

我设计了一个简单的工作来从MySQL读取数据并将其保存在带有Spark的Elasticsearch中.

这是代码:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Run Code Online (Sandbox Code Playgroud)

您可以看到代码非常简单.它将数据读入DataFrame,选择一些列,然后 …

elasticsearch apache-spark elasticsearch-hadoop apache-spark-1.5

4
推荐指数
1
解决办法
7693
查看次数

Spark 2.4 到 Elasticsearch:防止 Dataproc 节点停用期间数据丢失?

我的技术任务是将数据从GCS(Google Cloud Storage)同步到我们的Elasticsearch集群。

我们在 Google Dataproc 集群(启用自动扩展)上使用 Apache Spark 2.4 和 Elastic Hadoop 连接器。

在执行过程中,如果 Dataproc 集群缩小规模,停用节点上的所有任务都会丢失,并且该节点上处理的数据永远不会推送到弹性。

例如,当我保存到 GCS 或 HDFS 时,就不存在此问题。

即使节点退役,如何使这项任务具有弹性?

堆栈跟踪的摘录:

阶段 2.3 中丢失任务 50.0 (TID 427, xxxxxxx-sw-vrb7.c.xxxxxxx, 执行器 43): FetchFailed(BlockManagerId(30, xxxxxxx-w-23.c.xxxxxxx, 7337, None), shuffleId=0, mapId =26,reduceId=170,message=org.apache.spark.shuffle.FetchFailedException:无法连接到 xxxxxxx-w-23.c.xxxxxxx:7337

引起原因:java.net.UnknownHostException:xxxxxxx-w-23.c.xxxxxxx

阶段 2.3 中的任务 50.0 (TID 427) 失败,但该任务不会重新执行(要么是因为该任务因 shuffle 数据获取失败而失败,所以需要重新运行前一个阶段,要么是因为不同的副本任务已经成功)。

谢谢。弗雷德

elasticsearch apache-spark elasticsearch-hadoop google-cloud-dataproc

3
推荐指数
1
解决办法
1432
查看次数

如何通过spark读取elasticsearch的几列内容?

在es集群中,数据规模较大,我们使用spark来计算数据,但是采用的elasticsearch-hadoophttps://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

我们必须读取索引的完整列。有什么帮助吗?

apache-spark elasticsearch-hadoop

1
推荐指数
1
解决办法
2584
查看次数

Elasticsearch + Spark:使用自定义文档_id编写json

我正在尝试从 Spark 在 Elasticsearch 中编写对象集合。我必须满足两个要求:

  1. 文档已以 JSON 格式序列化,应按原样编写
  2. _id应提供Elasticsearch文档

这是我到目前为止所尝试的。

saveJsonToEs()

我尝试saveJsonToEs()像这样使用(序列化文档包含_id具有所需 Elasticsearch ID 的字段):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)
Run Code Online (Sandbox Code Playgroud)

elasticsearch-hadoop图书馆给出了这个例外:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
Run Code Online (Sandbox Code Playgroud)

如果我删除es.mapping.exclude但保留es.mapping.id并发送带有内部的 JSON …

scala elasticsearch apache-spark elasticsearch-hadoop

1
推荐指数
1
解决办法
7289
查看次数