Can anyone tell me more about the difference between physical replication and logical replication in PostgreSQL?
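For context, a minimal sketch of how the two modes are set up (assuming PostgreSQL 10 or later; my_pub, my_sub, mytable, and the connection string are placeholders): physical replication ships the WAL byte-for-byte, so the standby is an identical read-only copy of the whole cluster, while logical replication decodes the WAL into row-level changes that are published per table.

-- Physical replication: the standby replays raw WAL from the primary;
-- it is configured on the standby side (e.g. primary_conninfo), not with SQL DDL.

-- Logical replication (PostgreSQL 10+, requires wal_level = logical).
-- On the publisher:
CREATE PUBLICATION my_pub FOR TABLE mytable;

-- On the subscriber:
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=primary dbname=mydb user=repl'
    PUBLICATION my_pub;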
In Vim, we can use Ctrl+N or Ctrl+P to trigger keyword completion. How can I configure that in IdeaVim for IntelliJ?
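For reference, a minimal ~/.ideavimrc sketch, assuming a recent IdeaVim build that supports <Action>(...) mappings and the sethandler directive; HippieCompletion and HippieBackwardCompletion are IntelliJ's "Cyclic Expand Word" actions, which behave most like Vim's Ctrl-N / Ctrl-P:

" ~/.ideavimrc — a sketch, not tested against every IdeaVim version.
" Map Vim-style keyword completion to IntelliJ's cyclic-expand actions:
inoremap <C-n> <Action>(HippieCompletion)
inoremap <C-p> <Action>(HippieBackwardCompletion)
" Let IdeaVim (rather than the IDE) handle these keys in insert mode:
sethandler <C-n> i:vim
sethandler <C-p> i:vim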
I'm using Kafka 0.10.2 and am now hitting a CommitFailedException, like:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?
Another question: I set session.timeout.ms to 60000 as described, and it still happens. I tried to reproduce it with this simple code:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class KafkaConsumer10 {
    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            // Sleep longer than session.timeout.ms (10000 ms) both before and
            // after poll() to try to force the consumer out of the group.
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
When I set session.timeout.ms to 10000 and sleep for more than 10000 ms inside my poll loop, it still seems to work and no exception is thrown, which confuses me. If the heartbeat is triggered by consumer.poll and consumer.commit, then the heartbeat in my code clearly exceeds the session timeout. Why isn't a CommitFailedException thrown?
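For reference, a minimal manual-commit sketch (the broker address, topic t1, and group id are reused from the question; the class name and try/catch structure are my own illustration). With enable.auto.commit=false, commitSync() throws CommitFailedException directly when the group has already rebalanced, which makes the failure easier to observe than with background auto-commit:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098"); // reused from the question
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");            // commit explicitly instead
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "2");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("t1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                try {
                    // Throws CommitFailedException if the group has already
                    // rebalanced and assigned the partitions to another member.
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    System.err.println("Commit failed after rebalance: " + e.getMessage());
                }
            }
        }
    }
}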
CREATE OR REPLACE FUNCTION mover(src text, dst text, cpquery text, conname text, ifbin boolean) RETURNS void AS
$$
DECLARE
    cnt integer;
    dlcnt integer;
    del_count integer;
    ret text;
BEGIN
    SELECT pg_catalog.dblink_copy_open(conname, dst, ifbin) INTO ret;
    RAISE LOG 'dblink_open %', ret;
    EXECUTE 'SELECT 1 AS check FROM ' || src || ' LIMIT 1' INTO cnt;
    IF cnt = 0 THEN
        PERFORM pg_sleep(2);
    END IF;
    IF ifbin = true THEN
        RAISE DEBUG 'Start to Copy data with binary';
        EXECUTE 'COPY (' || cpquery || ' ) TO …

Thread 10296: (state = IN_NATIVE)
- sun.nio.ch.Net.connect0(boolean, java.io.FileDescriptor, java.net.InetAddress, int) @bci=0 (Interpreted frame)
- sun.nio.ch.Net.connect(java.net.ProtocolFamily, java.io.FileDescriptor, java.net.InetAddress, int) @bci=25, line=465 (Interpreted frame)
- sun.nio.ch.Net.connect(java.io.FileDescriptor, java.net.InetAddress, int) @bci=6, line=457 (Interpreted frame)
- sun.nio.ch.SocketChannelImpl.connect(java.net.SocketAddress) @bci=225, line=670 (Interpreted frame)
- kafka.network.BlockingChannel.connect() @bci=135 (Interpreted frame)
- kafka.producer.SyncProducer.connect() @bci=21 (Interpreted frame)
- kafka.producer.SyncProducer.getOrMakeConnection() @bci=11 (Interpreted frame)
- kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(kafka.api.RequestOrResponse, boolean) @bci=13 (Interpreted frame)
- kafka.producer.SyncProducer.send(kafka.api.TopicMetadataRequest) @bci=6 (Interpreted frame)
- kafka.client.ClientUtils$.fetchTopicMetadata(scala.collection.Set, scala.collection.Seq, kafka.producer.ProducerConfig, int, kafka.auth.KafkaUser) @bci=189 (Interpreted frame)
- kafka.producer.BrokerPartitionInfo.updateInfo(scala.collection.immutable.Set, int) @bci=24 (Interpreted frame)
- kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp() @bci=54 …