社区!
请帮助我了解如何使用 Spark 获得更好的压缩率?
让我描述一下案例:
我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,有84 列
我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:
~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY …Run Code Online (Sandbox Code Playgroud) snappy apache-spark parquet apache-spark-sql spark-dataframe
我将在Spark的上下文中提出这个问题,因为这就是我所面临的问题,但这可能是一个普通的Java问题.
在我们的火花工作中,我们Resolver需要在所有工人中使用它(它在udf中使用).问题是它不可序列化,我们无法改变它.解决方案是将其作为可序列化的另一个类的成员.
所以我们最终得到:
public class Analyzer implements Serializable {
transient Resolver resolver;
public Analyzer() {
System.out.println("Initializing a Resolver...");
resolver = new Resolver();
}
public int resolve(String key) {
return resolver.find(key);
}
}
Run Code Online (Sandbox Code Playgroud)
然后我们broadcast使用Spark API这个类:
val analyzer = sparkContext.broadcast(new Analyzer())
Run Code Online (Sandbox Code Playgroud)
(有关Spark广播的更多信息,请点击此处)
然后,我们继续analyzer在UDF中使用,作为我们的火花代码的一部分,具体如下:
val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select("key", resolve("key")).count()
Run Code Online (Sandbox Code Playgroud)
这一切都按预期工作,但让我们感到疑惑.
Resolver没有实现Serializable,因此标记为transient- 意味着它不会与它的所有者对象一起序列化Analyzer.
但是正如您从上面的代码中可以清楚地看到的那样,该resolve()方法使用resolver,因此它不能为null.事实上并非如此.代码有效.
因此,如果字段未通过序列化传递,那么resolver …
我想在pandas dataframe列的子集上使用sklearn.preprocessing.StandardScaler.在管道之外,这是微不足道的:
df[['A', 'B']] = scaler.fit_transform(df[['A', 'B']])
Run Code Online (Sandbox Code Playgroud)
但现在假设我在类型字符串的df中有列'C',以及下面的管道定义
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
pipeline = Pipeline([
('standard', StandardScaler())
])
df_scaled = pipeline.fit_transform(df)
Run Code Online (Sandbox Code Playgroud)
如何告诉StandardScaler只扩展A列和B列?
我已经习惯了SparkML管道,其中要缩放的特征可以传递给缩放器组件的构造函数:
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
Run Code Online (Sandbox Code Playgroud)
注意:要素列包含稀疏向量,其中包含Spark的VectorAssembler创建的所有数字要素列
我们知道
list(map(f,[1,2],[3,4],[6,7]))
Run Code Online (Sandbox Code Playgroud)
相当于
[f(1,3,6),f(2,4,7)]
Run Code Online (Sandbox Code Playgroud)
我想知道是否有tolist相当于的内置函数[],所以
tolist(a,b,c,d)
Run Code Online (Sandbox Code Playgroud)
相当于[a,b,c,d].
我认为这样的函数在函数式编程中很有用.因为许多函数采用list参数而不是sequence.
当然,一个简单的自定义方式是lambda *x:list(x),但我总觉得它在语法上很麻烦,特别是在函数式编程风格中使用它
map(lambda *x:list(x),[1,2],[3,4],[6,7])
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,如果没有这样的内置tolist,我们是否可以使用FP包更加优雅地构建它toolz?
PS:我实际想要实现的是添加的线程版本(我知道numpy,但我现在不想使用它)
from toolz.curried import *
import operator as op
def addwise(*args):
return list(map(compose(reduce(op.add),lambda *x:list(x)),*args))
Run Code Online (Sandbox Code Playgroud)
然后
addwise([1,2],[3,4],[6,7])
Run Code Online (Sandbox Code Playgroud)
会给
[10, 13]
Run Code Online (Sandbox Code Playgroud) 在通常的structured_kafka_wordcount.py代码中,
当我将线条分成udf如下所示的单词时,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
explode(
my_split(lines.value)
)
)
Run Code Online (Sandbox Code Playgroud)
警告将继续显示:
WARN CachedKafkaConsumer:CachedKafkaConsumer未在UninterruptibleThread中运行.当CachedKafkaConsumer的方法由于KAFKA-1894而中断时,它可能会挂起
另一方面,当我将线条分成单词时pyspark.sql.functions.split,一切都运行良好.
words = lines.select(
explode(
split(lines.value, ' ')
)
)
Run Code Online (Sandbox Code Playgroud)
为什么会这样,以及如何修复警告?
这是我试图在实践中执行的代码:
pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)
def _unfold(x):
ret = []
result = prog.match(x)
if result:
log = " ".join((result.group(1), result.group(3)))
times = result.group(2)
for _ in range(int(times)):
ret.append(log)
else:
ret.append(x)
return ret
_udf = udf(lambda x: …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark spark-streaming apache-spark-sql pyspark
我经常需要在spark 2.1中对数据帧执行自定义聚合,并使用以下两种方法:
我通常更喜欢第一个选项,因为它比UDAF实现更容易实现和更易读.但我认为第一个选项通常较慢,因为在网络周围发送了更多数据(没有部分聚合),但我的经验表明UDAF通常很慢.这是为什么?
具体示例:计算直方图:
数据在蜂巢表中(1E6随机双值)
val df = spark.table("testtable")
def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple
Run Code Online (Sandbox Code Playgroud)
UDF方法:
val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))
df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)
+--------------------------------------------------------------------------------+
|UDF(xs) |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
UDAF-方法
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {
override def inputSchema: StructType =
StructType(
StructField("value", DoubleType) :: Nil
)
override def bufferSchema: StructType =
new StructType() …Run Code Online (Sandbox Code Playgroud) aggregate-functions user-defined-functions dataframe apache-spark
我正在尝试运行此代码:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.getOrCreate()
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
df.show()
print('Count of Rows: {0}'.format(df.count()))
print('Count of distinct Rows: {0}'.format((df.distinct().count())))
spark.stop()
Run Code Online (Sandbox Code Playgroud)
并得到一个错误
18/06/22 11:58:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in …Run Code Online (Sandbox Code Playgroud) 我正在学习一些函数式编程并查看toolz.compose,pipe,thread_first和thread_last之间的差异对我来说似乎非常微妙或根本不存在.这些功能的预期不同用例是什么?
是否有可能创建一个可以在多个条件了窗口函数排序依据为rangeBetween或rowsBetween.假设我有一个如下所示的数据框.
user_id timestamp date event
0040b5f0 2018-01-22 13:04:32 2018-01-22 1
0040b5f0 2018-01-22 13:04:35 2018-01-22 0
0040b5f0 2018-01-25 18:55:08 2018-01-25 1
0040b5f0 2018-01-25 18:56:17 2018-01-25 1
0040b5f0 2018-01-25 20:51:43 2018-01-25 1
0040b5f0 2018-01-31 07:48:43 2018-01-31 1
0040b5f0 2018-01-31 07:48:48 2018-01-31 0
0040b5f0 2018-02-02 09:40:58 2018-02-02 1
0040b5f0 2018-02-02 09:41:01 2018-02-02 0
0040b5f0 2018-02-05 14:03:27 2018-02-05 1
Run Code Online (Sandbox Code Playgroud)
每行,我需要事件列值的总和,其日期不超过3天.但我不能在同一天晚些时候发生事件.我可以创建一个窗口函数,如:
days = lambda i: i * 86400
my_window = Window\
.partitionBy(["user_id"])\
.orderBy(F.col("date").cast("timestamp").cast("long"))\
.rangeBetween(-days(3), 0)
Run Code Online (Sandbox Code Playgroud)
但这将包括同一天晚些时候发生的事件.我需要创建一个窗口函数,其行为类似于(对于带*的行):
user_id …Run Code Online (Sandbox Code Playgroud) 我在执行以下代码时遇到了问题:
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
Run Code Online (Sandbox Code Playgroud)
当我运行上面的代码时,df3是一个空的DataFrame.但是:如果我将代码更改为下面,它将给出正确的结果(2行的DataFrame):
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', …Run Code Online (Sandbox Code Playgroud) python-2.7 apache-spark apache-spark-sql pyspark spark-dataframe
apache-spark ×7
python ×5
pyspark ×4
toolz ×2
apache-kafka ×1
dataframe ×1
java ×1
kryo ×1
pandas ×1
parquet ×1
python-2.7 ×1
scala ×1
scikit-learn ×1
snappy ×1