标签: apache-spark-sql

使用 Apache Spark SQL 查找间隔最长且无事故的设施

我有下一个数据集:

|facility|date      |accidents|
| foo    |2019-01-01|1        |
| foo    |2019-01-02|null     |
| foo    |2019-01-03|null     |
| foo    |2019-01-04|2        |
| bar    |2019-01-01|1        |
| bar    |2019-01-02|null     |
| bar    |2019-01-03|3        |
Run Code Online (Sandbox Code Playgroud)

目标是找到一个无事故连续时间最长的设施:

|facility|startDate |interval|
|foo     |2019-01-02|2       |
Run Code Online (Sandbox Code Playgroud)

是否可以使用 Spark SQL 做到这一点?谢谢

PS代码示例:

case class FacilityRecord(name: String, date: java.sql.Date, accidents: Option[Int])
case class IntervalWithoutAccidents(name: String, startDate: java.sql.Date, interval: Int)

implicit val spark: SparkSession = SparkSession.builder
      .appName("Test")
      .master("local")
      .getOrCreate()

import spark.implicits._

val facilityRecords = Seq(
  FacilityRecord("foo", Date.valueOf("2019-01-01"), Some(1)),
  FacilityRecord("foo", Date.valueOf("2019-01-02"), None),
  FacilityRecord("foo", …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
112
查看次数

如何在 spark scala 中读取多个镶木地板文件

下面是一些文件夹,它们可能会随着时间不断更新。他们有多个 .parquet 文件。如何在 Scala 的 Spark 数据框中读取它们?

  • “id=200393/日期=2019-03-25”
  • “id=200393/日期=2019-03-26”
  • “id=200393/日期=2019-03-27”
  • “id=200393/日期=2019-03-28”
  • “id=200393/date=2019-03-29”等等...

注意:- 可能有 100 个日期文件夹,我只需要选择特定的(比如 25,26 和 28)

有没有比下面更好的方法?

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

val spark = SparkSession.builder.appName("ScalaCodeTest").master("yarn").getOrCreate()
val parquetFiles = List("id=200393/date=2019-03-25", "id=200393/date=2019-03-26", "id=200393/date=2019-03-28")

spark.read.format("parquet").load(parquetFiles: _*)
Run Code Online (Sandbox Code Playgroud)

上面的代码正在运行,但我想做类似下面的事情-

val parquetFiles = List()
parquetFiles(0) = "id=200393/date=2019-03-25"
parquetFiles(1) = "id=200393/date=2019-03-26"
parquetFiles(2) = "id=200393/date=2019-03-28"
spark.read.format("parquet").load(parquetFiles: _*)
Run Code Online (Sandbox Code Playgroud)

scala list apache-spark parquet apache-spark-sql

1
推荐指数
1
解决办法
4923
查看次数

为什么 Pyspark 作业在没有任何特定错误的情况下在过程中消失

专家们,我注意到生产中的 Pyspark 作业之一(在 YARN 集群模式下运行)有一件奇怪的事情。执行大约一个小时 +(大约 65-75 分钟)后,它就会消失,而不会抛出任何特定的错误消息。我们已经分析了大约 2 周的 YARN 日志,其中没有特别的错误,它只是在执行 ETL 操作(读/写 hive 表、执行简单映射、修剪、lambda 操作等)时在中间死亡,没有任何错误要指出的特定代码段。有时重新运行可以修复它,有时需要多次重新运行。代码已优化, spark-submit --conf 具有所有正确优化的选项。正如我们之前提到的,它对于大约 30 种其他具有非常好的性能统计数据的应用程序来说绝对是完美的。这些是我们所有的选择——

spark-submit --conf spark.yarn.maxAppAttempts=1 --conf spark.sql.broadcastTimeout=36000 --conf spark.dynamicAllocation.executorIdleTimeout=1800 --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.initialExecutors=8 --conf spark.dynamicAllocation.maxExecutors=32 --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.kryoserializer.buffer.max=512m --driver-memory 2G --executor-memory 8G --executor-cores 2 --deploy-mode cluster --master yarn
Run Code Online (Sandbox Code Playgroud)

我们想检查是否需要更改某些驱动器配置来解决此问题?或者在 Spark Cluster 模式下有一些可以增加的自动超时?我们在 Python 2.7 中使用 Spark 1.6

错误看起来像(有几条消息说 -

ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
Run Code Online (Sandbox Code Playgroud)

但是当它遇到驱动程序错误时它会失败(最终发生)-

ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down
Run Code Online (Sandbox Code Playgroud)

这是日志-

19/10/24 16:17:03 INFO compress.CodecPool: Got …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
647
查看次数

如何在 Spark Java 中创建复杂的 StructType Schema

如何在 Spark Java 中使用 StructType 为以下数据定义数据类型?

sam|mars|1234567|"report": {"Details": [{"subject": "science","grade": "A","remark": "good"},{"subject": "maths","grade": "E","remark": "excellent"},{"subject": "geography","grade": "E","remark": "excellent"}]}
harry|venus|987654|"report": {"Details": [{"subject": "science","grade": "O","remark": "outstanding"},{"subject": "history","grade": "A","remark": "good"}]}
Run Code Online (Sandbox Code Playgroud)

这些字段是:姓名、地址、ID、REPORTCARD

我有以下代码:

        JavaRDD<Row> row = javaRDD.map(new Function<String, Row>(){
            @Override
            public Row call(String line) throws Exception {
                return RowFactory.create((line.split("|")));
            }
        });
Run Code Online (Sandbox Code Playgroud)
    where, 
    javaRDD is created on top of the above input data.
Run Code Online (Sandbox Code Playgroud)

现在我需要使用以下行将 javaRDD 转换为 Dataframe(Dataset df):

            Dataset<Row> df = spark.createDataFrame(row, <STRUCT TYPE SCHEMA>);
Run Code Online (Sandbox Code Playgroud)

我需要为此创建一个 StructType 架构。如何在 Spark Java 中定义它。

我创建了以下 StructType 架构:

            List<StructField> …
Run Code Online (Sandbox Code Playgroud)

java apache-spark rdd apache-spark-sql

1
推荐指数
1
解决办法
5452
查看次数

pyspark:如何按年/月/日/小时子目录编写数据帧分区?

我有如下制表符分隔的数据(csv 文件):

201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
Run Code Online (Sandbox Code Playgroud)

我想按年、月、日、小时编写目录组。

hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
Run Code Online (Sandbox Code Playgroud)

如何按 yyyy/mm/dd/hh 写入分区?

apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
1
解决办法
3031
查看次数

仅展平 Scala Spark 数据帧中的最深级别

我有一个 Spark 作业,它有一个具有以下值的 DataFrame:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type": {
      "isMale": true,
      "id": "dd",
      "mcc": 1234,
      "name": "Adam"
    }
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
    "type2": {
      "isMale": true,
      "id": "dd",
      "mcc": 12134,
      "name": "Perth"
    }
  }
}

Run Code Online (Sandbox Code Playgroud)

我想优雅地将它展平(因为没有未知的键和类型等),这样道具仍然是一个,struct但里面的所有东西都被展平了(不管嵌套的级别如何)

所需的输出是:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type.isMale": true,
    "type.id": "dd",
    "type.mcc": 1234,
    "type.name": "Adam"
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
      "type2.isMale": true,
      "type2.id": "dd",
      "type2.mcc": 12134,
      "type2.name": "Perth" …
Run Code Online (Sandbox Code Playgroud)

json scala flatten apache-spark apache-spark-sql

1
推荐指数
1
解决办法
333
查看次数

如何计算多个浮点列的累积总和?

我在数据框中有 100 个按日期排序的浮动列。

ID   Date         C1       C2 ....... C100
1     02/06/2019   32.09  45.06         99
1     02/04/2019   32.09  45.06         99
2     02/03/2019   32.09  45.06         99
2     05/07/2019   32.09  45.06         99
Run Code Online (Sandbox Code Playgroud)

我需要根据 ID 和日期在累积总和中获得 C1 到 C100。

目标数据框应如下所示:

ID   Date         C1       C2 ....... C100
1     02/04/2019   32.09  45.06         99
1     02/06/2019   64.18  90.12         198
2     02/03/2019   32.09  45.06         99
2     05/07/2019   64.18  90.12         198
Run Code Online (Sandbox Code Playgroud)

我想在不从 C1-C100 循环的情况下实现这一点。

一列的初始代码:

var DF1 =  DF.withColumn("CumSum_c1", sum("C1").over(
         Window.partitionBy("ID")
        .orderBy(col("date").asc)))
Run Code Online (Sandbox Code Playgroud)

我在这里发现了一个类似的问题,但他手动为两列做了这个:Spark 中的累积总和

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
130
查看次数

SparkSQL - 以分钟为单位的两个时间戳之间的差异

我正在尝试将两个时间戳之间的分钟差异转换为MM/dd/yyyy hh:mm:ss AM/PM. 我刚开始使用 SparkSQL 并尝试使用datediff其他 SQL 语法支持的基本函数 Ie datediff(minute,start_time,end_time),但这产生了错误:

org.apache.spark.sql.AnalysisException: cannot resolve '`minute`' given input columns: [taxisub.tpep_dropoff_datetime, taxisub.DOLocationID, taxisub.improvement_surcharge, taxisub.VendorID, taxisub.trip_distance, taxisub.tip_amount, taxisub.tolls_amount, taxisub.payment_type, taxisub.fare_amount, taxisub.tpep_pickup_datetime, taxisub.total_amount, taxisub.store_and_fwd_flag, taxisub.extra, taxisub.passenger_count, taxisub.PULocationID, taxisub.mta_tax, taxisub.RatecodeID]; line 1 pos 153;
Run Code Online (Sandbox Code Playgroud)

似乎minutesparkSQL 的 datediff 不支持该参数。我目前的查询是:

spark.sqlContext.sql("Select to_timestamp(tpep_pickup_datetime,'MM/dd/yyyy hh:mm:ss') as pickup,to_timestamp(tpep_dropoff_datetime,'MM/dd/yyyy hh:mm:ss') as dropoff, datediff(to_timestamp(tpep_pickup_datetime,'MM/dd/yyyy hh:mm:ss'),to_timestamp(tpep_dropoff_datetime,'MM/dd/yyyy hh:mm:ss')) as diff from taxisub ").show()
Run Code Online (Sandbox Code Playgroud)

我的结果是:

+-------------------+-------------------+----+
|             pickup|            dropoff|diff|
+-------------------+-------------------+----+
|2018-12-15 08:53:20|2018-12-15 08:57:57|   0|
|2018-12-15 08:03:08|2018-12-15 08:07:30|   0|
|2018-12-15 …
Run Code Online (Sandbox Code Playgroud)

sql apache-spark apache-spark-sql

1
推荐指数
1
解决办法
9778
查看次数

使用pyspark从每行的数组中获取不同的计数

我正在使用 pyspark 数据帧从每行的数组中寻找不同的计数:输入:col1 [1,1,1] [3,4,5] [1,2,1,2]

output:
1
3
2  

I used below code but it is giving me the length of an array:
output:
3
3
4

please help me how do i achieve this using python pyspark dataframe.

slen = udf(lambda s: len(s), IntegerType())
count = Df.withColumn("Count", slen(df.col1))
count.show()

Thanks in advanced !
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-dataframes

1
推荐指数
1
解决办法
1134
查看次数

如何将数组拆分为块并找到块的总和并将输出作为数组存储在 pyspark 中

我有一个数据框,如下所示:

+-----+------------------------+
|Index|   finalArray           |
+-----+------------------------+
|1    |[0, 2, 0, 3, 1, 4, 2, 7]|
|2    |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+
Run Code Online (Sandbox Code Playgroud)

我想将数组分成 2 个块,然后找到每个块的总和并将结果数组存储在列 finalArray 中。它将如下所示:

+-----+---------------------+
|Index|    finalArray       |
+-----+---------------------+
|1    |[2, 3, 5, 9]         |
|2    |[4, 7, 6, 7]         |
+-----+---------------------+
Run Code Online (Sandbox Code Playgroud)

我可以通过创建 UDF 但寻找更好和优化的方法来做到这一点。如果我可以使用 withColumn 并传递 flagArray 来处理它,而不必编写 UDF,则最好。

@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
   n = int(chunkSize)
   aggsum = []
   final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-dataframes

1
推荐指数
1
解决办法
542
查看次数