Rah*_*kar 3 java machine-learning forecasting apache-spark spark-streaming
我已经开始从Spark引擎学习火花流,并且很新的数据分析和火花.我只是想创建一个小IOT应用程序,我想在其中预测未来的数据.
我有Tiva硬件,它发送实时传感器JSON数据如下,
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
Run Code Online (Sandbox Code Playgroud)
在此t中是发布数据的unix时间戳.传感器是传感器阵列,每个传感器('s')数据为'd'.
我想要做的是,使用这些数据并创建火花流的对象,然后通过spark的Mlib(机器学习)或等效库传递所有数据以预测未来的数据.
我想要了解所有技术选择是否可行
这是我用来消费来自KAFKA的消息的代码.
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
System.out.println(message._2());
return message._2();
};
});
System.out.println(" json is 0------ 0"+ json);
json.foreachRDD(rdd -> {
rdd.foreach(
record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)
PS:我想在Java中做到这一点,以保持线性和良好的性能.
由于您使用的是SparkSession的SPark 2.0,因此您可以阅读JSON
json.foreachRDD( rdd -> {
DataFrame df= spark.read.json(rdd)
//process json with this DF.
}
Run Code Online (Sandbox Code Playgroud)
或者您可以将rdd转换为Row的RDD,然后您可以使用createDataFrame方法.
json.foreachRDD( rdd -> {
DataFrame df= spark.createDataFrame(rdd);
//process json with this DF.
}
Run Code Online (Sandbox Code Playgroud)
嵌套JSON处理可能从DF,您可以按照此文章.
此外,一旦你将你的json转换为DF,你可以在任何火花模块中使用它(如spark sql,ML)
| 归档时间: |
|
| 查看次数: |
8325 次 |
| 最近记录: |