我正在尝试使用Spark集群(在AWS EMR上运行)来链接其中包含公共元素的项目组.基本上,我有一些包含一些元素的组,如果某些元素在多个组中,我想创建一个包含所有这些组的元素的组.
我知道GraphX库,我试图使用graphframes包(ConnectedComponents算法)来解决这个任务,但它认为graphframes包已经不够成熟并且非常浪费资源......在我的数据集上运行它(cca 60GB)无论我调整Spark参数多少,我如何对数据进行分区和重新分区,或者我创建了多大的集群(图表都很大),它只会耗尽内存.
所以我写了自己的代码来完成任务.代码有效,它解决了我的问题,但每次迭代都会慢下来.由于它有时需要大约10次迭代才能完成,它可能会运行很长时间,我无法弄清楚问题是什么.
我从一个item_links有两列的表(DataFrame)开始:item和group_name.项目在每个组中都是唯一的,但不在此表中.一个项目可以在多个组中.如果两个项目各自具有相同组名称的行,则它们都属于同一组.
我首先逐项分组,找到每个项目中所有组中最小的所有组名称.我将此信息作为额外列附加到原始DataFrame.然后,我通过组名称并在每个组中查找此新列的最小值来创建新的DataFrame.我将此DataFrame与原始表一起加入组名称,并使用该新列中的最小值替换组名称列.这个想法是,如果一个组包含一个也属于某个较小组的项目,那么该组将被合并.在每次迭代中,它链接由中间越来越多的项间接链接的组.
我正在运行的代码如下所示:
print(" Merging groups that have common items...")
n_partitions = 32
merge_level = 0
min_new_group = "min_new_group_{}".format(merge_level)
# For every item identify the (alphabetically) first group in which this item was found
# and add a new column min_new_group with that information for every item.
first_group = item_links \
.groupBy('item') \
.agg( min('group_name').alias(min_new_group) ) \
.withColumnRenamed('item', 'item_id') \
.coalesce(n_partitions) \ …Run Code Online (Sandbox Code Playgroud) 我正在构建一个无服务器的Web跟踪系统,该系统使用AWS API Gateway为其跟踪像素提供服务,每当跟踪请求到达时,它就会调用Lambda函数,以将跟踪事件写入Kinesis流。
Lambda函数本身并没有做任何花哨的事情。它只是接受传入的事件(它自己的参数)并将其写入流中。本质上,它只是:
import boto3
kinesis_client = boto3.client("kinesis")
kinesis_stream = "my_stream_name"
def return_tracking_pixel(event, context):
...
new_record = ...(event)
kinesis_client.put_record(
StreamName=kinesis_stream,
Data=new_record,
PartitionKey=...
)
return ...
Run Code Online (Sandbox Code Playgroud)
有时,我会在Lambda执行期间遇到一个怪异的高峰,这导致我的某些Lambda函数调用超时,并且跟踪请求丢失了。
这是在受影响的时间段内Lambda函数的1分钟调用计数的图表:
在20:50和23:10之间,我突然看到许多调用错误(错误计数为1分钟):
这显然是由Lambda执行超时(每1分钟间隔中的最长持续时间)引起的:
我的Kinesis流(数据输入,放置记录数,put_record成功计数等,都看起来很正常)和我的API GW(调用数均与API GW调用数相对应)都没有任何异常在API GW的限制内)。
是什么原因导致Lambda函数执行持续时间突然(看似随机发生)?
编辑:既不限制lambda函数,这是我的第一个想法。
amazon-web-services amazon-kinesis aws-lambda aws-api-gateway