小编eto*_*tov的帖子

Spark Streaming:无状态的整体窗口与保持状态

在使用Spark Streaming处理顺序有限事件会话流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新mapStateByKey)会有什么考虑因素?

例如,请考虑以下情形:

可穿戴设备跟踪佩戴者进行的身体锻炼.设备会自动检测锻炼开始的时间,并发出消息; 在锻炼期间发出额外的信息(例如心率); 最后,在练习完成后发出消息.

期望的结果是每个运动会话的聚合记录流.即,应该将同一会话的所有事件聚合在一起(例如,以便每个会话可以保存在单个DB行中).请注意,每个会话的长度都是有限的,但来自多个设备的整个流是连续的.为方便起见,我们假设设备为每个锻炼课程生成一个GUID.

我可以看到使用Spark Streaming处理这个用例的两种方法:

  1. 使用不重叠的窗口,并保持状态.每个GUID保存一个状态,所有事件都与之匹配.当新事件到达时,状态被更新(例如,使用mapWithState),并且如果事件是"运动结束时",则将发出基于状态的聚合记录,并且移除密钥.

  2. 使用重叠的滑动窗口,并仅保留第一个会话.假设长度为2且间隔为1的滑动窗口(参见下图).还假设窗口长度为2 X(最大可能的运动时间).在每个窗口上,事件由GUID进行攻击,例如使用reduceByKeyAndWindow.然后,转储从窗口后半部分开始的所有会话,并释放剩余的会话.这使得每个事件只能使用一次,并确保属于同一会话的所有事件将聚合在一起.

方法#2的图表:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------
Run Code Online (Sandbox Code Playgroud)

我看到的利弊:

方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,则状态可能比内存大).但是,如果最大并发会话数有限,则可能不是问题.

方法#2的成本是两倍(每个事件处理两次),并且具有更高的延迟(2倍最大运动时间),但更简单且易于管理,因为没有保留任何状态.

处理这个用例的最佳方法是 - 这些方法中的任何一种都是"正确的",还是有更好的方法?

应该考虑哪些其他优点/缺点?

apache-spark spark-streaming

23
推荐指数
1
解决办法
2800
查看次数

Perl Thrift客户到Hive?

我想使用Perl连接到基于Hadoop的Hive数据存储区.Hive允许使用Thrift接口(http://wiki.apache.org/hadoop/Hive/HiveClient)进行连接,并且Perl有一个Thrift实现(例如http://metacpan.org/pod/Thrift :::XS ) .但是,我找到的唯一一个Thrift客户端是Cassandra客户端.

如果这样的客户存在,或者如何创建它的任何想法?也许甚至可以连接而不明确定义一个?

(PS - 还有一个到Hive的ODBC/JDBC接口,但是安装这些模块是一个令人头痛的问题,并且是最后的手段)

谢谢!

perl hive thrift

7
推荐指数
1
解决办法
5289
查看次数

员工轮班问题 - 将任务联系在一起

我有一个列表Employee和一个列表Mission。每个任务都有开始时间和持续时间。

在CP模型(谷歌CpSat,从或工具包),我定义shifts = Dictionary<(int,int),IntVar>,其中shifts[(missionId, employeeId)] == 1当且仅当这个任务是由这名员工来实现。

我需要将每个任务分配给一名员工,显然一名员工无法同时完成两项任务。我已经写了这两个硬约束,它们运行良好。


问题:

现在,有些任务是“链接”在一起的,应该由同一个员工来实现。它们存储如下:

linkedMissions = {{1,2}, {3,4,5}}

在这里,任务1和任务2必须由同一个员工共同实现,任务3、4和5也是如此。


为了编写这最后一个约束,我为每个员工收集了应该链接在一起的所有班次的列表,然后我让它们都相等。

foreach (var employee in listEmployeesIds)
foreach (var missionGroup in linkedMissionsIds)
{
    var linkedShifts = shifts
        .Where(o => o.Key.Item2 == employee
                    && missionGroup.Contains(o.Key.Item1))
        .Select(o => o.Value)
        .ToList();

    for (var i = 0; i < linkedShifts.Count - 1; i++) 
        model.Add(linkedShifts[i] == linkedShifts[i + 1]);
}
Run Code Online (Sandbox Code Playgroud)

然而,求解器告诉我该模型不可行,但我可以用纸和笔轻松找到工作计划。我有 35 名员工和 25 个任务,连接在一起的任务不重叠,所以应该没有任何问题。


编辑:

作为替代方法,正如@Laurent Perron 所建议的,我尝试对所有必须在一起的班次使用相同的布尔变量:

var constraintBools = new …
Run Code Online (Sandbox Code Playgroud)

c# sat or-tools cp-sat

7
推荐指数
1
解决办法
307
查看次数

在RedShift中将值拆分为多行

如何将字段(例如CSV字符串)拆分为多行的问题已经得到解答:将 值拆分为多行.

但是,这个问题涉及MSSQL,并且答案使用了没有RedShift等价物的各种功能.

为了完整起见,这是我想做的一个例子:

目前的数据:

| Key | Data     |
+-----+----------+
| 1   | 18,20,22 |
| 2   | 17,19    |
Run Code Online (Sandbox Code Playgroud)

所需数据:

| Key | Data     |
+-----+----------+
| 1   | 18       |
| 1   | 20       |
| 1   | 22       |
| 2   | 17       |
| 2   | 19       |
Run Code Online (Sandbox Code Playgroud)

现在,我可以建议在CSV字段中使用小的,有限数量的元素的情况:在所有可能的数组位置上使用split_part和union,如下所示:

SELECT Key, split_part(Data, ',', 1) 
FROM mytable
WHERE split_part(Data, ',', 1) != ""
    UNION
SELECT Key, split_part(Data, ',', 2) 
FROM mytable
WHERE split_part(Data, ',', 2) …
Run Code Online (Sandbox Code Playgroud)

sql split amazon-redshift

5
推荐指数
1
解决办法
5573
查看次数

使用Node.js将数据加载到Redshift中

使用node.js将数据插入Amazon Redshift有哪些方法?

这应该是非常简单的,但我无法找到有效加载的任何具体示例.

postgresql node.js amazon-redshift

5
推荐指数
1
解决办法
7770
查看次数

CP 求解器可以在特定点初始化吗?

我正在使用 CP-Sat 求解器来优化我正在制定的时间表。然而,现在这个问题需要很长时间才能解决。是否可以用旧结果为求解器提供种子,作为起点,以减少找到最佳结果所需的时间?

python or-tools cp-sat

5
推荐指数
1
解决办法
633
查看次数

播种 CP-SAT 求解器

在谷歌 OR-tools 库中,“原始”CP-Solver(此处讨论: https: //developers.google.com/optimization/cp/original_cp_solver)可以使用重新播种.ReSeed()。然而,较新的版本,CP-SAT 不能。

我的假设是 CP-SAT 将详尽地尝试您问题中的每个选项,从可行的选项中选取最大值或最小值(取决于您的优化目标)。因为它会尝试所有这些,所以不需要做种,因此该选项对您不可用。

这种理解正确吗?如果是的话,为什么原始求解器有种子?.ReSeed()如果我的说法不正确,那么新 CpSolver 中缺少 是否是一个疏忽?

python or-tools cp-sat

2
推荐指数
1
解决办法
2060
查看次数