小编Jih*_* No的帖子

汇合平台vs阿帕奇卡夫卡

我是kafka的新手,我对Confluent平台感到好奇.

看来Confluent平台上的用户故事并不多.

汇合平台是否只为卡夫卡增加了更多的价值?

或者任何人都可以告诉我你最喜欢哪一个?

我必须选择其中一个.

apache-kafka confluent confluent-platform

26
推荐指数
1
解决办法
1万
查看次数

HIVE使用json格式插入覆盖目录

如何使用json架构插入覆盖目录?

有生蜂巢avro表; (这实际上有很多领域)

tb_test--------
name string
kickname string
-----------------
Run Code Online (Sandbox Code Playgroud)

然后我想通过jsonserde将查询结果保存到hdfs中的某个目录中.

我试过这个.

insert overwrite directory '/json/'
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES (
 "stat_name"="$._col0",
 "stat_interval"="$._col1"
)
STORED AS TEXTFILE 
select name, nickname
from tb_test limit 100
Run Code Online (Sandbox Code Playgroud)

但在/ json /中写入json有_colXX字段名而不是原始字段名.

{"_col0":"basic_qv"," _col1":"h"}
{"_col0":"basic_qv","_col1 ":"h"}
{"_col0":"basic_qv","_col1 ":"h"}
{"_col0":"basic_qv"," _col1":"h"}
{"_col0":"basic_qv","_col1 ":"h"}
Run Code Online (Sandbox Code Playgroud)

我期望

{"name":"basic_qv","nickname":"h"}
{"name":"basic_qv","nickname":"h"}
{"name":"basic_qv","nickname":"h"}
{"name":"basic_qv","nickname":"h"}
{"name":"basic_qv","nickname":"h"}
Run Code Online (Sandbox Code Playgroud)

有什么用呢?

谢谢!!

json hadoop hive hiveql

9
推荐指数
1
解决办法
1448
查看次数

如何在数据集上将集合分组为运算符/方法?

Spark Scala中是否没有功能级别的grouping_sets支持?

我不知道这个补丁适用于大师版 https://github.com/apache/spark/pull/5080

我想通过scala dataframe api进行这种查询。

GROUP BY expression list GROUPING SETS(expression list2)
Run Code Online (Sandbox Code Playgroud)

cuberollup 功能在Dataset API中可用,但找不到分组集。为什么?

dataframe apache-spark apache-spark-sql

5
推荐指数
1
解决办法
2948
查看次数

缩减列表<CompletableFuture<T>>

何时ints给出:

List<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

使用Java Stream API,我们可以减少它们

MyValue myvalue = ints
        .parallelStream()
        .map(x -> toMyValue(x))
        .reduce((t, t2) -> t.combine(t2))
        .get();
Run Code Online (Sandbox Code Playgroud)

在这个例子中,对我来说重要的是......

  • 项目将在多个线程中减少
  • 早期映射的项目将提前减少
  • 并非所有结果toMyValue()都会同时加载

现在我想通过API做同样的处理CompletableFuture

为了做地图,我做了:

List<CompeletableFuture<MyValue>> myValueFutures = ints
        .stream()
        .map(x -> CompletableFuture.supplyAsync(() -> toMyValue(x), MY_THREAD_POOL))
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

现在我不知道如何减少List<CompeletableFuture<MyValue>> myValueFutures单身MyValue

并行流提供了方便的 API,但由于这些问题我不想使用 Stream API:

  • 并行流在处理过程中很难停止阶段。
  • 当某些worker被IO阻塞时,并行流的活动worker计数可能会超过并行度。这有助于最大限度地提高 CPU 利用率,但可能会出现内存开销(甚至 OOM)。

有什么办法可以减少 CompetableFutures 吗?没有流减少API的一一?

java java-stream completable-future

5
推荐指数
1
解决办法
1917
查看次数