试图在Spark DataFrame上使用map

Let*_*zee 6 java java-8 apache-spark apache-spark-sql spark-dataframe

我最近开始尝试使用Spark和Java.我最初WordCount使用着名的例子RDD,一切都按预期进行.现在我试图实现我自己的例子,但使用DataFrames而不是RDD.

所以我正在从一个文件中读取数据集

DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("delimiter", ";")
        .option("header", "true")
        .load(inputFilePath);
Run Code Online (Sandbox Code Playgroud)

然后我尝试选择一个特定的列并对每一行应用一个简单的转换

df = df.select("start")
        .map(text -> text + "asd");
Run Code Online (Sandbox Code Playgroud)

但是编译发现了第二行的问题,我并不完全理解(起始列是按类型推断的string).

在scala.Function1接口中找到多个非重写抽象方法

为什么我的lambda函数被视为Scala函数,错误消息实际上是什么意思?

小智 9

如果select在数据帧上使用该函数,则会返回数据帧.然后在Row数据类型上应用函数而不是行的值.之后你应该先获得值,这样你就应该做到以下几点:

df.select("start").map(el->el.getString(0)+"asd")

但是你会得到一个RDD作为返回值而不是DF

  • 您还需要在映射之前使用 .javaRDD()。另外据我所知,没有明确的方法可以将函数应用于 DataFrame 的行并取回 DataFrame 对吗? (3认同)

Dee*_*Dee 5

我使用 concat 来实现这一点

df.withColumn( concat(col('start'), lit('asd'))
Run Code Online (Sandbox Code Playgroud)

当您映射相同的文本两次时,我不确定您是否也想替换字符串的第一部分?但如果你是,我会这样做:

df.withColumn('start', concat(
                      when(col('start') == 'text', lit('new'))
                      .otherwise(col('start))
                     , lit('asd')
                     )

Run Code Online (Sandbox Code Playgroud)

该解决方案在使用大数据时可以扩展,因为它连接两列而不是迭代值。