小编kum*_*mar的帖子

如何在Spark中将JavaPairInputDStream转换为DataSet/DataFrame

我正在尝试从kafka接收流数据.在此过程中,我能够接收流数据并将其存储到JavaPairInputDStream中.现在我需要分析这些数据,而不是将其存储到任何数据库中.所以我想将此JavaPairInputDStream转换为DataSetDataFrame

到目前为止我尝试的是:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


import kafka.serializer.StringDecoder;
import scala.Tuple2;

//Streaming Working Code

public class KafkaToSparkStreaming 
{
    public static  void main(String arr[]) throws InterruptedException
    {


        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", …
Run Code Online (Sandbox Code Playgroud)

java streaming apache-kafka apache-spark spark-streaming

7
推荐指数
1
解决办法
5996
查看次数

如何在spark java中的数据集上应用map函数

我的CSV文件:

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,,
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,,
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,,
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,,
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,,
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,,
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,,
Run Code Online (Sandbox Code Playgroud)

在这里下载Spark代码来读取CSV文件:

public class ReadFile8 {

    public static void main(String[] args) throws IOException {

        SparkSession session = new SparkSession.Builder().appName("CsvReader").master("local").getOrCreate();

        //Data taken by Local System
        Dataset<Row> file8Data = session.read().format("com.databricks.spark.csv").option("header", "true").load("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");

        // Register the DataFrame as a SQL temporary view
        file8Data.createOrReplaceTempView("EletricalFile8Data");
        file8Data.show();
    } …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-sql

-1
推荐指数
1
解决办法
2万
查看次数