我有一个 1000 万条记录数据框。我的要求是我需要对 Pandas 中的这些数据进行一些操作,而且我没有一次将所有 1000 万条记录放入 Pandas 的内存。所以我希望能够将它分块并在每个块上使用 toPandas
df = sqlContext.sql("select * from db.table")
#do chunking to take X records at a time
#how do I generated chunked_df?
p_df = chunked_df.toPandas()
#do things to p_df
Run Code Online (Sandbox Code Playgroud)
我如何将我的数据帧分成相等的 x 部分或按记录计数分成几部分,比如一次 100 万。任何一种解决方案都是可以接受的,我只需要以较小的块处理它。
我有一个包含 1000 万条记录和 150 列的 spark 数据框。我正在尝试将其转换为熊猫 DF。
x = df.toPandas()
# do some things to x
Run Code Online (Sandbox Code Playgroud)
它失败了ordinal must be >= 1。我假设这是因为一次处理太大了。是否可以将其分块并将其转换为每个块的熊猫 DF?
全栈:
ValueError Traceback (most recent call last)
<command-2054265283599157> in <module>()
158 from db.table where snapshot_year_month=201806""")
--> 159 ps = x.toPandas()
160 # ps[["pol_nbr",
161 # "pol_eff_dt",
/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
2029 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
2030 else:
-> 2031 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
2032
2033 dtype = {}
/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
480 with SCCallSiteSync(self._sc) as …Run Code Online (Sandbox Code Playgroud) 我DataFrame在 SQL 中有一个表。DataFrame如果某个列的值是数字,我想过滤它。
val df = sqlContext.sql("select * from myTable");
val filter = df.filter("ISNUMERIC('col_a')");
//I want filter to be a dataframe of df where the values in col_a are numeric.
Run Code Online (Sandbox Code Playgroud)
我目前的解决方案不起作用。我怎样才能做到这一点?
我在 spark 中使用的数据块中有一个简单的 UDF。我不能使用 println 或 log4j 或其他东西,因为它会被输出到执行中,我需要在驱动程序中使用它。我有一个非常系统的日志设置
var logMessage = ""
def log(msg: String){
logMessage += msg + "\n"
}
def writeLog(file: String){
println("start write")
println(logMessage)
println("end write")
}
def warning(msg: String){
log("*WARNING* " + msg)
}
val CleanText = (s: int) => {
log("I am in this UDF")
s+2
}
sqlContext.udf.register("CleanText", CleanText)
Run Code Online (Sandbox Code Playgroud)
我怎样才能让它正常运行并登录到驱动程序?
我有[~]一些我正在阅读的 csv 文件的分隔符。
1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
Run Code Online (Sandbox Code Playgroud)
我试过这个
val rddFile = sc.textFile("file.csv")
val rddTransformed = rddFile.map(eachLine=>eachLine.split("[~]"))
val df = rddTransformed.toDF()
display(df)
Run Code Online (Sandbox Code Playgroud)
然而,这个问题在于它是作为单个值数组出现的,每个字段中都有[和]。所以数组将是
["1[","]a[","]b[",...]
Run Code Online (Sandbox Code Playgroud)
我不能用
val df = spark.read.option("sep", "[~]").csv("file.csv")
Run Code Online (Sandbox Code Playgroud)
因为不支持多字符分隔符。我可以采取什么其他方法?
1[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
2[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
3[~]a[~]b[~]dd[~][~]ww[~][~]4[~]4[~][~][~][~][~]
Run Code Online (Sandbox Code Playgroud)
编辑 - 这不是重复的,重复的线程是关于多分隔符的,这是多字符单分隔符
我试图在 spark 中生成一列空值,但不是空数据类型。
所以
sqlContext.sql("select null as newcol from db.table")
Run Code Online (Sandbox Code Playgroud)
将不起作用,因为它会将 newcol 数据类型设置为 null。
sqlContext.sql("select cast(null as newcol as string) from db.table")
Run Code Online (Sandbox Code Playgroud)
将不起作用,因为它将使所有值“空”作为 4 个字符的字符串表示形式。
我怎样才能做到这一点?
最终目标是将其写入 csv,它不支持空列类型,我需要数据框中的实际值为空而不是空字符串。
这是我当前的代码:
pipe_exec_df_final_grouped = pipe_exec_df_final.groupBy("application_id").agg(collect_list("table_name").alias("tables"))
Run Code Online (Sandbox Code Playgroud)
但是,在我的收集列表中,我想要多个列值,因此聚合列将是一个数组数组。当前结果如下:
1|[a,b,c,d]
2|[e,f,g,h]
Run Code Online (Sandbox Code Playgroud)
但是,我还想保留另一个附加到聚集的列(我们称其为“状态”列名称)。所以我的新输出将是:
1|[[a,pass],[b,fail],[c,fail],[d,pass]]
...
Run Code Online (Sandbox Code Playgroud)
我试过collect_list("table_name, status") 但是collect_list只用一个列名。我该如何完成我想做的事情?