标签: apache-spark-sql

2 个列表和/或 1 个二维数组的 udf 声明

我想声明一个返回 2 1D 数组或 1 2D 数组的 udf(两者的示例都很好)。我知道这适用于 1D:

@udf("array<int>")
Run Code Online (Sandbox Code Playgroud)

但是,我尝试了许多变体,例如以下没有运气:

@udf("array<int>,array<int>")
@udf("array<int>","array<int>")
@udf("array<int,int>")
etc. 
Run Code Online (Sandbox Code Playgroud)

user-defined-functions apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
560
查看次数

检查另一个数组pyspark中存在的数组的所有元素

我有一个 df1 spark 数据框

id     transactions
1      [1, 2, 3, 5]
2      [1, 2, 3, 6]
3      [1, 2, 9, 8]
4      [1, 2, 5, 6]

root
 |-- id: int (nullable = true)
 |-- transactions: array (nullable = false)
     |-- element: int(containsNull = true)
 None
Run Code Online (Sandbox Code Playgroud)

我有一个 df2 spark 数据框

items   cost
  [1]    1.0
  [2]    1.0
 [2, 1]  2.0
 [6, 1]  2.0

root
 |-- items: array (nullable = false)
    |-- element: int (containsNull = true)
 |-- cost: int (nullable = true) …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
4304
查看次数

如何从scala数据框中的日期列值中检索月份?

鉴于:

val df = Seq((1L, "04-04-2015")).toDF("id", "date")
val df2 = df.withColumn("month", from_unixtime(unix_timestamp($"date", "dd/MM/yy"), "MMMMM"))
df2.show()
Run Code Online (Sandbox Code Playgroud)

我得到了这个输出:

+---+----------+-----+
| id|      date|month|
+---+----------+-----+
|  1|04-04-2015| null|
+---+----------+-----+
Run Code Online (Sandbox Code Playgroud)

但是,我希望输出如下:

+---+----------+-----+
| id|      date|month|
+---+----------+-----+
|  1|04-04-2015|April|
+---+----------+-----+
Run Code Online (Sandbox Code Playgroud)

我如何使用 Scala 在 sparkSQL 中做到这一点?

scala dataframe apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3061
查看次数

在 PySpark 中提取特定行

我有一个这样的数据框

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+
Run Code Online (Sandbox Code Playgroud)

我只想提取那些只包含一种特定类型 - “A”的行(或 ID)

因此我的预期输出将包含以下行

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|    1|
|ID4|   A|   10|
+---+----+-----+
Run Code Online (Sandbox Code Playgroud)

对于每个 ID 可以包含任何类型 - A、B、C 等。我想提取那些包含一个且仅包含一个类型的 ID …

python apache-spark apache-spark-sql pyspark

1
推荐指数
2
解决办法
4190
查看次数

如何在 spark-java 项目中的信息/调试级别记录 spark 数据集 printSchema

试图将我的 spark scala 项目转换为 spark-java 项目。我有一个登录 Scala 如下

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

    class ClassName{
      val logger  = LoggerFactory.getLogger("ClassName")
      ...
      val dataframe1 = ....///read dataframe from text file.
      ...

      logger.debug("dataframe1.printSchema : \n " + dataframe1.printSchema; //this is working fine.
    }
Run Code Online (Sandbox Code Playgroud)

现在我正在尝试用 java 1.8 编写它,如下所示

public class ClassName{

    public static final Logger logger  = oggerFactory.getLogger("ClassName"); 
      ...
     Dataset<Row> dataframe1 = ....///read dataframe from text file.
     ...

     logger.debug("dataframe1.printSchema : \n " + dataframe1.printSchema()); //this is not working 

}
Run Code Online (Sandbox Code Playgroud)

我尝试了几种方法,但没有任何方法可以在调试/信息模式下记录 printSchema。

dataframe1.printSchema() // 这实际上返回 void …

java sql scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3666
查看次数

如何将托管在 HDFS 中的配置文件传递给 Spark 应用程序?

我正在使用 Spark Structured Streaming。此外,我正在与Scala. 我想将配置文件传递给我的 spark 应用程序。此配置文件托管在HDFS. 例如;

spark_job.conf (HOCON)

spark {
  appName: "",
  master: "",
  shuffle.size: 4 
  etc..
}

kafkaSource {
  servers: "",
  topic: "",
  etc..
}

redisSink {
  host: "",
  port: 999,
  timeout: 2000,
  checkpointLocation: "hdfs location",
  etc..
}
Run Code Online (Sandbox Code Playgroud)

如何将其传递给 Spark 应用程序?如何hosted HDFS在 Spark 中读取此文件()?

configuration hadoop apache-spark apache-spark-sql spark-structured-streaming

1
推荐指数
1
解决办法
1502
查看次数

如何在 Spark 中的 AWS Glue 创建的数据帧上运行 SQL SELECT?

我在 AWS Glue 中有以下工作,它基本上从一个表中读取数据并将其提取为 S3 中的 csv 文件,但是我想在这个表上运行查询(A Select、SUM 和 GROUPBY)并希望得到该输出CSV,如何在 AWS Glue 中执行此操作?我是 Spark 的新手,所以请帮忙

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = 
"db1", table_name = "dbo1_expdb_dbo_stg_plan", transformation_ctx = 
"datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = 
[("plan_code", "int", "plan_code", "int"), ("plan_id", "int", "plan_id", 
"int")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = 
applymapping1, connection_type = "s3", connection_options = {"path": 
"s3://bucket"}, format = "csv", transformation_ctx = …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark-sql pyspark aws-glue

1
推荐指数
1
解决办法
7354
查看次数

为什么 DataFrame.stat.approxQuantile 失败,n 个任务的序列化结果大小 (1030.8 MB) 大于 spark.driver.maxResultSize

val postsQuantiles = posts.stat.approxQuantile("_score", Array(0.25, 0.75), 0.0)因以下错误而失败。我显然可以设置spark.driver.maxResultSize克服这个错误,但我很好奇为什么这会向驱动程序收集数据?

[Stage 3:==================>                                      (7 + 15) / 22]19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 18 tasks (1030.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 19 tasks (1087.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 20 tasks (1145.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
851
查看次数

在 spark 中使用 partitionBy 和合并

我需要根据特定Partition键将数据写入 s3 ,这可以通过使用write.partitionBy. 但是,在这种情况下,我只需要在每个路径中写入一个文件。我正在使用下面的代码来做到这一点。

    orderFlow.coalesce(1).write.partitionBy("SellerYearMonthWeekKey")
      .mode(SaveMode.Overwrite)
      .format("com.databricks.spark.csv")
      .option("delimiter", ",")
      .option("header", "true")
      .save(outputS3Path + "/")
Run Code Online (Sandbox Code Playgroud)

你能帮我找到实现这一目标的最佳方法吗?在上述情况下,我收到 OutOfMemory 错误。

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
803
查看次数

如何在 PySpark 中汇总中值和标准差?

我有以下代码:

from pyspark.sql import functions as func

cols = ("id","size")

result = df.groupby(*cols).agg({
    func.max("val1"),
    func.median("val2"),
    func.std("val2")
})
Run Code Online (Sandbox Code Playgroud)

但它在无法找到func.median("val2")的消息行中失败。同样发生在.medianfuncstd

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
3930
查看次数