eli*_*sah 4 elasticsearch apache-spark elasticsearch-hadoop apache-spark-1.5
我设计了一个简单的工作来从MySQL读取数据并将其保存在带有Spark的Elasticsearch中.
这是代码:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Run Code Online (Sandbox Code Playgroud)
您可以看到代码非常简单.它将数据读入DataFrame,选择一些列,然后count
在Dataframe上执行一个基本操作.到目前为止,一切正常.
然后它尝试将数据保存到Elasticsearch中,但它失败了,因为它无法处理某些类型.您可以在此处查看错误日志.
我不确定为什么它不能处理这种类型.有谁知道为什么会这样?
我正在使用Apache Spark 1.5.0,Elasticsearch 1.4.4和elaticsearch-hadoop 2.1.1
编辑:
eli*_*sah 13
这个问题的答案很棘手,但多亏了samklr,我已经设法解决了问题所在.
尽管如此,解决方案并不简单,可能会考虑一些"不必要的"转换.
首先让我们谈谈序列化.
在Spark序列化数据和函数序列化中要考虑序列化的两个方面.在这种情况下,它是关于数据序列化,因此是反序列化.
从Spark的角度来看,唯一需要的是设置序列化 - Spark默认依赖于Java序列化,这很方便但效率很低.这就是Hadoop自身引入自己的序列化机制及其自身类型的原因 - 即Writables
.因此,InputFormat
并且OutputFormats
需要返回Writables
哪些开箱即用,Spark不理解.
使用elasticsearch-spark连接器,必须启用不同的序列化(Kryo),它可以自动处理转换,并且非常有效地完成此操作.
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
Run Code Online (Sandbox Code Playgroud)
即使Kryo不要求类实现特定接口进行序列化,这意味着POJO可以在RDD中使用,除了启用Kryo序列化之外没有任何进一步的工作.
也就是说,@ samklr向我指出Kryo需要在使用它们之前注册类.
这是因为Kryo写了一个对被序列化对象的类的引用(为每个写入的对象写了一个引用),如果已经注册了类,它只是一个整数标识符,否则就是完整的类名.Spark代表您注册Scala类和许多其他框架类(如Avro Generic或Thrift类).
使用Kryo注册课程非常简单.创建KryoRegistrator的子类,并覆盖该registerClasses()
方法:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
Run Code Online (Sandbox Code Playgroud)
最后,在驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定类名:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
Run Code Online (Sandbox Code Playgroud)
其次,甚至认为Kryo序列化程序已经设置并且类已注册,并且对Spark 1.5进行了更改,并且由于某种原因,Elasticsearch无法对Dataframe进行反序列化,因为它无法SchemaType
将Dataframe 推断到连接器中.
所以我不得不将Dataframe转换为JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
Run Code Online (Sandbox Code Playgroud)
现在数据已准备好写入elasticsearch:
JavaEsSpark.saveToEs(products, "test/test");
Run Code Online (Sandbox Code Playgroud)
参考文献:
归档时间: |
|
查看次数: |
7693 次 |
最近记录: |