小编use*_*021的帖子

在Kafka中读取字段'topic_metadata'时出错

我正在尝试使用auto.create.topics.enable = true在我的server.properties文件中连接到我的代理.但是当我尝试使用Java客户端生产者连接到代理时,我得到以下内容error.

1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - kafka生产者I/O线程中未捕获的错误:org.apache.kafka.common.protocol.types.SchemaException:读取字段'topic_metadata'时出错:在org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java)的org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)中读取大小为619631的数组,只有37个字节可用时出错:380)org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)位于org.apache.kafka.clients的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269). producer.internals.Sender.run(Sender.java:229)atg.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)at java.lang.Thread.run(Unknown Source)

以下是我的客户端生产者代码.

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}
Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?

apache-kafka kafka-producer-api

8
推荐指数
1
解决办法
1万
查看次数

如何在不同的aws实例上的docker中安装Hazelcast节点相互交互?

我有三个aws机器,我已经设置了三个docker容器,其上安装了hazelcast-3.5.4(ubuntu).配置aws配置为我通常使用Hazelcast(没有docker).节点没有发现彼此.如何让他们互动或互相发现?

Hazelcast.xml文件如下所示:

    <join>
        <multicast enabled="false">
            <multicast-group>224.2.2.3</multicast-group>
            <multicast-port>54327</multicast-port>
        </multicast>
        <tcp-ip enabled="false">
            <interface>127.0.0.1</interface>
            <member-list>
                <member>127.0.0.1</member>
            </member-list>
        </tcp-ip>
        <aws enabled="true">
            <access-key>Some_key</access-key>
            <secret-key>Secret_key</secret-key>
            <!--optional, default is us-east-1 -->
            <region>us-east-1</region>
            <!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->
            <host-header>ec2.amazonaws.com</host-header>
            <!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->
            <!--security-group-name>hazelcast-sg</security-group-name-->
            <tag-key>type</tag-key>
            <tag-value>hz-nodes</tag-value>
        </aws>
    </join>

 <public-address>private-ip-of-aws-node</public-address>
<properties>
  <property name="hazelcast.local.localAddress">private-ip-of-aws-node</property>
</properties>
Run Code Online (Sandbox Code Playgroud)

此外,我在一个似乎不适合我的类似帖子中添加了两个条目.

cloud amazon-web-services hazelcast docker

4
推荐指数
1
解决办法
1955
查看次数

在淡褐色中设置TTL /记录到期时间

有没有办法在hazelcast DB中为每条记录设置TTL?最好是在Map或Ringbuffer中.

hazelcast hazelcast-imap

3
推荐指数
1
解决办法
2650
查看次数

读取字段'topics'时出错:Kafka中的java.nio.BufferUnderflowException

9.0客户端使用来自远程系统上运行的两个代理的消息.我的生产者工作正常,能够向代理发送消息但我的消费者无法使用这些消息.消费者和生产者正在我的本地系统上运行这两个经纪人在aws上.每当我尝试运行消费者.代理日志中出现以下错误.

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我的消费者代码如下>

package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer 
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1"); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
5988
查看次数