我有一个像这样的数据框:
+--+--------+--------+----+-------------+------------------------------+
|id|name |lastname|age |timestamp |creditcards |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel |blanc |35 |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter |barns |25 |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+
Run Code Online (Sandbox Code Playgroud)
我的 df 的架构如下所示:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- lastname: string (nullable = true)
|-- age: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- creditcards: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- number: …Run Code Online (Sandbox Code Playgroud) 我想创建df看起来像这样简单的 DataFrame:
+----------+----------+
| timestamp| col2|
+----------+----------+
|2018-01-11| 123|
+----------+----------+
Run Code Online (Sandbox Code Playgroud)
这就是我所做的:
val values = List(List("timestamp", "2018-01-11"),List("col2","123")).map(x =>(x(0), x(1)))
val df = values.toDF
df.show()
Run Code Online (Sandbox Code Playgroud)
这就是我得到的:
+---------+----------+
| _1| _2|
+---------+----------+
|timestamp|2018-01-11|
| col2| 123|
+---------+----------+
Run Code Online (Sandbox Code Playgroud)
这是怎么回事?
我必须应用左连接来连接我想要连接的数据帧。
df1 =
+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
| 560| 630|
| 710| 240|
| 610| 240|
Run Code Online (Sandbox Code Playgroud)
df2=
+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
| 560| 610| 1|
| 560| 240| 1|
| 610| 240| 0|
Run Code Online (Sandbox Code Playgroud)
问题是df1只包含 500 行,而df2包含 600.000.000 行和 24 个分区。我的左连接需要一段时间才能执行。我等了5个小时还没完。
val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud)
结果应包含 500 行。我使用以下参数从 Spark-shell 执行代码:
spark-shell -driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4
Run Code Online (Sandbox Code Playgroud)
我怎样才能加快这个过程?
更新:
的输出df2.explain(true):
val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")
Run Code Online (Sandbox Code Playgroud) 这在 Spark-Scala 中可能吗?我使用的是火花2.2
val func="""withColumn("seq", lit("this is seq"))
.withColumn("id", lit("this is id"))
.withColumn("type", lit("this is type"))"""
Run Code Online (Sandbox Code Playgroud)
然后在数据框(df)之上使用上面的变量,如下所示
val df2=df.$func
Run Code Online (Sandbox Code Playgroud)
我将这些函数保存到变量的原因是我想根据条件动态应用函数。有时我可能需要 1 个 withColumn 函数,有时我可能需要多个 withColumn 函数。
感谢任何帮助。谢谢!
我有一个日期列,它是数据帧中的字符串,采用 2017-01-01 12:15:43 时间戳格式。
现在我想使用 dataframe 而不是 Spark sql 从该列获取工作日编号(1 到 7)。
像下面这样
df.select(weekday(col("colname")))
Run Code Online (Sandbox Code Playgroud)
我在 python 和 sql 中找到了一个,但在 scala 中没有找到。有谁能帮我解决这个问题吗
在 sql 上下文中
sqlContext.sql("select date_format(to_date('2017-01-01'),'W') as week")
Run Code Online (Sandbox Code Playgroud) 我们正在尝试 Spark DataFrameselectExpr及其对一列的工作,当我添加多于一列时,它会抛出错误。
第一个工作正常,第二个抛出错误。
代码示例:
df1.selectExpr("coalesce(gtr_pd_am,0 )").show(2)
df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Run Code Online (Sandbox Code Playgroud)
错误日志:
>>> df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/dataframe.py", line 1216, in selectExpr
jdf = self._jdf.selectExpr(self._jseq(expr))
File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input ',' expecting <EOF>(line 1, pos 21)\n\n== SQL ==\ncoalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)\n---------------------^^^\n"
Run Code Online (Sandbox Code Playgroud) 我在 HIVE 中有一个 AVRO 格式的表。该表中的一列(字符串数据类型)包含带有换行符的数据,因此当我选择(使用 beeline 或 pyspark)时,我会得到多行。我确实在选择中尝试了选项 REGEXP_REPLACE(col1,"\n","") ,但它仍然返回多行。
当我复制并粘贴到文本编辑器中时,col1 的值如下所示:
NY - Enjoy holidays or Enjoy leaves.
Silver 2000 plan
Silver 2000 plan CSR 1
Silver 2000 plan CSR 2
Gold 600 plan
Enjoy, holidays then leaves for ER, UC and old age only. Primary holidays not subject to Enjoy.
Run Code Online (Sandbox Code Playgroud)
这里有什么替代方案吗?
我正在尝试限制火花应用程序尝试。作业失败一次后,会以yarn client模式重新提交。
我正在使用 Azure 数据工厂中的 HDInsight 活动。如果参数是从 ADF 传递的,则仅限一次尝试。
val conf: SparkConf = new SparkConf()
conf.set("spark.yarn.maxAppAttempts","5")
conf.set("yarn.resourcemanager.am.max-attempts","5")
val sc = SparkSession.builder
.master("yarn")
.config(conf)
.appName("test")
.enableHiveSupport()
//.config("yarn.resourcemanager.am.max-attempts","1")
//.config("spark.yarn.maxAppAttempts","1")
.getOrCreate() ##
sc.conf.set("spark.yarn.maxAppAttempts","1")
Run Code Online (Sandbox Code Playgroud)
从控制台打印参数显示 (spark.yarn.maxAppAttempts,1) (yarn.resourcemanager.am.max-attempts,1)
我有一个名为“lastModified”的列,其中包含如下所示的字符串,表示 GMT 时间。 “2019-06-24T15:36:16.000Z”
我想使用 scala 将此字符串格式化为 Spark 中的yyyy-MM-dd HH:mm:ss格式。为了实现这一目标,我创建了一个带有新列“ConvertedTS”的数据框。这给出了不正确的时间。
我运行的机器位于美国/纽约时区。
df.withColumn("ConvertedTS", date_format(to_utc_timestamp(to_timestamp(col("lastModified"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), "America/New_York"), "yyyy-MM-dd HH:MM:SS").cast(StringType))
Run Code Online (Sandbox Code Playgroud)
我基本上是在寻找 yyyy-MM-dd HH:mm:ss 中以下语句的结果格式
df.withColumn("LastModifiedTS", col("lastModified"))
Run Code Online (Sandbox Code Playgroud)
目前对我有用的方法之一是 udf,但由于不推荐使用 udf,我一直在寻找更多可以使用的直接表达式。
val convertToTimestamp = (logTimestamp: String) => {
println("logTimeStamp: " + logTimestamp)
var newDate = ""
try {
val sourceFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
sourceFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
val convertedDate = sourceFormat.parse(logTimestamp)
val destFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
destFormat.setTimeZone(TimeZone.getTimeZone("GMT"))
newDate = destFormat.format(convertedDate)
println("newDate: " + newDate)
} catch {
case e: Exception => e.printStackTrace()
} …Run Code Online (Sandbox Code Playgroud) scala user-defined-functions dataframe apache-spark apache-spark-sql
我试图将数据框中的所有值增加 1,除了 ID 列之外。
例子:
结果:
这是我到目前为止所拥有的,但是当我有很多列要做时(例如 50),它会变得有点长。
df_add = df.select(
'Id',
(df['col_a'] + 1).alias('col_a'),
..
..
)
Run Code Online (Sandbox Code Playgroud)
有没有更Pythonic的方法来达到相同的结果?
apache-spark-sql ×10
apache-spark ×7
scala ×6
pyspark ×3
dataframe ×2
hive ×1
join ×1
json ×1
python ×1
python-3.x ×1