计算Hadoop中的成功记录与Hive查询之间的差异

Cip*_*ipi 8 hadoop hive

我有一个Hive表,其中包含客户呼叫的数据.为简单起见,请考虑它有2列,第一列包含客户ID,第二列包含调用的时间戳(unix时间戳).

我可以查询此表以查找每个客户的所有呼叫:

SELECT * FROM mytable SORT BY customer_id, call_time;
Run Code Online (Sandbox Code Playgroud)

结果是:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...
Run Code Online (Sandbox Code Playgroud)

是否有可能为每个客户创建一个Hive查询,从第二次调用开始,两次成功调用之间的时间间隔?对于以上示例,查询应返回:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...
Run Code Online (Sandbox Code Playgroud)

我试图改进sql解决方案中的解决方案,但我坚持使用Hive限制:它仅接受FROM中的子查询,并且连接必须仅包含相等性.

谢谢.

EDIT1:

我试过使用Hive UDF函数:

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}
Run Code Online (Sandbox Code Playgroud)

并使用名称"delta".

但似乎(从日志和结果)它在MAP时间使用.由此产生2个问题:

第一:在使用此功能之前,必须按客户ID和时间戳对表数据进行排序.查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
Run Code Online (Sandbox Code Playgroud)

因为在使用我的功能很久之后,在REDUCE时间执行排序部分是行不通的.

我可以在使用函数之前对表数据进行排序,但我对此并不满意,因为这是我希望避免的开销.

第二:在分布式Hadoop配置的情况下,数据在可用的作业跟踪器之间分配.所以我相信这个函数有多个实例,每个映射器一个,所以可以在2个映射器之间分配相同的客户数据.在这种情况下,我将丢失客户电话,这是不可接受的.

我不知道如何解决这个问题.我知道DISTRIBUTE BY确保将具有特定值的所有数据发送到同一个reducer(从而确保SORT按预期工作),是否有人知道映射器是否有类似的东西?

接下来我打算遵循libjack的建议来使用reduce脚本.一些其他的hive查询之间需要这种"计算",所以我想尝试Hive提供的所有内容,然后再转移到另一个工具,如Balaswamy vaddeman所建议的那样.

EDIT2:

我开始研究自定义脚本解决方案.但是,在Programming Hive一书的第14章的第一页(本章介绍自定义脚本)中,我发现了以下段落:

流式处理通常不如编码可比较的UDF或InputFormat对象.序列化和反序列化数据以将其传入和传出管道是相对低效的.以统一的方式调试整个程序也更加困难.但是,它对于快速原型设计和利用非Java编写的现有代码非常有用.对于不想编写Java代码的Hive用户,它可能是一种非常有效的方法.

很明显,自定义脚本在效率方面不是最佳解决方案.

但是我应该如何保留我的UDF功能,但要确保它在分布式Hadoop配置中按预期工作?我在语言手册UDF wiki页面的UDF Internals部分找到了这个问题的答案.如果我写我的查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
Run Code Online (Sandbox Code Playgroud)

它在REDUCE时执行,DISTRIBUTE BY和SORT BY构造保证来自同一客户的所有记录按照调用的顺序由同一个reducer处理.

所以上面的UDF和这个查询构造解决了我的问题.

(抱歉没有添加链接,但我不允许这样做,因为我没有足够的声望点)

jba*_*ste 13

这是一个老问题,但是为了将来的参考,我在这里写了另一个命题:

Hive 窗口函数允许在查询中使用上一个/下一个值.

simili代码查询可以是:

SELECT CUSTOMER_ID,LAG(call_time,1,0)OVER(PARTITION BY CUSTOMER_ID ORDER BY call_time ROWS 1之前) - call_time FROM MYTABLE;