标签: apache-spark-sql

火花有条件的替代,但保持归档价值

我想有条件地在spark中填充nan值(以确保我考虑了我的数据的每个角落情况,而不是简单地用任何替换值填充任何东西).

样本看起来像

case class FooBar(foo:String, bar:String)
val myDf = Seq(("a","first"),("b","second"),("c",null), ("third","fooBar"), ("someMore","null"))
         .toDF("foo","bar")
         .as[FooBar]

+--------+------+
|     foo|   bar|
+--------+------+
|       a| first|
|       b|second|
|       c|  null|
|   third|fooBar|
|someMore|  null|
+--------+------+
Run Code Online (Sandbox Code Playgroud)

不幸

    myDf
        .withColumn(
          "bar",
          when(
            (($"foo" === "c") and ($"bar" isNull)) , "someReplacement" 
          )
        ).show
Run Code Online (Sandbox Code Playgroud)

重置列中的所有常规其他值

+--------+---------------+
|     foo|            bar|
+--------+---------------+
|       a|           null|
|       b|           null|
|       c|someReplacement|
|   third|           null|
|someMore|           null|
+--------+---------------+
Run Code Online (Sandbox Code Playgroud)

myDf
    .withColumn(
      "bar",
      when(
        (($"foo" === "c") and ($"bar" isNull)) or
        (($"foo" …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
314
查看次数

Spark Scala用今天的时间戳填充NA

如何替换类型为timestamp的列中的所有空值?

我希望这会更容易,但是我似乎无法正确获取类型。我认为一个解决方案是将列转换为String,在字符串中填充今天的日期,然后重新转换为timestamp,但是,还有没有更优雅的解决方案?

val today = java.time.LocalDate.now()
var todayStamp = java.sql.Timestamp.valueOf(today.atStartOfDay());
df = df.na.fill(Map("expiration" -> todayStamp))
Run Code Online (Sandbox Code Playgroud)

结果是

java.lang.IllegalArgumentException: Unsupported value type java.sql.Timestamp
Run Code Online (Sandbox Code Playgroud)

使用今天也不起作用,并且使用unix_timestamp(string).cast("timestamp") 期望列而不是字符串。我想我可以在上面提到的“丑陋”方法中使用它。

稍后编辑:忘了提及,在timestamp列上将Int或String与df.na.fill方法一起使用也会导致错误:

org.apache.spark.sql.AnalysisException: cannot resolve 'coalesce(expiration, 0)' due to data type mismatch: input to function coalesce should all be the same type, but it's [timestamp, int];
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

0
推荐指数
1
解决办法
2830
查看次数

在DataFrame.withColumn中,如何检查列的值是否为空作为第二个参数的条件?

如果我有一个名为df的DataFrame看起来像:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| N/A| baz|
|null| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

我可以像这样有选择地替换值:

val df2 = df.withColumn("a1", when($"a1" === "N/A", $"a2"))
Run Code Online (Sandbox Code Playgroud)

所以df2看起来像:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| baz| baz|
|null| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

但是为什么我不能检查它是否为null,例如:

val df3 = df2.withColumn("a1", when($"a1" === null, $"a2"))
Run Code Online (Sandbox Code Playgroud)

这样我得到:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| baz| baz|
| etc| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

编辑:$“ a1” .isNull似乎不起作用。可能是因为我如何构造用于测试的数据框,如下所示?

val schema = StructType(
                StructField("a1", StringType, false) ::
                StructField("a2", StringType, false) :: Nil …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
3450
查看次数

如何从配置单元外部表创建数据框

我们希望在Hive外部表之上创建数据框,并使用Hive模式和数据进行火花级别的计算。

我们可以从配置单元外部表中获取架构并将其用作数据框架构吗?

hadoop hive dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1万
查看次数

如何计算列的乘积,然后计算所有列的总和?

表1 --Spark DataFrame表

在此输入图像描述

表1中有一个名为"productMe"的栏目; 还有其他列,如a,b,c等,其架构名称包含在架构数组T中.

我想要的是架构数组T中的列的内积(产品中每列的两列)和列productMe(表2).并总结表2的每列以得到表3.

如果您有充分的理由在一步中获得表3,则表2不是必需的.

表2 - 内部产品表

在此输入图像描述

例如,列"a·productMe"为(3*0.2,6*0.6,5*0.4)得到(0.6,3.6,2)

表3 - 总和表

在此输入图像描述

例如,列"sum(a·productMe)"是0.6 + 3.6 + 2 = 6.2.

表1是Spark的DataFrame,我该如何获得表3?

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1692
查看次数

将StructField添加到现有模式

我创建了一个类"属性".每个类对象都有一个名称,一个数据类型和一个布尔值(可以为空或可以为空).所有对象都保存到ListBuffer中.

我尝试从列表中创建一个模式,并将每个值传递给StructField().启动工作,但遗憾的是,没有其他项目添加到架构中.

def create_schema_from_attr_list(attr_list: ListBuffer[Attribute]): StructType = {
  // Get first list item and initiate schema
  var schema = StructType(StructField(attr_list(0).name, attr_list(0).data_type, attr_list(0).nullable) :: Nil)

  // Add remaining items
  for (i <- 1 until attr_list.length) {
    schema.add(attr_list(i).name, attr_list(i).data_type, attr_list(i).nullable)
    println("Test " + attr_list(i).name.toString())
  }        
  return schema
}
Run Code Online (Sandbox Code Playgroud)

loops scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
479
查看次数

如何使用Spark 2屏蔽列?

我有一些表需要掩盖其某些列。每个表要屏蔽的列各不相同,我正在从application.conf文件中读取这些列。

例如,对于雇员表,如下所示

+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1  | abcd | 21  | India   |
+----+------+-----+---------+
| 2  | qazx | 42  | Germany |
+----+------+-----+---------+
Run Code Online (Sandbox Code Playgroud)

如果我们要屏蔽名称和年龄列,那么我将按顺序获取这些列。

val mask = Seq("name", "age")
Run Code Online (Sandbox Code Playgroud)

屏蔽后的期望值为:

+----+----------------+----------------+---------+
| id | name           | age            | address |
+----+----------------+----------------+---------+
| 1  | *** Masked *** | *** Masked *** | India   |
+----+----------------+----------------+---------+
| 2  | *** Masked *** | *** Masked *** | Germany | …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-2.0

0
推荐指数
1
解决办法
2873
查看次数

pyspark.sql.utils.IllegalArgumentException:u'字段“功能”不存在。

我正在尝试执行随机森林分类器并使用交叉验证评估模型。我与pySpark合作。输入的CSV文件将以Spark DataFrame格式加载。但是在构建模型时我遇到了一个问题。

下面是代码。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
sc = SparkContext()
sqlContext = SQLContext(sc)
trainingData =(sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load("/PATH/CSVFile"))
numFolds = 10 
rf = RandomForestClassifier(numTrees=100, maxDepth=5, maxBins=5, labelCol="V5409",featuresCol="features",seed=42)
evaluator = MulticlassClassificationEvaluator().setLabelCol("V5409").setPredictionCol("prediction").setMetricName("accuracy")
paramGrid = ParamGridBuilder().build()

pipeline = Pipeline(stages=[rf])
paramGrid=ParamGridBuilder().build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)
model = crossval.fit(trainingData)
print accuracy
Run Code Online (Sandbox Code Playgroud)

我低于错误

Traceback (most …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-ml

0
推荐指数
1
解决办法
5985
查看次数

Spark SQL删除空格

我有一个简单的Spark程序,它读取JSON文件并发出CSV文件.在JSON数据中,值包含前导和尾随空格,当我发出CSV时,前导和尾随空格都消失了.有没有办法可以保留空间.我尝试了很多选项,如ignoreTrailingWhiteSpace,ignoreLeadingWhiteSpace,但没有运气

input.json

{"key" : "k1", "value1": "Good String", "value2": "Good String"}
{"key" : "k1", "value1": "With Spaces      ", "value2": "With Spaces      "}
{"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}
Run Code Online (Sandbox Code Playgroud)

output.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces,With Spaces
,k1,with tab,with tab
Run Code Online (Sandbox Code Playgroud)

expected.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces      ,With Spaces      
,k1,with tab\t,with tab\t
Run Code Online (Sandbox Code Playgroud)

我的代码:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName(TestSpark.class.getName())
            .master("local[1]").getOrCreate();

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established"); …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming apache-spark-sql spark-dataframe apache-spark-mllib

0
推荐指数
1
解决办法
5046
查看次数

将Spark DataFrame转换为对象列表

我知道可以使用以下内容将数据帧列转换为列表:

dataFrame.select("ColumnName").rdd.map(r => r(0)).collect()
Run Code Online (Sandbox Code Playgroud)

假设我已经知道了数据帧的模式,相应地我创建了一个case类,例如:

case class Synonym(URI: String, similarity: Double, FURI: String)
Run Code Online (Sandbox Code Playgroud)

有没有一种有效的方法从数据帧的数据中获取同义词对象列表?

换句话说,我正在尝试创建一个映射器,它将数据帧的每一行转换为我的case类的一个对象,然后以一种我可以在操作结束时拥有这些对象列表的方式返回该对象.这有可能以一种有效的方式吗?

oop scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
5595
查看次数