I have set auto.create.topics.enable=true in my server.properties file and am trying to connect to my broker. But when I try to connect to the broker with the Java producer client, I get the following error.
1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 619631, only 37 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
    at java.lang.Thread.run(Unknown Source)
Below is my producer client code.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] argv) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("block.on.buffer.full", true);
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    try {
        for (int i = 0; i < 10; i++) {
            // send() is asynchronous: it only queues the record for the producer I/O thread
            producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
            System.out.println("Tried sending:" + i);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.close();
}
Can someone help me resolve this?
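A note on the numbers in the error message: 619631 is 0x0009746F, i.e. the bytes 00 09 't' 'o'. That is what an int16 length of 9 followed by the start of the string "topicjava" looks like when it is read as a 4-byte array count. This is an inference from the log, not something the post confirms, but it suggests the client parsed the broker's response with a layout the broker did not use; one commonly reported cause is a client library and broker on different Kafka versions. The sketch below reproduces the same message in plain Java. The class `LengthPrefixMismatchDemo` and its helpers are hypothetical and are not Kafka's own classes, and the 41-byte buffer size is chosen only so that 37 bytes remain, as in the log.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics a length-prefixed array read, not Kafka's actual code.
class LengthPrefixMismatchDemo {

    // Reads a 4-byte big-endian element count and checks it against the bytes left.
    static int readArraySize(ByteBuffer buffer) {
        int size = buffer.getInt();
        if (size > buffer.remaining()) {
            throw new IllegalStateException("Error reading array of size " + size
                    + ", only " + buffer.remaining() + " bytes available");
        }
        return size;
    }

    // Builds a buffer that starts with an int16-length-prefixed string,
    // zero-padded up to totalBytes (assumed layout, for illustration).
    static ByteBuffer stringPrefixedBuffer(String s, int totalBytes) {
        ByteBuffer buf = ByteBuffer.allocate(totalBytes);
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
        buf.position(0); // rewind; the limit stays at capacity, so the padding counts as available
        return buf;
    }

    public static void main(String[] args) {
        // The reader expects [int32 count][elements...], but the bytes begin with
        // [int16 length = 9]["topicjava"], so the count is read from 00 09 't' 'o'.
        ByteBuffer response = stringPrefixedBuffer("topicjava", 41);
        try {
            readArraySize(response);
        } catch (IllegalStateException e) {
            // prints: Error reading array of size 619631, only 37 bytes available
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, comparing the version of the kafka-clients jar with the version of the broker would be a reasonable first check.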
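I have three AWS machines, and on each I have set up a Docker container with hazelcast-3.5.4 installed (Ubuntu). The AWS configuration is the same one I normally use for Hazelcast without Docker. The nodes do not discover each other. How can I get them to interact or discover each other?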
The hazelcast.xml file looks like this:
<join>
    <multicast enabled="false">
        <multicast-group>224.2.2.3</multicast-group>
        <multicast-port>54327</multicast-port>
    </multicast>
    <tcp-ip enabled="false">
        <interface>127.0.0.1</interface>
        <member-list>
            <member>127.0.0.1</member>
        </member-list>
    </tcp-ip>
    <aws enabled="true">
        <access-key>Some_key</access-key>
        <secret-key>Secret_key</secret-key>
        <!--optional, default is us-east-1 -->
        <region>us-east-1</region>
        <!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->
        <host-header>ec2.amazonaws.com</host-header>
        <!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->
        <!--security-group-name>hazelcast-sg</security-group-name-->
        <tag-key>type</tag-key>
        <tag-value>hz-nodes</tag-value>
    </aws>
</join>
<public-address>private-ip-of-aws-node</public-address>
<properties>
    <property name="hazelcast.local.localAddress">private-ip-of-aws-node</property>
</properties>
In addition, I added the two entries suggested in a similar post, but they do not seem to work for me.
Is there a way to set a TTL for each record in Hazelcast? Preferably in a Map or a Ringbuffer.
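…9.0 client to consume messages from two brokers running on a remote system. My producer works fine and can send messages to the brokers, but my consumer cannot consume them. The consumer and producer run on my local system, and the two brokers are on AWS. Whenever I try to run the consumer, the following error appears in the broker logs.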
ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
My consumer code is as follows:
package Kafka1.K1;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
public class HelloKafkaConsumer
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1"); …