小编ank*_*hah的帖子

Kafka突然重置消费者Offset

我正在使用 Kafka 0.8 和 zookeeper 3.3.5。实际上,我们有十几个主题正在消费,没有任何问题。

最近,我们开始喂食和消费一个行为怪异的新话题。消耗的偏移量突然重置。它尊重我们设置的 auto.offset.reset 策略(实际上是最小的),但我不明白为什么这个话题突然重置了它的偏移量。

我正在使用高级消费者。

这是我发现的一些错误日志: 我们有一堆这样的错误日志:

[2015-03-26 05:21:17,789] INFO 从代理 id:1,host:172.16.23.1,port:9092 获取元数据,1 个主题的相关 id 为 47 Set(MyTopic) (kafka.cl)
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer 连接到 172.16.23.1:9092 不成功(kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
        在 java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        在 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
        在 kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        在 kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        在 kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        在 kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        在 kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        在 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        在 kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        在 kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        在 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

每次发生此问题时,我都会看到 WARN 日志:

[2015-03-26 05:21:30,596] 警告由于套接字错误而重新连接:空(kafka.consumer.SimpleConsumer)

然后真正的问题发生了:

[2015-03-26 05:21:47,551] INFO 连接到 172.16.23.5:9092 进行生产(kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO 从 172.16.23.5:9092 断开连接 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] 信息 [ConsumerFetcherManager-1427047649942] 为分区 …

java producer-consumer apache-kafka kafka-consumer-api apache-zookeeper

5
推荐指数
1
解决办法
1万
查看次数

什么时候在ExecutorService中收集可运行的对象垃圾?

我有一个可运行的对象A,它在实例化时与服务器交换心跳信号.我将这样的对象提交给具有固定线程池大小n的执行器服务.当run方法遇到异常时,它将返回.对于给定的情况,我的所有线程都遇到异常并返回,但创建的对象仍然存活并继续交换心跳信号.如何将这些对象标记为垃圾收集,以便它们能够阻止心跳信号的交换?

class A implements Runnable {
    public void run(){
          try{
           \\throws error
          } catch(Exception e){
            \\returns
          }
       }

    public static void main(){
          ExecutorService executor = Executors.newFixedThreadPool(n)
          for(i = 1 to n){
               A a = new A()
               executor.submit(a)
          }
       }
}
Run Code Online (Sandbox Code Playgroud)

我应该在主要结束时进行awaitTermination调用并返回吗?

编辑:
以其他方式提出问题,在所有线程返回后终止执行器服务的一种方法是在for循环之后调用shutdown()并使用Integer.MAX长秒调用awaitTermination,这大约是70年(这是时间约束)我不愿意强加).还有其他选择吗?

java multithreading executorservice

5
推荐指数
1
解决办法
730
查看次数

Zookeeper通信协议

我需要使用tcpdump调试我的kafka使用者和zookeeper之间交换的数据.我查看了zookeeper文档,但找不到有关zookeeper通信协议的任何内容,即删除标题后使用wireshark获取以下数据转储.我如何解释数据部分?

Frame 1: 78 bytes on wire (624 bits), 78 bytes captured (624 bits)
Ethernet II, Src: 22:00:0a:xx:xx:xx (22:00:xx:xx:xx:xx), Dst: fe:ff:xx:xx:xx:xx (fe:ff:ff:xx:xx:xx)
Internet Protocol Version 4, Src: 10.234.xxx.xxx, Dst: 10.231.xxx.xxx
Transmission Control Protocol, Src Port: 51720 (51720), Dst Port: 2181 (2181), Seq: 1, Ack: 1, Len: 12
Data (12 bytes)
    Data: 00000008fffffffe0000000b
    [Length: 12]
Run Code Online (Sandbox Code Playgroud)

kafka-consumer-api apache-zookeeper

4
推荐指数
1
解决办法
1371
查看次数

Indeed.com API求职印度

我想使用really.com API创建作业供稿.我在Indeed.com上创建了一个发布商帐户,并接收API令牌,密钥和发布商ID.出于测试目的,我打的网址 http://api.indeed.com/ads/apisearch?publisher=9191150085731087&v=2&limit=100&l=Bangalore%2C+Karnataka&q=java(改变发布者ID).但它返回零结果,同样的查询在really.co.in网站上返回许多结果.我错过了任何地区配置吗?

api

2
推荐指数
1
解决办法
760
查看次数