我开始使用最新的Kafka文档http://kafka.apache.org/documentation.html.但是当我尝试使用新的Consumer API时遇到了一些问题.我已完成以下步骤:
1.添加新的依赖项
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
2.添加配置
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
Run Code Online (Sandbox Code Playgroud)
3.使用KafkaConsumer API
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试从代理轮询消息时,我只得到null:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
Run Code Online (Sandbox Code Playgroud)
然后,在检查源代码后,我知道消费者有什么问题:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
Run Code Online (Sandbox Code Playgroud)
更糟糕的是,我找不到关于0.8.2 API的任何其他有用信息,因为关于Kafka的所有用法都与最新版本不兼容.有人能帮助我吗?非常感谢.
源代码HashMap.values()如下所示
public Collection<V> values() {
Collection<V> vs = values;
return (vs != null ? vs : (values = new Values()));
}
Run Code Online (Sandbox Code Playgroud)
如您所见,当values()首次调用该方法时,它只返回一个Values对象.该Values对象是AbstractCollection没有构造函数的子类,当然不包含任何元素.但是当我调用该方法时,它会快速返回一个集合
Collection<String> values = map.values();
System.out.println(values);
Run Code Online (Sandbox Code Playgroud)
那太奇怪了.不仅values(),而且还keySet()和entrySet()方法的返回这样的空对象.那么,这是我的问题,这些方法何时以及如何返回具有我们需要的元素的对象?
我正在使用python ftplib进行隐式tls连接程序.我尝试了有问题的解决方案python-ftp-implicit-tls-connection-issue(包括Rg Glpj和Juan Moreno的答案)来建立连接.但是当我打电话retrline或retrbinary登录到这样的ftp服务器后(FTP_ITLS是子类FTP_TLS):
58 server = FTP_ITLS()
59 server.connect(host="x.x.x.x", port=990)
60 server.login(user="user", passwd="******")
61 server.prot_p()
62
63 server.cwd("doc")
64 print(server.retrlines('LIST'))
65 # server.retrbinary('RETR contents.7z', open('contents.7z', 'wb').write)
66 server.quit()
Run Code Online (Sandbox Code Playgroud)
我收到了一个EOF错误:
Traceback (most recent call last):
File "D:/Coding/test/itls.py", line 64, in <module>
print(server.retrlines('LIST'))
File "D:\Python\Python27\lib\ftplib.py", line 735, in retrlines
conn = self.transfercmd(cmd)
File "D:\Python\Python27\lib\ftplib.py", line 376, in transfercmd
return self.ntransfercmd(cmd, rest)[0]
File "D:\Python\Python27\lib\ftplib.py", line 713, in ntransfercmd
server_hostname=self.host)
File "D:\Python\Python27\lib\ssl.py", line …Run Code Online (Sandbox Code Playgroud) 考虑到我有Algorithm枚举
public enum Algorithm {
SHA1("sha1"),
HMAC("hmac"),;
Algorithm(final String algorithm) {
this.algorithm = algorithm;
}
private final String algorithm;
public String getAlgorithm() {
return algorithm;
}
}
Run Code Online (Sandbox Code Playgroud)
我有不同的算法
public class Sha1 {
public static String hash(final String text, final byte[] sb) {...}
}
Run Code Online (Sandbox Code Playgroud)
和
public class Hmac {
public static String hash(final String text, final byte[] sb) {...}
}
Run Code Online (Sandbox Code Playgroud)
当有人打电话时,我想返回他们的实例
Algorithm.SHA1.getInstance()
Run Code Online (Sandbox Code Playgroud)
题