使用Java API创建一个简单的1行Spark DataFrame

sme*_*eeb 9 java apache-spark spark-dataframe

在Scala中,我可以从内存中的字符串创建单行DataFrame,如下所示:

val stringAsList = List("buzz")
val df = sqlContext.sparkContext.parallelize(jsonValues).toDF("fizz")
df.show()
Run Code Online (Sandbox Code Playgroud)

df.show()运行时,它输出:

+-----+
| fizz|
+-----+
| buzz|
+-----+
Run Code Online (Sandbox Code Playgroud)

现在我正在尝试从Java类中执行此操作.显然JavaRDDs没有toDF(String)方法.我试过了:

List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
SQLContext sqlContext = new SQLContext(sparkContext);
DataFrame df = sqlContext.createDataFrame(sparkContext
    .parallelize(stringAsList), StringType);
df.show();
Run Code Online (Sandbox Code Playgroud)

......但似乎仍然很短暂.现在df.show();执行时,我得到:

++
||
++
||
++
Run Code Online (Sandbox Code Playgroud)

(一个空的DF.)所以我问:使用Java API,如何将内存中的字符串读入一个只有1行1列的DataFrame中,并指定该列的名称?(这df.show()与上面的Scala相同)?

jgp*_*jgp 10

如果您需要升级,我已经为Spark 2创建了2个示例:

简单的Fizz/Buzz(或敌人/酒吧 - 老一代:)):

    SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
            .getOrCreate();

    List<String> stringAsList = new ArrayList<>();
    stringAsList.add("bar");

    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());

    JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> RowFactory.create(row));

    // Creates schema
    StructType schema = DataTypes.createStructType(
            new StructField[] { DataTypes.createStructField("foe", DataTypes.StringType, false) });

    Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
Run Code Online (Sandbox Code Playgroud)

2x2数据:

    SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
            .getOrCreate();

    List<String[]> stringAsList = new ArrayList<>();
    stringAsList.add(new String[] { "bar1.1", "bar2.1" });
    stringAsList.add(new String[] { "bar1.2", "bar2.2" });

    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());

    JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));

    // Creates schema
    StructType schema = DataTypes
            .createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
                    DataTypes.createStructField("foe2", DataTypes.StringType, false) });

    Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
Run Code Online (Sandbox Code Playgroud)

代码可以从以下网址下载:https://github.com/jgperrin/net.jgp.labs.spark.

  • 你改变你的架构.它应该做的伎俩. (2认同)

cod*_*123 8

您可以通过创建List到Rdd来实现这一点,而不是创建包含列名的Schema.

可能还有其他方式,它只是其中之一.

List<String> stringAsList = new ArrayList<String>();
        stringAsList.add("buzz");

JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> {
                return RowFactory.create(row);
            });

StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("fizz", DataTypes.StringType, false) });

DataFrame df = sqlContext.createDataFrame(rowRDD, schema).toDF();
df.show();

//+----+
|fizz|
+----+
|buzz|
Run Code Online (Sandbox Code Playgroud)