小编Tom*_*Tom的帖子

如果我将自动提交设置为 true 且

我正在阅读https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-offset-committing-behaviour-configuration

它说:

启用检查点:如果启用检查点,Flink Kafka Consumer 将在检查点完成时提交存储在检查点状态中的偏移量。这确保了 Kafka 代理中提交的偏移量与检查点状态中的偏移量一致。用户可以通过调用消费者上的 setCommitOffsetsOnCheckpoints(boolean) 方法来选择禁用或启用偏移量提交(默认情况下,该行为为 true)。请注意,在这种情况下,属性中的自动定期偏移提交设置将被完全忽略。

如果我以 10 秒的间隔启用检查点,我也会设置为 true,并在 Kafka 消费者属性中have setCommitOffsetsOnCheckpoints设置 enable.auto.commit=true和 。auto.commit.interval.ms=5000

那么偏移量提交会发生什么行为呢?偏移量会每 10 秒提交 3 次吗?一次来自 flink 执行检查点时,两次来自 Kafka 消费者的自动提交?

apache-flink

1
推荐指数
1
解决办法
2067
查看次数

结果null.asInstanceOf [Int]为0

当我跑步时null.asInstanceOf[Int],我很惊讶地发现结果为0,我会问为什么它为0,我认为它应该为空

scala

0
推荐指数
1
解决办法
118
查看次数

Java 8中的Comparator.cominging为Map.Entry

给定以下代码:

@Test
public void test7() {
    Map<String, Integer> sortedData = new HashMap<>();
    sortedData.put("One", 1);
    sortedData.put("Two", 2);
    sortedData.put("Three", 3);

    Stream<Map.Entry<String, Integer>> stream = sortedData.entrySet().stream();
    List<String> sorted = stream
            .sorted(Comparator.comparing(Map.Entry::getValue))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
} 
Run Code Online (Sandbox Code Playgroud)

它可以成功编译,但是当我更改时

.sorted(Comparator.comparing(Map.Entry::getValue))
Run Code Online (Sandbox Code Playgroud)

.sorted(Comparator.comparing(Map.Entry::getValue).reversed())
Run Code Online (Sandbox Code Playgroud)

编译器抱怨 Non static method can't be referenced in static context

我可以想象这是因为getValue不是的静态方法Map.Entry,但是我在这里无法解释问题。

java java-8 java-stream

0
推荐指数
1
解决办法
1722
查看次数

如何找到oracle表的所有分区信息

我有一个分区表t,它使用列表分区在列上分区,

我想查询出所有分区及其对应的列值。

它与PLSQL Developer中的相同:当我查看此表的sql时,它显示所有分区,例如:

  partition DATA_20180522 values (20180522)
    tablespace DMSCMDAT
    pctfree 10
    initrans 1
    maxtrans 255
    storage
    (
      initial 8M
      next 1M
      minextents 1
      maxextents unlimited
    )
Run Code Online (Sandbox Code Playgroud)

我在Google上搜索过,但是找不到答案。

oracle

0
推荐指数
1
解决办法
4846
查看次数

如何在flink应用程序中指定两个源、一个进程运算符和一个接收器运算符

我使用的是flink 1.3,我定义了两个流源,它们将发出相同的事件以供后续运算符处理(我定义的进程运算符和接收器运算符)

但看起来在 source-process-pink 管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) //How to MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}
Run Code Online (Sandbox Code Playgroud)

apache-flink

0
推荐指数
1
解决办法
1107
查看次数

System.nanoTime和System.currentTimeMillis()

我在Windows上使用JDK8,在Linux上使用JDK8

  1. 当我System.nanoTime()/System.currentTimeMillis()在Windows上运行时,结果是49,
System.nanoTime(): 74786833960332

System.currentTimeMillis():1507786236263
Run Code Online (Sandbox Code Playgroud)
  1. 在Linux上运行时,结果是 26236
System.nanoTime(): 39560110918205325

System.currentTimeMillis():1507786262105
Run Code Online (Sandbox Code Playgroud)

我对结果很困惑,这两个值差别很大.

另外,我认为nanoTime是1,000,000毫秒,所以上面的两个值对我来说都是错误的(也就是说,它们都应该是大约1000000)

java

-4
推荐指数
2
解决办法
1284
查看次数

标签 统计

apache-flink ×2

java ×2

java-8 ×1

java-stream ×1

oracle ×1

scala ×1