Swa*_*rma 10 bigdata apache-spark
I have a custom data source and I want to load the data into my Spark cluster to perform some computations. To do this, it looks like I need to implement a new RDD for my data source.
I am a complete Scala noob and I am hoping I can implement the RDD in Java. I looked around the internet and could not find any resources. Any pointers?
My data is in S3 and indexed in Dynamo. For example, if I want to load data for a given time range, I first need to query Dynamo for the S3 file keys for that time range and then load them in Spark. The files will not always have the same S3 path prefix, so sc.textFile("s3://directory_path/") won't work.
I am looking for pointers on how to implement something similar to HadoopRDD or JdbcRDD, but in Java. Something like what they have done here: DynamoDBRDD. That one reads data from Dynamo; my custom RDD would query DynamoDB for the S3 file keys and then load the files from S3.
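As a rough illustration of the intended flow (query Dynamo for keys, then load from S3), the following sketch uses only the plain Java API; fetchS3Keys is a hypothetical stand-in for the DynamoDB range query, and the paths are placeholders. Since textFile accepts a comma-separated list of paths, a custom RDD may not even be needed when the key lookup can run up front on the driver:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DynamoIndexedLoad {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DynamoIndexedLoad");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Hypothetical: query DynamoDB for the S3 keys covering the time range.
            List<String> s3Paths = fetchS3Keys(/* from */ 0L, /* to */ 1L);
            // textFile accepts a comma-separated list of paths, so the files need
            // not share a common S3 prefix.
            JavaRDD<String> lines = sc.textFile(String.join(",", s3Paths));
            System.out.println(lines.count());
        }
    }

    // Placeholder for the DynamoDB range query; a real implementation would use
    // the AWS SDK to look up the S3 keys indexed for [from, to).
    private static List<String> fetchS3Keys(long from, long to) {
        return Arrays.asList("s3://bucket/a.txt", "s3://bucket/b.txt");
    }
}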
Dan*_*rke 12
You can extend RDD in Java and implement the getPartitions and compute methods.
Java can extend Scala classes, with some limitations.
Example:
package com.openmarket.danyal;
// Other imports left out
import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

public class AlphaTest {
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect());
        }
    }

    public static class AlphabetRDD extends RDD<String> {
        private static final long serialVersionUID = 1L;

        public AlphabetRDD(SparkContext sc) {
            // No dependencies: this RDD reads from a source, not from a parent RDD.
            super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG);
        }

        @Override
        public Iterator<String> compute(Partition arg0, TaskContext arg1) {
            AlphabetRangePartition p = (AlphabetRangePartition) arg0;
            return new CharacterIterator(p.from, p.to);
        }

        @Override
        public Partition[] getPartitions() {
            // Each partition's index must match its position in this array.
            return new Partition[] {new AlphabetRangePartition(0, 'A', 'M'), new AlphabetRangePartition(1, 'P', 'Z')};
        }
    }

    /**
     * A partition representing a range of letters of the alphabet.
     */
    public static class AlphabetRangePartition implements Partition {
        private static final long serialVersionUID = 1L;
        private int index;
        private char from;
        private char to;

        public AlphabetRangePartition(int index, char from, char to) {
            this.index = index;
            this.from = from;
            this.to = to;
        }

        @Override
        public int index() {
            return index;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof AlphabetRangePartition)) {
                return false;
            }
            return ((AlphabetRangePartition) obj).index == index;
        }

        @Override
        public int hashCode() {
            return index();
        }
    }

    /**
     * Iterates over all characters between two characters, inclusive.
     */
    public static class CharacterIterator extends AbstractIterator<String> {
        private char next;
        private char last;

        public CharacterIterator(char from, char to) {
            this.next = from;
            this.last = to;
        }

        @Override
        public boolean hasNext() {
            return next <= last;
        }

        @Override
        public String next() {
            // Post-increment: return the current character, then advance.
            return Character.toString(next++);
        }
    }
}
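Run locally, this should print both partitions' contents in order, i.e. a list running from A through M followed by P through Z. Note that compute is invoked once per partition, and each call returns an iterator over just that partition's range, so the work is naturally split across tasks.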
One option is to read up on the Hadoop spec, but if your data is structured, Spark SQL has a new Data Sources API, with some implementations published on Spark Packages, including avro, redshift, and csv.
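For illustration, reading structured data through one of those packages looks roughly like the sketch below. This assumes the spark-csv package is on the classpath, uses the Spark 1.x-era API, and the S3 path is a placeholder:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataSourceExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DataSourceExample");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            SQLContext sqlContext = new SQLContext(sc);
            // spark-csv registers the "com.databricks.spark.csv" format.
            DataFrame df = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("s3://bucket/path/file.csv"); // placeholder path
            df.show();
        }
    }
}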