小编Nis*_*yal的帖子

具有自定义对象数据类型的Kafka流聚合

我有一个从GenericRecord类型的主题中提取json字符串的处理器。现在我将流分成2个分支。我进入第一个分支,然后将(键,值)映射到2个字符串中,该字符串包含json的特定字段和该字段的值,并且按键分组。到目前为止,一切都很好。现在,我必须使用新的Type用户定义来聚合流,并且会收到异常。

这里的代码:

新类型:

private class Tuple {

    public int occ;
    public int sum;


    public Tuple (int occ, int sum) {
        this.occ = occ;
        this.sum = sum;
    }

    public void sum (int toAdd) {
        this.sum += toAdd;
        this.occ ++;
    }

    public int getAverage () {
        return this.sum / this.occ;
    }

    public String toString() {
        return occ + "-> " + sum + ": " + getAverage();
    }
Run Code Online (Sandbox Code Playgroud)

好流:

  StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> source =
          builder.stream(topic);

    KStream<GenericRecord, GenericRecord>[] branches …
Run Code Online (Sandbox Code Playgroud)

aggregation apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
2860
查看次数

在插入之前查找 HIVE 表中是否已存在记录

我有一个 HIVE 分区表,在向其中插入记录之前,我需要查找记录是否已存在。

例子。

Insert into table employee partition (day, location) select distinct name, number,
date,aud_date, day, location from tableB.
Run Code Online (Sandbox Code Playgroud)

如果我尝试从 tableB 插入的记录已存在于员工表中,则应绕过它或将其写入另一个表中。我需要检查员工表中是否已存在的列是姓名、号码、日期、日期、位置。我不想检查 aud_date 因为它会有所不同。

hive

2
推荐指数
1
解决办法
5801
查看次数

在配置单元中更改列类型

我昨天刚开始学习蜂巢,我不得不改变蜂巢中的一个列的类型.我想问一下列类型的更改是否对它们有某种限制因为我只能做特定类型的更改,比如我可以将int转换为double,string转换为double,double转换为string但是我不能将字符串更改为int,double到int.

ALTER TABLE student CHANGE rollno rollno int;
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions :
rollno

hive> DESCRIBE FORMATTED student
> ;
OK
# col_name              data_type               comment             

rollno                  int                                         
name                    string                                      
phno                    string                                      
city                    string   

    ALTER TABLE student CHANGE rollno rollno double;
OK
Time taken: 0.144 seconds

ALTER TABLE student CHANGE rollno rollno int;

FAILED: Execution Error, return code 1 …
Run Code Online (Sandbox Code Playgroud)

hadoop hive

2
推荐指数
2
解决办法
1万
查看次数

我们可以在 hive 中的“show partitions tableName”上使用限制原因吗

我有一个有很多分区的配置单元表,我想在执行show partitions table namecommand时只得到 100 个分区。

hive

2
推荐指数
1
解决办法
6039
查看次数

如何选择$(this)和.class在一行中做某事?

我知道这是一个愚蠢的问题,但是我如何在同一行中将Class()切换到两个选择器。

$('.search-ico').click(function(){
        $(this).toggleClass('is-active');
        $('.class').toggleClass('is-active');
});
Run Code Online (Sandbox Code Playgroud)

我在下面尝试过:

$('.search-ico').click(function(){
    $(this,'.class').toggleClass('is-active');
}); 
Run Code Online (Sandbox Code Playgroud)

但它不起作用(仅this上课)。

谢谢

javascript jquery

2
推荐指数
1
解决办法
48
查看次数

如何从 PHP 中的 SOAP 调用获取 JSON 响应

由于 SOAP 客户端默认返回 XML 响应,因此我需要获取 JSON 响应而不是 XML。

$client = new SoapClient(null, array('location' => "http://localhost/soap.php",
                                     'uri'      => "http://test-uri/"));
Run Code Online (Sandbox Code Playgroud)

在这种情况下,需要在SOAPClientSOAPHeader 中设置什么属性才能返回 JSON 响应?

php soap-client

1
推荐指数
1
解决办法
2万
查看次数

当流应用程序有多个实例时,有状态操作如何在Kafka流中工作?

状态完整操作如何在具有多个实例的Kafka Stream应用程序中工作?让我们说我们有2个主题,每个A和B有2个分区.我们有一个流应用程序,它既消耗了两个主题,又有两个流之间的连接.

现在我们正在运行此流应用程序的2个实例.据我所知,每个实例将分配每个主题的2个分区之一.

现在,如果要连接的消息被应用程序的不同实例使用,联接将如何发生?我无法理解它.

虽然我测试了一个似乎工作正常的小流应用程序.我是否可以在不考虑流应用程序中定义的拓扑类型的情况下,始终增加任何类型应用程序的实例数量?

是否有任何文件可以让我了解其工作细节?

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
99
查看次数