相关疑难解决方法(0)

Apache Spark的主键

我正在与Apache Spark和PostgreSQL建立JDBC连接,我想在我的数据库中插入一些数据.当我使用append模式时,我需要id为每个模式指定DataFrame.Row.Spark有什么方法可以创建主键吗?

database postgresql hadoop apache-spark

25
推荐指数
2
解决办法
2万
查看次数

如何在pySpark数据帧中添加Row id

我有一个csv文件; 我在pyspark中转换为DataFrame(df); 经过一番改造; 我想在df中添加一列; 这应该是简单的行id(从0或1开始到N).

我在rdd中转换了df并使用"zipwithindex".我将生成的rdd转换回df.这种方法有效,但它产生了250k的任务,并且需要花费大量的时间来执行.我想知道是否还有其他方法可以减少运行时间.

以下是我的代码片段; 我正在处理的csv文件很大; 包含数十亿行.

debug_csv_rdd = (sc.textFile("debug.csv")
  .filter(lambda x: x.find('header') == -1)
  .map(lambda x : x.replace("NULL","0")).map(lambda p: p.split(','))
  .map(lambda x:Row(c1=int(x[0]),c2=int(x[1]),c3=int(x[2]),c4=int(x[3]))))

debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")

r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")

r0_1 = (r0.flatMap(lambda x:x)
    .zipWithIndex()
    .map(lambda x: Row(c1=x[0],id=int(x[1]))))

r0_df=sqlContext.createDataFrame(r0_2)
r0_df.show(10) 
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark spark-dataframe

17
推荐指数
1
解决办法
3万
查看次数

PySpark DataFrames - 枚举的方式而不转换为Pandas?

我有一个非常大的pyspark.sql.dataframe.DataFrame,名为df.我需要一些枚举记录的方法 - 因此,能够访问具有特定索引的记录.(或选择索引范围的记录组)

在熊猫中,我可以做到

indexes=[2,3,6,7] 
df[indexes]
Run Code Online (Sandbox Code Playgroud)

在这里,我想要类似的东西,(并且不将数据帧转换为pandas)

我能得到的最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
    
    Run Code Online (Sandbox Code Playgroud)
    • 使用where()函数搜索我需要的值.

问题:

  1. 为什么它不起作用以及如何使其工作?如何向数据框添加行?
  2. 它会在以后工作,如下所示:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    
    Run Code Online (Sandbox Code Playgroud)
  3. 有没有更快更简单的方法来处理它?

python bigdata apache-spark rdd pyspark

14
推荐指数
2
解决办法
2万
查看次数

如何以两行划分pyspark数据帧

我在Databricks工作.

我有一个包含500行的数据帧,我想创建包含100行的两个数据帧,另一个包含剩余的400行.

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
Run Code Online (Sandbox Code Playgroud)

我试过以下但是收到错误

df1 = df[:99]
df2 = df[100:499]


TypeError: unexpected item type: <type 'slice'>
Run Code Online (Sandbox Code Playgroud)

python pyspark spark-dataframe databricks

8
推荐指数
3
解决办法
1万
查看次数

PySpark - 获取组中每行的行号

使用pyspark,我希望能够对spark数据帧进行分组,对组进行排序,然后提供行号.所以

Group    Date
  A      2000
  A      2002
  A      2007
  B      1999
  B      2015
Run Code Online (Sandbox Code Playgroud)

会成为

Group    Date    row_num
  A      2000      0
  A      2002      1
  A      2007      2
  B      1999      0
  B      2015      1
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

6
推荐指数
2
解决办法
2万
查看次数

如何从pyspark的数据框中选择一系列行

我有一个包含 10609 行的数据框,我想一次将 100 行转换为 JSON 并将它们发送回网络服务。

我曾尝试使用 SQL 的 LIMIT 子句,例如

temptable = spark.sql("select item_code_1 from join_table limit 100")
Run Code Online (Sandbox Code Playgroud)

这将返回前 100 行,但如果我想要接下来的 100 行,我试过这个但没有用。

temptable = spark.sql("select item_code_1 from join_table limit 100, 200")
Run Code Online (Sandbox Code Playgroud)

错误:Py4JJavaError:调用 o22.sql 时发生错误。: org.apache.spark.sql.catalyst.parser.ParseException: 不匹配的输入 ',' 期望(第 1 行,位置 44)

== SQL ==

select item_code_1 from join_table limit 100, 200
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark

3
推荐指数
1
解决办法
2万
查看次数