我有下一个数据集:
|facility|date |accidents|
| foo |2019-01-01|1 |
| foo |2019-01-02|null |
| foo |2019-01-03|null |
| foo |2019-01-04|2 |
| bar |2019-01-01|1 |
| bar |2019-01-02|null |
| bar |2019-01-03|3 |
Run Code Online (Sandbox Code Playgroud)
目标是找到一个无事故连续时间最长的设施:
|facility|startDate |interval|
|foo |2019-01-02|2 |
Run Code Online (Sandbox Code Playgroud)
是否可以使用 Spark SQL 做到这一点?谢谢
PS代码示例:
case class FacilityRecord(name: String, date: java.sql.Date, accidents: Option[Int])
case class IntervalWithoutAccidents(name: String, startDate: java.sql.Date, interval: Int)
implicit val spark: SparkSession = SparkSession.builder
.appName("Test")
.master("local")
.getOrCreate()
import spark.implicits._
val facilityRecords = Seq(
FacilityRecord("foo", Date.valueOf("2019-01-01"), Some(1)),
FacilityRecord("foo", Date.valueOf("2019-01-02"), None),
FacilityRecord("foo", …Run Code Online (Sandbox Code Playgroud) 注意:- 可能有 100 个日期文件夹,我只需要选择特定的(比如 25,26 和 28)
有没有比下面更好的方法?
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
val spark = SparkSession.builder.appName("ScalaCodeTest").master("yarn").getOrCreate()
val parquetFiles = List("id=200393/date=2019-03-25", "id=200393/date=2019-03-26", "id=200393/date=2019-03-28")
spark.read.format("parquet").load(parquetFiles: _*)
Run Code Online (Sandbox Code Playgroud)
上面的代码正在运行,但我想做类似下面的事情-
val parquetFiles = List()
parquetFiles(0) = "id=200393/date=2019-03-25"
parquetFiles(1) = "id=200393/date=2019-03-26"
parquetFiles(2) = "id=200393/date=2019-03-28"
spark.read.format("parquet").load(parquetFiles: _*)
Run Code Online (Sandbox Code Playgroud) 专家们,我注意到生产中的 Pyspark 作业之一(在 YARN 集群模式下运行)有一件奇怪的事情。执行大约一个小时 +(大约 65-75 分钟)后,它就会消失,而不会抛出任何特定的错误消息。我们已经分析了大约 2 周的 YARN 日志,其中没有特别的错误,它只是在执行 ETL 操作(读/写 hive 表、执行简单映射、修剪、lambda 操作等)时在中间死亡,没有任何错误要指出的特定代码段。有时重新运行可以修复它,有时需要多次重新运行。代码已优化, spark-submit --conf 具有所有正确优化的选项。正如我们之前提到的,它对于大约 30 种其他具有非常好的性能统计数据的应用程序来说绝对是完美的。这些是我们所有的选择——
spark-submit --conf spark.yarn.maxAppAttempts=1 --conf spark.sql.broadcastTimeout=36000 --conf spark.dynamicAllocation.executorIdleTimeout=1800 --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.initialExecutors=8 --conf spark.dynamicAllocation.maxExecutors=32 --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.kryoserializer.buffer.max=512m --driver-memory 2G --executor-memory 8G --executor-cores 2 --deploy-mode cluster --master yarn
Run Code Online (Sandbox Code Playgroud)
我们想检查是否需要更改某些驱动器配置来解决此问题?或者在 Spark Cluster 模式下有一些可以增加的自动超时?我们在 Python 2.7 中使用 Spark 1.6
错误看起来像(有几条消息说 -
ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
Run Code Online (Sandbox Code Playgroud)
但是当它遇到驱动程序错误时它会失败(最终发生)-
ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down
Run Code Online (Sandbox Code Playgroud)
这是日志-
19/10/24 16:17:03 INFO compress.CodecPool: Got …Run Code Online (Sandbox Code Playgroud) 如何在 Spark Java 中使用 StructType 为以下数据定义数据类型?
sam|mars|1234567|"report": {"Details": [{"subject": "science","grade": "A","remark": "good"},{"subject": "maths","grade": "E","remark": "excellent"},{"subject": "geography","grade": "E","remark": "excellent"}]}
harry|venus|987654|"report": {"Details": [{"subject": "science","grade": "O","remark": "outstanding"},{"subject": "history","grade": "A","remark": "good"}]}
Run Code Online (Sandbox Code Playgroud)
这些字段是:姓名、地址、ID、REPORTCARD
我有以下代码:
JavaRDD<Row> row = javaRDD.map(new Function<String, Row>(){
@Override
public Row call(String line) throws Exception {
return RowFactory.create((line.split("|")));
}
});
Run Code Online (Sandbox Code Playgroud)
where,
javaRDD is created on top of the above input data.
Run Code Online (Sandbox Code Playgroud)
现在我需要使用以下行将 javaRDD 转换为 Dataframe(Dataset df):
Dataset<Row> df = spark.createDataFrame(row, <STRUCT TYPE SCHEMA>);
Run Code Online (Sandbox Code Playgroud)
我需要为此创建一个 StructType 架构。如何在 Spark Java 中定义它。
我创建了以下 StructType 架构:
List<StructField> …Run Code Online (Sandbox Code Playgroud) 我有如下制表符分隔的数据(csv 文件):
201911240130 a
201911250132 b
201911250143 c
201911250223 z
201911250224 d
...
Run Code Online (Sandbox Code Playgroud)
我想按年、月、日、小时编写目录组。
hdfs://dest/2019/11/24/01/xxxx.csv
hdfs://dest/2019/11/25/01/xxxx.csv
hdfs://dest/2019/11/25/02/xxxx.csv
Run Code Online (Sandbox Code Playgroud)
如何按 yyyy/mm/dd/hh 写入分区?
我有一个 Spark 作业,它有一个具有以下值的 DataFrame:
{
"id": "abchchd",
"test_id": "ndsbsb",
"props": {
"type": {
"isMale": true,
"id": "dd",
"mcc": 1234,
"name": "Adam"
}
}
}
{
"id": "abc",
"test_id": "asf",
"props": {
"type2": {
"isMale": true,
"id": "dd",
"mcc": 12134,
"name": "Perth"
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想优雅地将它展平(因为没有未知的键和类型等),这样道具仍然是一个,struct但里面的所有东西都被展平了(不管嵌套的级别如何)
所需的输出是:
{
"id": "abchchd",
"test_id": "ndsbsb",
"props": {
"type.isMale": true,
"type.id": "dd",
"type.mcc": 1234,
"type.name": "Adam"
}
}
{
"id": "abc",
"test_id": "asf",
"props": {
"type2.isMale": true,
"type2.id": "dd",
"type2.mcc": 12134,
"type2.name": "Perth" …Run Code Online (Sandbox Code Playgroud) 我在数据框中有 100 个按日期排序的浮动列。
ID Date C1 C2 ....... C100
1 02/06/2019 32.09 45.06 99
1 02/04/2019 32.09 45.06 99
2 02/03/2019 32.09 45.06 99
2 05/07/2019 32.09 45.06 99
Run Code Online (Sandbox Code Playgroud)
我需要根据 ID 和日期在累积总和中获得 C1 到 C100。
目标数据框应如下所示:
ID Date C1 C2 ....... C100
1 02/04/2019 32.09 45.06 99
1 02/06/2019 64.18 90.12 198
2 02/03/2019 32.09 45.06 99
2 05/07/2019 64.18 90.12 198
Run Code Online (Sandbox Code Playgroud)
我想在不从 C1-C100 循环的情况下实现这一点。
一列的初始代码:
var DF1 = DF.withColumn("CumSum_c1", sum("C1").over(
Window.partitionBy("ID")
.orderBy(col("date").asc)))
Run Code Online (Sandbox Code Playgroud)
我在这里发现了一个类似的问题,但他手动为两列做了这个:Spark 中的累积总和
我正在尝试将两个时间戳之间的分钟差异转换为MM/dd/yyyy hh:mm:ss AM/PM. 我刚开始使用 SparkSQL 并尝试使用datediff其他 SQL 语法支持的基本函数 Ie datediff(minute,start_time,end_time),但这产生了错误:
org.apache.spark.sql.AnalysisException: cannot resolve '`minute`' given input columns: [taxisub.tpep_dropoff_datetime, taxisub.DOLocationID, taxisub.improvement_surcharge, taxisub.VendorID, taxisub.trip_distance, taxisub.tip_amount, taxisub.tolls_amount, taxisub.payment_type, taxisub.fare_amount, taxisub.tpep_pickup_datetime, taxisub.total_amount, taxisub.store_and_fwd_flag, taxisub.extra, taxisub.passenger_count, taxisub.PULocationID, taxisub.mta_tax, taxisub.RatecodeID]; line 1 pos 153;
Run Code Online (Sandbox Code Playgroud)
似乎minutesparkSQL 的 datediff 不支持该参数。我目前的查询是:
spark.sqlContext.sql("Select to_timestamp(tpep_pickup_datetime,'MM/dd/yyyy hh:mm:ss') as pickup,to_timestamp(tpep_dropoff_datetime,'MM/dd/yyyy hh:mm:ss') as dropoff, datediff(to_timestamp(tpep_pickup_datetime,'MM/dd/yyyy hh:mm:ss'),to_timestamp(tpep_dropoff_datetime,'MM/dd/yyyy hh:mm:ss')) as diff from taxisub ").show()
Run Code Online (Sandbox Code Playgroud)
我的结果是:
+-------------------+-------------------+----+
| pickup| dropoff|diff|
+-------------------+-------------------+----+
|2018-12-15 08:53:20|2018-12-15 08:57:57| 0|
|2018-12-15 08:03:08|2018-12-15 08:07:30| 0|
|2018-12-15 …Run Code Online (Sandbox Code Playgroud) 我正在使用 pyspark 数据帧从每行的数组中寻找不同的计数:输入:col1 [1,1,1] [3,4,5] [1,2,1,2]
output:
1
3
2
I used below code but it is giving me the length of an array:
output:
3
3
4
please help me how do i achieve this using python pyspark dataframe.
slen = udf(lambda s: len(s), IntegerType())
count = Df.withColumn("Count", slen(df.col1))
count.show()
Thanks in advanced !
Run Code Online (Sandbox Code Playgroud) 我有一个数据框,如下所示:
+-----+------------------------+
|Index| finalArray |
+-----+------------------------+
|1 |[0, 2, 0, 3, 1, 4, 2, 7]|
|2 |[0, 4, 4, 3, 4, 2, 2, 5]|
+-----+------------------------+
Run Code Online (Sandbox Code Playgroud)
我想将数组分成 2 个块,然后找到每个块的总和并将结果数组存储在列 finalArray 中。它将如下所示:
+-----+---------------------+
|Index| finalArray |
+-----+---------------------+
|1 |[2, 3, 5, 9] |
|2 |[4, 7, 6, 7] |
+-----+---------------------+
Run Code Online (Sandbox Code Playgroud)
我可以通过创建 UDF 但寻找更好和优化的方法来做到这一点。如果我可以使用 withColumn 并传递 flagArray 来处理它,而不必编写 UDF,则最好。
@udf(ArrayType(DoubleType()))
def aggregate(finalArray,chunkSize):
n = int(chunkSize)
aggsum = []
final = [finalArray[i * n:(i + 1) * n] for i in range((len(finalArray) + n …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
apache-spark-sql ×10
pyspark ×4
scala ×4
flatten ×1
java ×1
json ×1
list ×1
parquet ×1
pyspark-sql ×1
rdd ×1
sql ×1