标签: apache-spark-sql

将具有模式的 Spark Dataframe 转换为 json 字符串的 DataFrame

我有一个像这样的数据框:

+--+--------+--------+----+-------------+------------------------------+
|id|name    |lastname|age |timestamp    |creditcards                   |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel  |blanc   |35  |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter   |barns   |25  |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+
Run Code Online (Sandbox Code Playgroud)

我的 df 的架构如下所示:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- creditcards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- number: …
Run Code Online (Sandbox Code Playgroud)

json scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
6044
查看次数

如何从列表创建数据框?

我想创建df看起来像这样简单的 DataFrame:

+----------+----------+
| timestamp|      col2|
+----------+----------+
|2018-01-11|       123|
+----------+----------+
Run Code Online (Sandbox Code Playgroud)

这就是我所做的:

val values = List(List("timestamp", "2018-01-11"),List("col2","123")).map(x =>(x(0), x(1)))    
val df = values.toDF    
df.show()
Run Code Online (Sandbox Code Playgroud)

这就是我得到的:

+---------+----------+
|       _1|        _2|
+---------+----------+
|timestamp|2018-01-11|
|     col2|       123|
+---------+----------+
Run Code Online (Sandbox Code Playgroud)

这是怎么回事?

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
6783
查看次数

左连接操作永远运行

我必须应用左连接来连接我想要连接的数据帧。

df1 =

+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
|       560|            630|
|       710|            240|
|       610|            240|
Run Code Online (Sandbox Code Playgroud)

df2=

+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
|       560|            610|    1|
|       560|            240|    1|
|       610|            240|    0|
Run Code Online (Sandbox Code Playgroud)

问题是df1只包含 500 行,而df2包含 600.000.000 行和 24 个分区。我的左连接需要一段时间才能执行。我等了5个小时还没完。

val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)

结果应包含 500 行。我使用以下参数从 Spark-shell 执行代码:

spark-shell -driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4
Run Code Online (Sandbox Code Playgroud)

我怎样才能加快这个过程?

更新:

的输出df2.explain(true)

val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)

scala join apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3403
查看次数

如何在spark scala中的数据帧上动态调用withColumn函数

这在 Spark-Scala 中可能吗?我使用的是火花2.2

val func="""withColumn("seq", lit("this is seq"))
           .withColumn("id", lit("this is id"))
           .withColumn("type", lit("this is type"))"""
Run Code Online (Sandbox Code Playgroud)

然后在数据框(df)之上使用上面的变量,如下所示

val df2=df.$func
Run Code Online (Sandbox Code Playgroud)

我将这些函数保存到变量的原因是我想根据条件动态应用函数。有时我可能需要 1 个 withColumn 函数,有时我可能需要多个 withColumn 函数。

感谢任何帮助。谢谢!

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3781
查看次数

如何使用 Scala API 从 Spark 数据框中提取工作日作为数字

我有一个日期列,它是数据帧中的字符串,采用 2017-01-01 12:15:43 时间戳格式。

现在我想使用 dataframe 而不是 Spark sql 从该列获取工作日编号(1 到 7)。

像下面这样

df.select(weekday(col("colname")))
Run Code Online (Sandbox Code Playgroud)

我在 python 和 sql 中找到了一个,但在 scala 中没有找到。有谁能帮我解决这个问题吗

在 sql 上下文中

sqlContext.sql("select date_format(to_date('2017-01-01'),'W') as week")
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3425
查看次数

pyspark DataFrame selectExpr 不适用于多于一列

我们正在尝试 Spark DataFrameselectExpr及其对一列的工作,当我添加多于一列时,它会抛出错误。

第一个工作正常,第二个抛出错误。

代码示例:

 df1.selectExpr("coalesce(gtr_pd_am,0 )").show(2)
 df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Run Code Online (Sandbox Code Playgroud)

错误日志:

>>> df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/dataframe.py", line 1216, in selectExpr
    jdf = self._jdf.selectExpr(self._jseq(expr))
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input ',' expecting <EOF>(line 1, pos 21)\n\n== SQL ==\ncoalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)\n---------------------^^^\n" 
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark

0
推荐指数
1
解决办法
7565
查看次数

如何在从 HIVE 表中选择时替换换行符

我在 HIVE 中有一个 AVRO 格式的表。该表中的一列(字符串数据类型)包含带有换行符的数据,因此当我选择(使用 beeline 或 pyspark)时,我会得到多行。我确实在选择中尝试了选项 REGEXP_REPLACE(col1,"\n","") ,但它仍然返回多行。

当我复制并粘贴到文本编辑器中时,col1 的值如下所示:

NY - Enjoy holidays or Enjoy leaves.  
Silver 2000 plan
Silver 2000 plan CSR 1
Silver 2000 plan CSR 2
Gold 600 plan
Enjoy, holidays then leaves for ER, UC and old age only.  Primary holidays not subject to Enjoy.
Run Code Online (Sandbox Code Playgroud)

这里有什么替代方案吗?

hive apache-spark-sql

0
推荐指数
1
解决办法
9602
查看次数

Spark 重试尝试配置在 Spark 会话中不起作用

我正在尝试限制火花应用程序尝试。作业失败一次后,会以yarn client模式重新提交。

我正在使用 Azure 数据工厂中的 HDInsight 活动。如果参数是从 ADF 传递的,则仅限一次尝试。

#

 val conf: SparkConf = new SparkConf()
  conf.set("spark.yarn.maxAppAttempts","5")
  conf.set("yarn.resourcemanager.am.max-attempts","5")

  val sc = SparkSession.builder
     .master("yarn")
    .config(conf)
    .appName("test")
    .enableHiveSupport()
    //.config("yarn.resourcemanager.am.max-attempts","1")
    //.config("spark.yarn.maxAppAttempts","1")
    .getOrCreate() ##

sc.conf.set("spark.yarn.maxAppAttempts","1")
Run Code Online (Sandbox Code Playgroud)

从控制台打印参数显示 (spark.yarn.maxAppAttempts,1) (yarn.resourcemanager.am.max-attempts,1)

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
2884
查看次数

使用 Spark 和 scala 将 TZ 时间戳字符串转换为 UTC 中的给定格式

我有一个名为“lastModified”的列,其中包含如下所示的字符串,表示 GMT 时间。 “2019-06-24T15:36:16.000Z”

我想使用 scala 将此字符串格式化为 Spark 中的yyyy-MM-dd HH:mm:ss格式。为了实现这一目标,我创建了一个带有新列“ConvertedTS”的数据框。这给出了不正确的时间。

我运行的机器位于美国/纽约时区。

df.withColumn("ConvertedTS", date_format(to_utc_timestamp(to_timestamp(col("lastModified"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), "America/New_York"), "yyyy-MM-dd HH:MM:SS").cast(StringType))
Run Code Online (Sandbox Code Playgroud)

我基本上是在寻找 yyyy-MM-dd HH:mm:ss 中以下语句的结果格式

df.withColumn("LastModifiedTS", col("lastModified"))
Run Code Online (Sandbox Code Playgroud)

目前对我有用的方法之一是 udf,但由于不推荐使用 udf,我一直在寻找更多可以使用的直接表达式。

val convertToTimestamp = (logTimestamp: String) => {
    println("logTimeStamp: " + logTimestamp)
    var newDate = ""
    try {
      val sourceFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
      sourceFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
      val convertedDate = sourceFormat.parse(logTimestamp)
      val destFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      destFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
      newDate = destFormat.format(convertedDate)
      println("newDate: " + newDate)
    } catch {
      case e: Exception => e.printStackTrace()
    } …
Run Code Online (Sandbox Code Playgroud)

scala user-defined-functions dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
6213
查看次数

Pyspark 将所有数据帧值增加 1

我试图将数据框中的所有值增加 1,除了 ID 列之外。

例子:

在此输入图像描述

结果:

在此输入图像描述

这是我到目前为止所拥有的,但是当我有很多列要做时(例如 50),它会变得有点长。

df_add = df.select(
  'Id',
  (df['col_a'] + 1).alias('col_a'),
  ..
  ..
)
Run Code Online (Sandbox Code Playgroud)

有没有更Pythonic的方法来达到相同的结果?

python python-3.x apache-spark-sql pyspark

0
推荐指数
1
解决办法
4309
查看次数