小编hi-*_*zir的帖子

Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

社区!

请帮助我了解如何使用 Spark 获得更好的压缩率?

让我描述一下案例:

  1. 我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,84 列

  2. 我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:

~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。

代码片段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
  1. 使用镶木地板工具,我查看了摄取和处理的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY …
Run Code Online (Sandbox Code Playgroud)

snappy apache-spark parquet apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
1万
查看次数

Spark - 使用不可序列化的成员序列化对象

我将在Spark的上下文中提出这个问题,因为这就是我所面临的问题,但这可能是一个普通的Java问题.

在我们的火花工作中,我们Resolver需要在所有工人中使用它(它在udf中使用).问题是它不可序列化,我们无法改变它.解决方案是将其作为序列化的另一个类的成员.

所以我们最终得到:

public class Analyzer implements Serializable {
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) {
         return resolver.find(key);
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我们broadcast使用Spark API这个类:

 val analyzer = sparkContext.broadcast(new Analyzer())
Run Code Online (Sandbox Code Playgroud)

(有关Spark广播的更多信息,请点击此处)

然后,我们继续analyzer在UDF中使用,作为我们的火花代码的一部分,具体如下:

val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()
Run Code Online (Sandbox Code Playgroud)

这一切都按预期工作,但让我们感到疑惑.

Resolver没有实现Serializable,因此标记为transient- 意味着它不会与它的所有者对象一起序列化Analyzer.

但是正如您从上面的代码中可以清楚地看到的那样,该resolve()方法使用resolver,因此它不能为null.事实上并非如此.代码有效.

因此,如果字段未通过序列化传递,那么resolver …

java serialization scala kryo apache-spark

9
推荐指数
1
解决办法
923
查看次数

在Pandas数据帧列的子集上使用Pipeline中的scikit StandardScaler

我想在pandas dataframe列的子集上使用sklearn.preprocessing.StandardScaler.在管道之外,这是微不足道的:

df[['A', 'B']] = scaler.fit_transform(df[['A', 'B']])
Run Code Online (Sandbox Code Playgroud)

但现在假设我在类型字符串的df中有列'C',以及下面的管道定义

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

pipeline =  Pipeline([
                ('standard', StandardScaler())
            ])

df_scaled = pipeline.fit_transform(df)
Run Code Online (Sandbox Code Playgroud)

如何告诉StandardScaler只扩展A列和B列?

我已经习惯了SparkML管道,其中要缩放的特征可以传递给缩放器组件的构造函数:

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
Run Code Online (Sandbox Code Playgroud)

注意:要素列包含稀疏向量,其中包含Spark的VectorAssembler创建的所有数字要素列

python pandas scikit-learn

9
推荐指数
2
解决办法
1207
查看次数

将序列转换为列表的功能

我们知道

list(map(f,[1,2],[3,4],[6,7]))
Run Code Online (Sandbox Code Playgroud)

相当于

[f(1,3,6),f(2,4,7)]
Run Code Online (Sandbox Code Playgroud)

我想知道是否有tolist相当于的内置函数[],所以

tolist(a,b,c,d)
Run Code Online (Sandbox Code Playgroud)

相当于[a,b,c,d].

我认为这样的函数在函数式编程中很有用.因为许多函数采用list参数而不是sequence.

当然,一个简单的自定义方式是lambda *x:list(x),但我总觉得它在语法上很麻烦,特别是在函数式编程风格中使用它

map(lambda *x:list(x),[1,2],[3,4],[6,7])
Run Code Online (Sandbox Code Playgroud)

所以我的问题是,如果没有这样的内置tolist,我们是否可以使用FP包更加优雅地构建它toolz

PS:我实际想要实现的是添加的线程版本(我知道numpy,但我现在不想使用它)

from toolz.curried import *
import operator as op
def addwise(*args):
    return list(map(compose(reduce(op.add),lambda *x:list(x)),*args))
Run Code Online (Sandbox Code Playgroud)

然后

addwise([1,2],[3,4],[6,7])
Run Code Online (Sandbox Code Playgroud)

会给

[10, 13]
Run Code Online (Sandbox Code Playgroud)

python functional-programming toolz

8
推荐指数
1
解决办法
137
查看次数

UDF导致警告:CachedKafkaConsumer未在UninterruptibleThread中运行(KAFKA-1894)

在通常的structured_kafka_wordcount.py代码中,

当我将线条分成udf如下所示的单词时,

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)
Run Code Online (Sandbox Code Playgroud)

警告将继续显示:

WARN CachedKafkaConsumer:CachedKafkaConsumer未在UninterruptibleThread中运行.当CachedKafkaConsumer的方法由于KAFKA-1894而中断时,它可能会挂起

另一方面,当我将线条分成单词时pyspark.sql.functions.split,一切都运行良好.

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)
Run Code Online (Sandbox Code Playgroud)

为什么会这样,以及如何修复警告?

这是我试图在实践中执行的代码:

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)


def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret

_udf = udf(lambda x: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming apache-spark-sql pyspark

8
推荐指数
1
解决办法
1144
查看次数

Spark自定义聚合:collect_list + UDF vs UDAF

我经常需要在spark 2.1中对数据帧执行自定义聚合,并使用以下两种方法:

  • 使用groupby/collect_list获取单行中的所有值,然后应用UDF来聚合值
  • 编写自定义UDAF(用户定义的聚合函数)

我通常更喜欢第一个选项,因为它比UDAF实现更容易实现和更易读.但我认为第一个选项通常较慢,因为在网络周围发送了更多数据(没有部分聚合),但我的经验表明UDAF通常很慢.这是为什么?

具体示例:计算直方图:

数据在蜂巢表中(1E6随机双值)

val df = spark.table("testtable")

def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple
Run Code Online (Sandbox Code Playgroud)

UDF方法:

val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))

df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)

+--------------------------------------------------------------------------------+
|UDF(xs)                                                                         |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

UDAF-方法

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable

class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(
      StructField("value", DoubleType) :: Nil
    )

  override def bufferSchema: StructType =
    new StructType() …
Run Code Online (Sandbox Code Playgroud)

aggregate-functions user-defined-functions dataframe apache-spark

8
推荐指数
1
解决办法
1120
查看次数

找不到密钥:_PYSPARK_DRIVER_CALLBACK_HOST

我正在尝试运行此代码:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Word Count") \
        .getOrCreate()

df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
   ], ['id', 'weight', 'height', 'age', 'gender'])

df.show()
print('Count of Rows: {0}'.format(df.count()))
print('Count of distinct Rows: {0}'.format((df.distinct().count())))

spark.stop()
Run Code Online (Sandbox Code Playgroud)

并得到一个错误

18/06/22 11:58:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

8
推荐指数
2
解决办法
9671
查看次数

理解toolz用例

我正在学习一些函数式编程并查看toolz.compose,pipe,thread_first和thread_last之间的差异对我来说似乎非常微妙或根本不存在.这些功能的预期不同用例是什么?

python functional-programming toolz

7
推荐指数
1
解决办法
837
查看次数

PySpark窗口函数:orderBetween/rowsBetween之间的orderBy中的多个条件

是否有可能创建一个可以在多个条件了窗口函数排序依据rangeBetweenrowsBetween.假设我有一个如下所示的数据框.

user_id     timestamp               date        event
0040b5f0    2018-01-22 13:04:32     2018-01-22  1       
0040b5f0    2018-01-22 13:04:35     2018-01-22  0   
0040b5f0    2018-01-25 18:55:08     2018-01-25  1       
0040b5f0    2018-01-25 18:56:17     2018-01-25  1       
0040b5f0    2018-01-25 20:51:43     2018-01-25  1       
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       
0040b5f0    2018-02-05 14:03:27     2018-02-05  1       
Run Code Online (Sandbox Code Playgroud)

每行,我需要事件列值的总和,其日期不超过3天.但我不能在同一天晚些时候发生事件.我可以创建一个窗口函数,如:

days = lambda i: i * 86400
my_window = Window\
                .partitionBy(["user_id"])\
                .orderBy(F.col("date").cast("timestamp").cast("long"))\
                .rangeBetween(-days(3), 0)
Run Code Online (Sandbox Code Playgroud)

但这将包括同一天晚些时候发生的事件.我需要创建一个窗口函数,其行为类似于(对于带*的行):

user_id …
Run Code Online (Sandbox Code Playgroud)

python window-functions apache-spark pyspark

7
推荐指数
1
解决办法
1159
查看次数

PySpark.sql.filter没有按预期执行

我在执行以下代码时遇到了问题:

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
Run Code Online (Sandbox Code Playgroud)

当我运行上面的代码时,df3是一个空的DataFrame.但是:如果我将代码更改为下面,它将给出正确的结果(2行的DataFrame):

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', …
Run Code Online (Sandbox Code Playgroud)

python-2.7 apache-spark apache-spark-sql pyspark spark-dataframe

7
推荐指数
1
解决办法
178
查看次数