小编the*_*tom的帖子

Spark Streaming:无状态的整体窗口与保持状态

在使用Spark Streaming处理顺序有限事件会话流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新mapStateByKey)会有什么考虑因素?

例如,请考虑以下情形:

可穿戴设备跟踪佩戴者进行的身体锻炼.设备会自动检测锻炼开始的时间,并发出消息; 在锻炼期间发出额外的信息(例如心率); 最后,在练习完成后发出消息.

期望的结果是每个运动会话的聚合记录流.即,应该将同一会话的所有事件聚合在一起(例如,以便每个会话可以保存在单个DB行中).请注意,每个会话的长度都是有限的,但来自多个设备的整个流是连续的.为方便起见,我们假设设备为每个锻炼课程生成一个GUID.

我可以看到使用Spark Streaming处理这个用例的两种方法:

  1. 使用不重叠的窗口,并保持状态.每个GUID保存一个状态,所有事件都与之匹配.当新事件到达时,状态被更新(例如,使用mapWithState),并且如果事件是"运动结束时",则将发出基于状态的聚合记录,并且移除密钥.

  2. 使用重叠的滑动窗口,并仅保留第一个会话.假设长度为2且间隔为1的滑动窗口(参见下图).还假设窗口长度为2 X(最大可能的运动时间).在每个窗口上,事件由GUID进行攻击,例如使用reduceByKeyAndWindow.然后,转储从窗口后半部分开始的所有会话,并释放剩余的会话.这使得每个事件只能使用一次,并确保属于同一会话的所有事件将聚合在一起.

方法#2的图表:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------
Run Code Online (Sandbox Code Playgroud)

我看到的利弊:

方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,则状态可能比内存大).但是,如果最大并发会话数有限,则可能不是问题.

方法#2的成本是两倍(每个事件处理两次),并且具有更高的延迟(2倍最大运动时间),但更简单且易于管理,因为没有保留任何状态.

处理这个用例的最佳方法是 - 这些方法中的任何一种都是"正确的",还是有更好的方法?

应该考虑哪些其他优点/缺点?

apache-spark spark-streaming

23
推荐指数
1
解决办法
2800
查看次数

Glue Dynamic Frame 比普通 Spark 慢得多

在下图中,就如何写入 S3 而言,我们使用三种不同的配置运行相同的粘合作业:

  1. 我们使用动态帧写入S3
  2. 我们使用纯spark框架写入S3
  3. 与 1 相同,但工作节点数量从 80 个减少到 60 个
  • 在所有条件相同的情况下,动态框架需要 75 分钟才能完成这项工作,普通 Spark 需要 10 分钟。输出为 100 GB 的数据。
  • 动态帧对worker节点数量超级敏感,稍微减少worker节点数量时,处理2小时后会因内存问题而失败。这是令人惊讶的,因为我们期望 Glue 作为一项 AWS 服务能够更好地处理 S3 写入操作。

代码差异是这样的:

if dynamic:
    df_final_dyn = DynamicFrame.fromDF(df_final, glueContext, "df_final")

    glueContext.write_dynamic_frame.from_options(
    frame=df_final_dyn, connection_type="s3", format="glueparquet", transformation_ctx="DataSink0",
    connection_options={"path": "s3://...", 
    "partitionKeys": ["year", "month", "day"]})
else:
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df_final.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day")\
             .save("s3://.../")
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

为什么效率如此低下?

amazon-s3 amazon-web-services apache-spark aws-glue

14
推荐指数
1
解决办法
2121
查看次数

Scala Spark包含vs.不包含

我可以使用"contains"过滤 - 如下所示 - RDD中的元组.但是如何使用"不包含"过滤RDD呢?

val rdd2 = rdd1.filter(x => x._1 contains ".")
Run Code Online (Sandbox Code Playgroud)

我找不到这个的语法.假设有可能并且我没有使用DataFrames.我无法看到如何使用正则表达式和/或过滤器示例.

scala apache-spark

9
推荐指数
1
解决办法
2万
查看次数

当文件无法适应spark的主内存时,spark如何读取大文件(petabyte)

在这些情况下,大文件会发生什么?

1)Spark从NameNode获取数据的位置.Spark会在同一时间停止,因为根据NameNode的信息,数据大小太长了吗?

2)Spark根据datanode块大小进行数据分区,但是所有数据都不能存储到主存储器中.这里我们没有使用StorageLevel.那么这里会发生什么?

3)Spark对数据进行分区,一旦主存储器的数据再次处理,一些数据将存储在主存储器上,spark将从光盘加载其他数据.

partition apache-spark rdd

9
推荐指数
2
解决办法
9882
查看次数

spark.sql.shuffle.partitions 的 200 个默认分区难题

在许多帖子中都有这样的声明 - 如下以某种形式显示 - 由于一些关于改组、分区、由于 JOIN、AGGR 等等的问题:

... 一般而言,每当您执行 spark sql 聚合或连接对数据进行混洗时,这是结果分区的数量= 200。这是由 spark.sql.shuffle.partitions 设置的。...

所以,我的问题是:

  • 我们的意思是,如果我们将 DF 的分区设置为 765,例如,
    • 处理是针对 765 个分区进行的,但是输出被合并/重新分区标准为 200 - 这里指的是 word结果
    • 或者它是否在合并/重新分区到 200 个分区之后使用 200 个分区进行处理,然后再加入、AGGR?

我问,因为我从来没有看到一个明确的观点。

我做了以下测试:

// genned ad DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, …
Run Code Online (Sandbox Code Playgroud)

apache-spark

9
推荐指数
2
解决办法
1万
查看次数

数据工厂、Synapse Analytics 和 DataBricks 比较

我对 Azure 还不太熟悉,我想知道什么时候建议使用 ADF、Synapse 或 DataBricks。他们的最佳实践和性能用例是什么?

你能帮我解决这个理论问题吗?

干杯!

azure azure-data-factory databricks azure-synapse

9
推荐指数
1
解决办法
1万
查看次数

Hadoop作为文档存储数据库

我们有一个大型文档存储,目前在3TB空间运行,每六个月增加1 TB.它们目前存储在Windows文件系统中,这有时会在访问和检索方面造成问题.我们正在寻找利用基于Haddop的文档存储数据库.继续使用Haddop是一个好主意吗?任何人都有同样的曝光?实现同样的挑战和技术障碍可能是什么?

hadoop

8
推荐指数
1
解决办法
1万
查看次数

搭配Arvo,Kryo和Parquet的Spark

我很难理解Arvo,Kryo和Parquet究竟在Spark的背景下做了什么.它们都与序列化有关,但我看到它们一起使用,所以它们不能做同样的事情.

Parquet将其自身描述为柱状存储格式,我有点理解但是当我保存镶木地板文件时,Arvo或Kryo可以与它有什么关系吗?或者它们仅在火花工作期间相关,即.在洗牌或溢出到磁盘期间通过网络发送对象?Arvo和Kryo​​如何区别以及当您一起使用时会发生什么?

kryo apache-spark parquet

7
推荐指数
2
解决办法
4075
查看次数

sparksql 删除配置单元表

我想通过 sparksql 删除一个配置单元表。

在安装了 hadoop 2.6、hive 2.0、spark 1.6 和 spark 2.0 的集群中。我在两个版本的 pyspark shell 和 spark-submit 作业中尝试了以下代码。

sqlContext.sql('drop table test')  //spark 1.6
spark.sql('drop table test')       //spark 2.0
Run Code Online (Sandbox Code Playgroud)

两个代码在 pyspark-shell 中都可以正常工作,我可以从 hive cli 中看到测试表不再存在。

但是,如果代码在 python 文件中,然后使用 spark-submit 提交到集群,则代码永远不会生效。

spark 2.0 甚至给出了错误

pyspark.sql.utils.AnalysisException: u"Table to drop '`try`' does not exist;"
Run Code Online (Sandbox Code Playgroud)

我已将hive-site.xml复制到 spark 中的 conf 目录中。

通过 sparksql 删除配置单元表的正确方法是什么?

更新:

我尝试比较了 spark-shell 和我使用以下代码提交的作业之间的 spark 环境

spark-submit --master yarn --deploy-mode cluster try_spark_sql.py
Run Code Online (Sandbox Code Playgroud)

在spark-shell环境下,可以看到spark.sql.catalogImplementation 设置为hive

在使用上述代码提交的作业中。环境不包含spark.sql.catalogImplementation 我尝试使用以下代码设置它: …

apache-spark apache-spark-sql pyspark-sql

7
推荐指数
3
解决办法
3万
查看次数

Spark.table("TABLE A") 和 Spark.read.("TABLE A") 之间有什么区别

问题如标题,我正在学习sparkSQL,但我无法很好地理解它们之间的区别。谢谢。

apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
1万
查看次数