我的数据原则上是一个表,除了其他"数据"之外,它还包含一列ID和一列GROUP_ID.
在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板.第二步做了很多的groupBy('GROUP_ID')和Window.partitionBy('GROUP_ID').orderBy('ID').
现在的目标是 - 为了避免在第二步骤洗牌 - 在第一步骤中有效地加载数据,因为这是一定时器.
问题第1部分: AFAIK,Spark在从镶木地板加载时保留了分区(这实际上是任何"优化写入考虑"的基础) - 对吗?
我提出了三种可能性:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')我会设置n个别镶木地板文件大约100MB.
问第2部分:它是正确的,这三个选项的目标(避免在第2步洗牌)方面产生"相同" /类似的结果?如果没有,有什么区别?哪一个'更好'?
问题第3部分:关于步骤1,三个选项中哪一个表现更好?
感谢您分享您的知识!
编辑2017-07-24
在做了一些测试(写入和读取镶木地板)后,似乎Spark 在第二步中默认无法恢复partitionBy和orderBy信息.分区的数量(从df.rdd.getNumPartitions()似乎由核心数量和/或spark.default.parallelism(如果设置)确定,但不是由镶木地板分区的数量决定.因此问题1的答案是错误的,问题2和3将是无关紧要.
因此,原来的真正的问题是:有没有办法告诉星火,该数据已经被列分区X和列进行排序ÿ?
我有一个Spark sql数据帧,由一ID列和n"数据"列组成,即
id | dat1 | dat2 | ... | datn
Run Code Online (Sandbox Code Playgroud)
所述idcolumnn是唯一确定的,反之,在看dat1 ... datn有可能重复.
我的目标是找到id那些重复的.
到目前为止我的方法:
使用groupBy以下方法获取重复行:
dup_df = df.groupBy(df.columns[1:]).count().filter('count > 1')
加入dup_df整个df以获取重复的行,包括 id:
df.join(dup_df, df.columns[1:])
我很确定这基本上是正确的,它失败了,因为dat1 ... datn列包含null值.
要做到join的null价值观,我发现.eg 这个SO职位.但这需要构建一个巨大的"字符串连接条件".
因此我的问题:
joins处理null价值观?ids?BTW:我使用的是Spark 2.1.0和Python 3.5.3
我有一个"大"数据集(huge_df),其中包含> 20列.其中一列是一个id字段(由生成pyspark.sql.functions.monotonically_increasing_id()).
使用一些标准,我生成第二个数据帧(filter_df),包含id我想稍后过滤的值huge_df.
目前我使用SQL语法来执行此操作:
filter_df.createOrReplaceTempView('filter_view')
huge_df = huge_df.where('id NOT IN (SELECT id FROM filter_view)')
Run Code Online (Sandbox Code Playgroud)
问题1:
有没有办法只使用Python,即无需注册TempView?
问题2: 是否有完全不同的方法来完成同样的事情?
我想用PySpark进行单元测试。测试本身可以工作,但是对于我得到的每个测试
ResourceWarning: unclosed <socket.socket [...]> 和ResourceWarning: unclosed file <_io.BufferedWriter [...]> 警告和DeprecationWarning关于invalid escape sequence。我想了解为什么/如何解决此问题,以免这些警告使我的单元测试输出混乱。
这是MWE:
# filename: pyspark_unittesting.py
# -*- coding: utf-8 -*-
import unittest
def insert_and_collect(val_in):
from pyspark.sql import SparkSession
with SparkSession.builder.getOrCreate() as spark:
col = 'column_x'
df = spark.createDataFrame([(val_in,)], [col])
print('one')
print(df.count())
print('two')
collected = df.collect()
print('three')
return collected[0][col]
class MyTest(unittest.TestCase):
def test(self):
val = 1
self.assertEqual(insert_and_collect(val), val)
print('four')
if __name__ == '__main__':
val = 1
print('inserted and collected is equal to original: …Run Code Online (Sandbox Code Playgroud) 我正在尝试构建包含Apache Spark的Docker映像。IT建立在openjdk-8-jre官方映像上。
目标是在集群模式下执行Spark,因此至少有一个主控(通过启动sbin/start-master.sh)和一个或多个从属(sbin/start-slave.sh)。有关我的Dockerfile和入口点脚本,请参见spark-standalone- docker。
构建本身实际上会经历,问题是当我想运行容器时,它会在之后不久启动和停止。原因是Spark主服务器启动脚本以守护程序模式启动主服务器并退出。这样容器就终止了,因为前台不再运行任何进程。
显而易见的解决方案是在前台运行Spark master进程,但是我不知道怎么做(Google也没有打开任何东西)。我的“解决方法”是tails -f在Spark日志目录上运行。
因此,我的问题是:
我有一个大约5M行x20列的数据集,包含groupID和rowID.我的目标是检查(某些)列是否包含组内缺少(空)值的固定分数(例如,50%).如果找到,则该组的整个列设置为missing(null).
df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...} # currently len(check_columns) = 8
for col, _ in check_columns.items():
total = (df
.groupBy('groupID').count()
.toDF('groupID', 'n_total')
)
missing = (df
.where(F.col(col).isNull())
.groupBy('groupID').count()
.toDF('groupID', 'n_missing')
)
# count_missing = count_missing.persist() # PERSIST TRY 1
# print('col {} found {} missing'.format(col, missing.count())) # missing.count() is b/w 1k-5k
poor_df = (total
.join(missing, 'groupID')
.withColumn('freq', F.col('n_missing') / F.col('n_total'))
.where(F.col('freq') > 0.5)
.select('groupID')
.toDF('poor_groupID')
)
df = (df
.join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
.withColumn(col, …Run Code Online (Sandbox Code Playgroud)