关于KafkaConsumer(> = 0.9),我正面临一些严重的问题,试图为我的需求实施解决方案.
让我们假设我有一个函数必须只读取来自kafka主题的n条消息.
例如:getMsgs(5)- > 在主题中获取下一个5 kafka消息.
所以,我有一个看起来像这样的循环:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
Run Code Online (Sandbox Code Playgroud)
考虑到这一点,问题是poll()方法可以获得超过5条消息.例如,如果它获得10条消息,我的代码将永远忘记其他5条消息,因为Kafka会认为它们已经消耗掉了.
我尝试提交偏移但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Run Code Online (Sandbox Code Playgroud)
即使使用偏移配置,每当我再次启动消费者时,它都不会从第6条消息开始(记住,我只想要5条消息),但是从第11条开始(因为第一次轮询消耗了10条消息).
有没有解决方案呢,或者(最肯定的)我错过了什么?
提前致谢!!
我们在尝试为我们的应用程序实现SftpConnections池时遇到了一些麻烦.
我们目前正在使用SSHJ(Schmizz)作为传输库,面临一个我们在开发环境中无法模拟的问题(但错误在生产中随机显示,有时在三天之后,有时仅在10分钟之后).
问题是,当尝试通过SFTP发送文件时,线程被锁定在initschmizz' TransportImpl类的方法中:
@Override
public void init(String remoteHost, int remotePort, InputStream in, OutputStream out)
throws TransportException {
connInfo = new ConnInfo(remoteHost, remotePort, in, out);
try {
if (config.isWaitForServerIdentBeforeSendingClientIdent()) {
receiveServerIdent();
sendClientIdent();
} else {
sendClientIdent();
receiveServerIdent();
}
log.info("Server identity string: {}", serverID);
} catch (IOException e) {
throw new TransportException(e);
}
reader.start();
}
Run Code Online (Sandbox Code Playgroud)
isWaitForServerIdentBeforeSendingClientIdent对我们来说是假的,所以首先客户端(我们)发送我们的标识,如日志中所示:
" 客户身份字符串:blabla "
然后转向receiveServerIdent:
private void receiveServerIdent() throws IOException
{
final Buffer.PlainBuffer buf = new Buffer.PlainBuffer();
while ((serverID …Run Code Online (Sandbox Code Playgroud) 我是Kafka 0.9的新手并测试了一些功能,我在Java实现的Consumer(KafkaConsumer)中发现了一个奇怪的行为.
Kafka经纪人位于Ambari外部机器中.
即使你我可以实现一个Producer并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(民意调查)时,它会被卡住.
我知道生产者工作得很好,因为我可以通过控制台消费者(在ambari本地工作)消费消息.但是当我执行Java Consumer时,什么都没发生,只是卡住了.调试代码我可以看到它在该poll()行被阻止:
ConsumerRecords<String, String> records = consumer.poll(100);
Run Code Online (Sandbox Code Playgroud)
顺便说一句,超时没有任何作用.如果你输入0,100或1000毫秒无关紧要,消费者在这一行被阻止并且不会超时也不会抛出异常.
我尝试了所有类型的替代属性,例如advertised.host.name,advertised.listener,...等等,运气不好.
任何帮助将受到高度赞赏.提前致谢!
我有 GIT 子树的问题。
让我们看看,我有一个项目 A,其中包括:
Project A
|_CodeFolder1
|_CodeFolder2
|_SharedFolder1
|_SharedFolder2
Run Code Online (Sandbox Code Playgroud)
假设我创建了一个只有SharedFolder1和SharedFolder2的新存储库。所以我从主存储库中删除了这两个文件夹,并将它们添加到新的共享存储库中,如下所示:
SharedProject
|_SharedFolder1
|_SharedFolder2
Run Code Online (Sandbox Code Playgroud)
我是子树的新手,但到目前为止我实现了从远程子树拉/推。但是我遇到了问题。添加子树时,必须添加前缀,但我不想将共享代码保存在不同的文件夹中,例如:
Project A
|_CodeFolder1
|_CodeFolder2
|_SharedFolders
|_SharedFolder1
|_SharedFolder2
Run Code Online (Sandbox Code Playgroud)
--prefix每当我尝试添加子树时,这就是什么:
git subtree add --prefix=SharedFolders --squash shared master
Run Code Online (Sandbox Code Playgroud)
有什么办法可以告诉 git 没有前缀,或者子树必须直接保存在 ProjectA 中,就像在第一个模式中一样,而无需为子树创建新文件夹?
我试过这个命令
git subtree add --prefix=/ --squash shared master
Run Code Online (Sandbox Code Playgroud)
但总是有错误。似乎子树不允许我将树中的文件夹直接带到我的工作路径而不为共享文件创建新文件夹。
任何帮助将非常感激。
我们有一个带有 4 个代理的 kafka 集群和一些具有副本因子 1 和 10 分区的主题。在某一时刻,我们的 4 个服务器中有 2 个带有 kafka 集群 - 失败。所以现在我们有 2 个具有相同主题的经纪人。当我m run command
./kafka_topics.sh --zookeeper localhost:2181 --describe
i得到这个时:
Topic:outcoming-notification-error-topic PartitionCount:10 ReplicationFactor:1 Configs:
Topic: outcoming-error-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: outcoming-error-topic Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: outcoming-error-topic Partition: 2 Leader: 4 Replicas: 4 Isr: 4
Topic: outcoming-error-topic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: outcoming-error-topic Partition: 4 Leader: 2 Replicas: 2 …Run Code Online (Sandbox Code Playgroud) 我目前面临的问题如下:
Exception in thread "main" javax.persistence.EntityExistsException: a different object with the same identifier value was already associated with the session: [de.entities.Genre#28]
at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1359)
at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1310)
at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1316)
at org.hibernate.ejb.AbstractEntityManagerImpl.persist(AbstractEntityManagerImpl.java:881)
at de.model.DatabaseBuilder.importData(DatabaseBuilder.java:87)
at de.main.Main.main(Main.java:55)
Run Code Online (Sandbox Code Playgroud)
所以异常告诉我我想插入两个具有相同主键 id 的不同对象。
我想要使用 JPA 插入数据库的所有数据都来自 XML 文件。我用 SAXParser 解析这个文件。当然,id 为 28 的流派条目有很多,因为许多电影都有相同的流派。
如果我使用自动生成的 id,数据将不再正确,因为所有 id 均由 XML 文件正确给出。
我怎么解决这个问题?为什么 JPA 不只是忽略这个对象已经存在于数据库中的事实,而只是将电影的 ID 和流派插入到我的 m:n 表中?
我想根据配置将数据发送到不同的 Kafka 消息:
ResponseFactory processingPeply = null;
switch(endpointType)
{
case "email":
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "sms":
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "network":
ProducerRecord<String, Object> record = new …Run Code Online (Sandbox Code Playgroud) 所以我试图从压缩在一起的 2 个输入句子返回另一个字符串。如果 2 个句子的长度相同,它将产生实际输出。如果两个输入句子的长度不相同,那么它只会返回一个空字符串。到目前为止,这是我的代码,但我不知道如何正确压缩单词,有人可以帮助我。顺便说一句,如果您能通过递归执行此操作来帮助我,那就太好了,因为我正在尝试这样做。
前任:
Zippppp("ABC", "123") will return "A1B2C3"
Zippppp("AD", "CCC") will return “”
Run Code Online (Sandbox Code Playgroud)
public class Zippppp
{
public Zippppp(String a, String s)
{
int l1 = a.length();
int l2 = s.length();
if(l1 == l2)
for(int i = 0; i > l1; i++)
System.out.print( a.substring(0, 1) + s.substring(0, 1));
}
public static void main(String args[ ])
{
Zippppp sv = new Zippppp("ABC", "123");
System.out.print(sv);
}
}
Run Code Online (Sandbox Code Playgroud) 我面临有关Java方法同步的问题。希望我可以简要解释一下:
我在两个不同的类中,在两个不同的包中有两个不同的方法。就像是:
Package1
|_Class1
\MethodA
Package2
|_Class2
\MethodB
Run Code Online (Sandbox Code Playgroud)
好的,所以现在我需要同步这两个不是thread的方法。到目前为止,我有两种不同的方法:
共享信号量。
在Package1和外部创建一个共享的静态信号量Package2,例如:
package Package3;
public class SemaphoreClass {
public static Semaphore;
}
Run Code Online (Sandbox Code Playgroud)
无论如何,我真的不知道JVM是否会将其视为共享信号量。
已同步(SharedClass.class)。
使用共享类来同步这两个方法,例如:
public void MethodA() {
synchronized (SharedClass.class) {
//...
}
Run Code Online (Sandbox Code Playgroud)
和
public void MethodB() {
synchronized (SharedClass.class) {
//...
}
Run Code Online (Sandbox Code Playgroud)
无论如何,这些只是方法。我想听听什么是实现我想要实现的最佳方法。具有JVM的共享信号量会使事情变得更容易,但是,我知道必须为此提供解决方案。提前致谢。
我目前正在优化一个特定的方法,不幸的是它被 JVM 内联,这会阻止它被正确地向量化。我注意到有一个禁止内联的注释,即jdk.internal.vm.annotation.DontInline. 但是,无法从默认模块访问它。
是否有一种干净的方法可以访问此注释或以其他方式防止内联有问题的方法?
java ×6
apache-kafka ×4
arrays ×1
collectors ×1
deadlock ×1
git ×1
git-subtree ×1
github ×1
inline ×1
jpa ×1
jvm-hotspot ×1
loops ×1
polling ×1
sftp ×1
sockets ×1
ssh ×1