小编Sim*_* Su的帖子

流复制和逻辑复制之间的区别

任何人都可以告诉我更多关于PostgreSQL中物理复制和逻辑复制之间的区别吗?

database postgresql database-replication

32
推荐指数
1
解决办法
1万
查看次数

如何在Vim中配置IdeaVIM Ctrl + N和Ctrl + P完成?

在Vim中,我们可以使用Ctrl + NCtrl + P完成代码完成.如何在IdeaVim for IntelliJ中配置它?

vim jetbrains-ide intellij-idea ideavim

31
推荐指数
5
解决办法
7870
查看次数

CommitFailedException由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交

我正在使用kafka 0.10.2,现在遇到了CommitFailedException.喜欢:

由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交.这意味着后续调用poll()的时间长于配置的max.poll.interval.ms,这通常意味着轮询循环花费了太多时间进行消息处理.您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题.

我已将max.poll.interval.ms设置为Integer.MAX_VALUE.所以任何人都可以告诉我为什么即使我设定了价值仍然会发生这种情况?

另一个问题是:我做的描述是将session.timeout.ms设置为60000并且它仍然会发生.我尝试通过简单的代码重现

 public static void main(String[] args) throws InterruptedException {     
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
Run Code Online (Sandbox Code Playgroud)

当我将session.timeout.ms设置为10000时,我尝试在我的轮询循环中睡眠超过10000毫秒,但它似乎工作,没有异常.所以我对此感到困惑.如果heartbeat是由consumer.poll和consumer.commit触发的,那么我的代码中的心跳超出会话超时.为什么不抛出CommitFailedException?

java apache-kafka

20
推荐指数
2
解决办法
2万
查看次数

如何使用"RAISE INFO,RAISE LOG,RAISE DEBUG"来跟踪PostgreSQL函数的登录?

CREATE OR REPLACE FUNCTION mover(src text, dst text, cpquery text, conname text, ifbin boolean) returns void as
$$
        DECLARE
                cnt integer;
                dlcnt integer;
                del_count integer;
                ret text;

        BEGIN
                SELECT  pg_catalog.dblink_copy_open(conname, dst, ifbin) INTO ret ;
                RAISE LOG 'dblink_open %',ret;

                execute 'SELECT  1 as check FROM ' || src ||' limit 1' into cnt;
                IF cnt=0 THEN
                        PERFORM pg_sleep(2);
                END IF;

                IF ifbin=true THEN
                        RAISE DEBUG 'Start to Copy data with binary';
                        execute 'COPY (' || cpquery || '  ) to …
Run Code Online (Sandbox Code Playgroud)

postgresql plpgsql debug-print

8
推荐指数
1
解决办法
2万
查看次数

jstack文件中IN_NATIVE是什么意思?

    Thread 10296: (state = IN_NATIVE)
 - sun.nio.ch.Net.connect0(boolean, java.io.FileDescriptor, java.net.InetAddress, int) @bci=0 (Interpreted frame)
 - sun.nio.ch.Net.connect(java.net.ProtocolFamily, java.io.FileDescriptor, java.net.InetAddress, int) @bci=25, line=465 (Interpreted frame)
 - sun.nio.ch.Net.connect(java.io.FileDescriptor, java.net.InetAddress, int) @bci=6, line=457 (Interpreted frame)
 - sun.nio.ch.SocketChannelImpl.connect(java.net.SocketAddress) @bci=225, line=670 (Interpreted frame)
 - kafka.network.BlockingChannel.connect() @bci=135 (Interpreted frame)
 - kafka.producer.SyncProducer.connect() @bci=21 (Interpreted frame)
 - kafka.producer.SyncProducer.getOrMakeConnection() @bci=11 (Interpreted frame)
 - kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(kafka.api.RequestOrResponse, boolean) @bci=13 (Interpreted frame)
 - kafka.producer.SyncProducer.send(kafka.api.TopicMetadataRequest) @bci=6 (Interpreted frame)
 - kafka.client.ClientUtils$.fetchTopicMetadata(scala.collection.Set, scala.collection.Seq, kafka.producer.ProducerConfig, int, kafka.auth.KafkaUser) @bci=189 (Interpreted frame)
 - kafka.producer.BrokerPartitionInfo.updateInfo(scala.collection.immutable.Set, int) @bci=24 (Interpreted frame)
 - kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp() @bci=54 …
Run Code Online (Sandbox Code Playgroud)

java jstack

1
推荐指数
1
解决办法
4882
查看次数