在使用Spark Streaming处理顺序有限事件会话流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新mapStateByKey)会有什么考虑因素?
例如,请考虑以下情形:
可穿戴设备跟踪佩戴者进行的身体锻炼.设备会自动检测锻炼开始的时间,并发出消息; 在锻炼期间发出额外的信息(例如心率); 最后,在练习完成后发出消息.
期望的结果是每个运动会话的聚合记录流.即,应该将同一会话的所有事件聚合在一起(例如,以便每个会话可以保存在单个DB行中).请注意,每个会话的长度都是有限的,但来自多个设备的整个流是连续的.为方便起见,我们假设设备为每个锻炼课程生成一个GUID.
我可以看到使用Spark Streaming处理这个用例的两种方法:
使用不重叠的窗口,并保持状态.每个GUID保存一个状态,所有事件都与之匹配.当新事件到达时,状态被更新(例如,使用mapWithState),并且如果事件是"运动结束时",则将发出基于状态的聚合记录,并且移除密钥.
使用重叠的滑动窗口,并仅保留第一个会话.假设长度为2且间隔为1的滑动窗口(参见下图).还假设窗口长度为2 X(最大可能的运动时间).在每个窗口上,事件由GUID进行攻击,例如使用reduceByKeyAndWindow.然后,转储从窗口后半部分开始的所有会话,并释放剩余的会话.这使得每个事件只能使用一次,并确保属于同一会话的所有事件将聚合在一起.
方法#2的图表:
Run Code Online (Sandbox Code Playgroud)Only sessions starting in the areas marked with \\\ will be emitted. ----------- |window 1 | |\\\\| | ----------- ---------- |window 2 | |\\\\| | ----------- ---------- |window 3 | |\\\\| | -----------
我看到的利弊:
方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,则状态可能比内存大).但是,如果最大并发会话数有限,则可能不是问题.
方法#2的成本是两倍(每个事件处理两次),并且具有更高的延迟(2倍最大运动时间),但更简单且易于管理,因为没有保留任何状态.
处理这个用例的最佳方法是 - 这些方法中的任何一种都是"正确的",还是有更好的方法?
应该考虑哪些其他优点/缺点?
在下图中,就如何写入 S3 而言,我们使用三种不同的配置运行相同的粘合作业:
代码差异是这样的:
if dynamic:
df_final_dyn = DynamicFrame.fromDF(df_final, glueContext, "df_final")
glueContext.write_dynamic_frame.from_options(
frame=df_final_dyn, connection_type="s3", format="glueparquet", transformation_ctx="DataSink0",
connection_options={"path": "s3://...",
"partitionKeys": ["year", "month", "day"]})
else:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df_final.write.mode("overwrite").format("parquet").partitionBy("year", "month", "day")\
.save("s3://.../")
Run Code Online (Sandbox Code Playgroud)
为什么效率如此低下?
我可以使用"contains"过滤 - 如下所示 - RDD中的元组.但是如何使用"不包含"过滤RDD呢?
val rdd2 = rdd1.filter(x => x._1 contains ".")
Run Code Online (Sandbox Code Playgroud)
我找不到这个的语法.假设有可能并且我没有使用DataFrames.我无法看到如何使用正则表达式和/或过滤器示例.
在这些情况下,大文件会发生什么?
1)Spark从NameNode获取数据的位置.Spark会在同一时间停止,因为根据NameNode的信息,数据大小太长了吗?
2)Spark根据datanode块大小进行数据分区,但是所有数据都不能存储到主存储器中.这里我们没有使用StorageLevel.那么这里会发生什么?
3)Spark对数据进行分区,一旦主存储器的数据再次处理,一些数据将存储在主存储器上,spark将从光盘加载其他数据.
在许多帖子中都有这样的声明 - 如下以某种形式显示 - 由于一些关于改组、分区、由于 JOIN、AGGR 等等的问题:
... 一般而言,每当您执行 spark sql 聚合或连接对数据进行混洗时,这是结果分区的数量= 200。这是由 spark.sql.shuffle.partitions 设置的。...
所以,我的问题是:
我问,因为我从来没有看到一个明确的观点。
我做了以下测试:
// genned ad DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count
sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.
ds1.rdd.partitions.size
ds2.rdd.partitions.size
val joined = ds1.join(ds2, …Run Code Online (Sandbox Code Playgroud) 我对 Azure 还不太熟悉,我想知道什么时候建议使用 ADF、Synapse 或 DataBricks。他们的最佳实践和性能用例是什么?
你能帮我解决这个理论问题吗?
干杯!
我们有一个大型文档存储,目前在3TB空间运行,每六个月增加1 TB.它们目前存储在Windows文件系统中,这有时会在访问和检索方面造成问题.我们正在寻找利用基于Haddop的文档存储数据库.继续使用Haddop是一个好主意吗?任何人都有同样的曝光?实现同样的挑战和技术障碍可能是什么?
我很难理解Arvo,Kryo和Parquet究竟在Spark的背景下做了什么.它们都与序列化有关,但我看到它们一起使用,所以它们不能做同样的事情.
Parquet将其自身描述为柱状存储格式,我有点理解但是当我保存镶木地板文件时,Arvo或Kryo可以与它有什么关系吗?或者它们仅在火花工作期间相关,即.在洗牌或溢出到磁盘期间通过网络发送对象?Arvo和Kryo如何区别以及当您一起使用时会发生什么?
我想通过 sparksql 删除一个配置单元表。
在安装了 hadoop 2.6、hive 2.0、spark 1.6 和 spark 2.0 的集群中。我在两个版本的 pyspark shell 和 spark-submit 作业中尝试了以下代码。
sqlContext.sql('drop table test') //spark 1.6
spark.sql('drop table test') //spark 2.0
Run Code Online (Sandbox Code Playgroud)
两个代码在 pyspark-shell 中都可以正常工作,我可以从 hive cli 中看到测试表不再存在。
但是,如果代码在 python 文件中,然后使用 spark-submit 提交到集群,则代码永远不会生效。
spark 2.0 甚至给出了错误
pyspark.sql.utils.AnalysisException: u"Table to drop '`try`' does not exist;"
Run Code Online (Sandbox Code Playgroud)
我已将hive-site.xml复制到 spark 中的 conf 目录中。
通过 sparksql 删除配置单元表的正确方法是什么?
更新:
我尝试比较了 spark-shell 和我使用以下代码提交的作业之间的 spark 环境
spark-submit --master yarn --deploy-mode cluster try_spark_sql.py
Run Code Online (Sandbox Code Playgroud)
在spark-shell环境下,可以看到spark.sql.catalogImplementation 设置为hive
在使用上述代码提交的作业中。环境不包含spark.sql.catalogImplementation 我尝试使用以下代码设置它: …
问题如标题,我正在学习sparkSQL,但我无法很好地理解它们之间的区别。谢谢。
apache-spark ×8
amazon-s3 ×1
aws-glue ×1
azure ×1
databricks ×1
hadoop ×1
kryo ×1
parquet ×1
partition ×1
pyspark ×1
pyspark-sql ×1
rdd ×1
scala ×1