小编tes*_*acc的帖子

spark 可以将数据帧拆分为 topandas 的部分

我有一个 1000 万条记录数据框。我的要求是我需要对 Pandas 中的这些数据进行一些操作,而且我没有一次将所有 1000 万条记录放入 Pandas 的内存。所以我希望能够将它分块并在每个块上使用 toPandas

df = sqlContext.sql("select * from db.table")
#do chunking to take X records at a time
#how do I generated chunked_df?
p_df = chunked_df.toPandas()
#do things to p_df
Run Code Online (Sandbox Code Playgroud)

我如何将我的数据帧分成相等的 x 部分或按记录计数分成几部分,比如一次 100 万。任何一种解决方案都是可以接受的,我只需要以较小的块处理它。

python pandas apache-spark

7
推荐指数
1
解决办法
4916
查看次数

来自 spark 数据帧的块 topandas

我有一个包含 1000 万条记录和 150 列的 spark 数据框。我正在尝试将其转换为熊猫 DF。

x = df.toPandas()
# do some things to x
Run Code Online (Sandbox Code Playgroud)

它失败了ordinal must be >= 1。我假设这是因为一次处理太大了。是否可以将其分块并将其转换为每个块的熊猫 DF?

全栈:

ValueError                                Traceback (most recent call last)
<command-2054265283599157> in <module>()
    158 from db.table where snapshot_year_month=201806""")
--> 159 ps = x.toPandas()
    160 # ps[["pol_nbr",
    161 # "pol_eff_dt",

/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   2029                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   2030         else:
-> 2031             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   2032 
   2033             dtype = {}

/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
    480         with SCCallSiteSync(self._sc) as …
Run Code Online (Sandbox Code Playgroud)

python pandas apache-spark

6
推荐指数
1
解决办法
3017
查看次数

可以通过 ISNUMERIC 函数过滤 Spark 数据帧吗?

DataFrame在 SQL 中有一个表。DataFrame如果某个列的值是数字,我想过滤它。

val df = sqlContext.sql("select * from myTable");
val filter = df.filter("ISNUMERIC('col_a')");
//I want filter to be a dataframe of df where the values in col_a are numeric.
Run Code Online (Sandbox Code Playgroud)

我目前的解决方案不起作用。我怎样才能做到这一点?

scala apache-spark

5
推荐指数
1
解决办法
1万
查看次数

从 spark udf 记录到驱动程序

我在 spark 中使用的数据块中有一个简单的 UDF。我不能使用 println 或 log4j 或其他东西,因为它会被输出到执行中,我需要在驱动程序中使用它。我有一个非常系统的日志设置

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: int) => {
  log("I am in this UDF")
  s+2
}

sqlContext.udf.register("CleanText", CleanText)
Run Code Online (Sandbox Code Playgroud)

我怎样才能让它正常运行并登录到驱动程序?

scala apache-spark databricks

5
推荐指数
1
解决办法
4254
查看次数

可以在 spark 中处理多字符分隔符

我有[~]一些我正在阅读的 csv 文件的分隔符。

1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
Run Code Online (Sandbox Code Playgroud)

我试过这个

val rddFile = sc.textFile("file.csv")
val rddTransformed = rddFile.map(eachLine=>eachLine.split("[~]"))
val df = rddTransformed.toDF()
display(df)
Run Code Online (Sandbox Code Playgroud)

然而,这个问题在于它是作为单个值数组出现的,每个字段中都有[]。所以数组将是

["1[","]a[","]b[",...]
Run Code Online (Sandbox Code Playgroud)

我不能用

val df = spark.read.option("sep", "[~]").csv("file.csv")
Run Code Online (Sandbox Code Playgroud)

因为不支持多字符分隔符。我可以采取什么其他方法?

1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
2[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
3[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
Run Code Online (Sandbox Code Playgroud)

编辑 - 这不是重复的,重复的线程是关于多分隔符的,这是多字符单分隔符

scala apache-spark databricks

3
推荐指数
1
解决办法
1万
查看次数

在火花中选择新列作为空字符串

我试图在 spark 中生成一列空值,但不是空数据类型。

所以

sqlContext.sql("select null as newcol from db.table")
Run Code Online (Sandbox Code Playgroud)

将不起作用,因为它会将 newcol 数据类型设置为 null。

sqlContext.sql("select cast(null as newcol as string) from db.table")
Run Code Online (Sandbox Code Playgroud)

将不起作用,因为它将使所有值“空”作为 4 个字符的字符串表示形式。

我怎样才能做到这一点?

最终目标是将其写入 csv,它不支持空列类型,我需要数据框中的实际值为空而不是空字符串。

sql hiveql apache-spark

2
推荐指数
1
解决办法
2009
查看次数

Spark agg为多个列收集单个列表

这是我当前的代码:

pipe_exec_df_final_grouped = pipe_exec_df_final.groupBy("application_id").agg(collect_list("table_name").alias("tables"))
Run Code Online (Sandbox Code Playgroud)

但是,在我的收集列表中,我想要多个列值,因此聚合列将是一个数组数组。当前结果如下:

1|[a,b,c,d]
2|[e,f,g,h]
Run Code Online (Sandbox Code Playgroud)

但是,我还想保留另一个附加到聚集的列(我们称其为“状态”列名称)。所以我的新输出将是:

1|[[a,pass],[b,fail],[c,fail],[d,pass]]
...
Run Code Online (Sandbox Code Playgroud)

我试过collect_list("table_name, status") 但是collect_list只用一个列名。我该如何完成我想做的事情?

scala group-by apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1133
查看次数