小编Tw *_*Nus的帖子

Spark:对数据进行排序和分区的最有效方法是将其写为镶木地板

我的数据原则上是一个表,除了其他"数据"之外,它还包含一列ID和一列GROUP_ID.

在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板.第二步做了很多的groupBy('GROUP_ID')Window.partitionBy('GROUP_ID').orderBy('ID').

现在的目标是 - 为了避免在第二步骤洗牌 - 在第一步骤中有效地加载数据,因为这是一定时器.

问题第1部分: AFAIK,Spark在从镶木地板加载时保留了分区(这实际上是任何"优化写入考虑"的基础) - 对吗?

我提出了三种可能性:

  • df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
  • df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
  • df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')

我会设置n个别镶木地板文件大约100MB.

问第2部分:它是正确的,这三个选项的目标(避免在第2步洗牌)方面产生"相同" /类似的结果?如果没有,有什么区别?哪一个'更好'?

问题第3部分:关于步骤1,三个选项中哪一个表现更好?

感谢您分享您的知识!


编辑2017-07-24

在做了一些测试(写入和读取镶木地板)后,似乎Spark 在第二步中默认无法恢复partitionByorderBy信息.分区的数量(从df.rdd.getNumPartitions()似乎由核心数量和/或spark.default.parallelism(如果设置)确定,但不是由镶木地板分区的数量决定.因此问题1的答案错误的,问题2和3将是无关紧要.

因此,原来的真正的问题是:有没有办法告诉星火,该数据已经被列分区X和列进行排序ÿ

apache-spark apache-spark-sql pyspark pyspark-sql

15
推荐指数
1
解决办法
1734
查看次数

在Apache Spark中获取重复行的ID(考虑所有其他列)

我有一个Spark sql数据帧,由一ID列和n"数据"列组成,即

id | dat1 | dat2 | ... | datn
Run Code Online (Sandbox Code Playgroud)

所述idcolumnn是唯一确定的,反之,在看dat1 ... datn有可能重复.

我的目标是找到id那些重复的.

到目前为止我的方法:

  • 使用groupBy以下方法获取重复行:

    dup_df = df.groupBy(df.columns[1:]).count().filter('count > 1')

  • 加入dup_df整个df以获取重复的行,包括 id:

    df.join(dup_df, df.columns[1:])

我很确定这基本上是正确的,它失败了,因为dat1 ... datn列包含null值.

要做到joinnull价值观,我发现.eg 这个SO职位.但这需要构建一个巨大的"字符串连接条件".

因此我的问题:

  1. 是否有一种简单/更通用/更pythonic的方法来joins处理null价值观?
  2. 或者,更好的是,是否有另一种(更简单,更美丽,...)方法来获得所需的ids?

BTW:我使用的是Spark 2.1.0和Python 3.5.3

apache-spark apache-spark-sql pyspark pyspark-sql

8
推荐指数
1
解决办法
8422
查看次数

spark基于另一个数据帧的值过滤(删除)行

我有一个"大"数据集(huge_df),其中包含> 20列.其中一列是一个id字段(由生成pyspark.sql.functions.monotonically_increasing_id()).

使用一些标准,我生成第二个数据帧(filter_df),包含id我想稍后过滤的值huge_df.

目前我使用SQL语法来执行此操作:

filter_df.createOrReplaceTempView('filter_view')
huge_df = huge_df.where('id NOT IN (SELECT id FROM filter_view)')
Run Code Online (Sandbox Code Playgroud)

问题1: 有没有办法只使用Python,即无需注册TempView

问题2: 是否有完全不同的方法来完成同样的事情?

apache-spark apache-spark-sql pyspark pyspark-sql

8
推荐指数
1
解决办法
1万
查看次数

使用Pyspark进行单元测试:未关闭的套接字警告

我想用PySpark进行单元测试。测试本身可以工作,但是对于我得到的每个测试

  • ResourceWarning: unclosed <socket.socket [...]>
  • ResourceWarning: unclosed file <_io.BufferedWriter [...]> 警告和
  • DeprecationWarning关于invalid escape sequence

我想了解为什么/如何解决此问题,以免这些警告使我的单元测试输出混乱。

这是MWE:

# filename: pyspark_unittesting.py
# -*- coding: utf-8 -*-

import unittest


def insert_and_collect(val_in):
    from pyspark.sql import SparkSession
    with SparkSession.builder.getOrCreate() as spark:
        col = 'column_x'
        df = spark.createDataFrame([(val_in,)], [col])

        print('one')
        print(df.count())
        print('two')
        collected = df.collect()
        print('three')
        return collected[0][col]


class MyTest(unittest.TestCase):
    def test(self):
        val = 1
        self.assertEqual(insert_and_collect(val), val)
        print('four')


if __name__ == '__main__':
    val = 1
    print('inserted and collected is equal to original: …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-unittest pyspark pyspark-sql

7
推荐指数
1
解决办法
807
查看次数

独立集群模式下具有Apache Spark的Docker容器

我正在尝试构建包含Apache Spark的Docker映像。IT建立在openjdk-8-jre官方映像上。

目标是在集群模式下执行Spark,因此至少有一个主控(通过启动sbin/start-master.sh)和一个或多个从属(sbin/start-slave.sh)。有关我的Dockerfile和入口点脚本,请参见spark-standalone- docker。

构建本身实际上会经历,问题是当我想运行容器时,它会在之后不久启动和停止。原因是Spark主服务器启动脚本以守护程序模式启动主服务器并退出。这样容器就终止了,因为前台不再运行任何进程。

显而易见的解决方案是在前台运行Spark master进程,但是我不知道怎么做(Google也没有打开任何东西)。我的“解决方法”是tails -f在Spark日志目录上运行。

因此,我的问题是:

  1. 如何在前台运行Apache Spark Master?
  2. 如果第一个不可能/不可行/不可行,那么使容器保持“活动”状态的首选(即最佳实践)解决方案是什么(我真的不想使用无限循环和sleep命令)?

docker apache-spark dockerfile

5
推荐指数
2
解决办法
1830
查看次数

Apache Spark OutOfMemoryError(HeapSpace)

我有一个大约5M行x20列的数据集,包含groupID和rowID.我的目标是检查(某些)列是否包含组内缺少(空)值的固定分数(例如,50%).如果找到,则该组的整个列设置为missing(null).

df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...}  # currently len(check_columns) = 8

for col, _ in check_columns.items():
    total = (df
             .groupBy('groupID').count()
             .toDF('groupID', 'n_total')
             )

    missing = (df
               .where(F.col(col).isNull())
               .groupBy('groupID').count()
               .toDF('groupID', 'n_missing')
               )
    # count_missing = count_missing.persist()  # PERSIST TRY 1
    # print('col {} found {} missing'.format(col, missing.count()))  # missing.count() is b/w 1k-5k

    poor_df = (total
               .join(missing, 'groupID')
               .withColumn('freq', F.col('n_missing') / F.col('n_total'))
               .where(F.col('freq') > 0.5)
               .select('groupID')
               .toDF('poor_groupID')
               )

    df = (df
          .join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
          .withColumn(col, …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

5
推荐指数
0
解决办法
1739
查看次数