Start a Kinesis consumer only when there are more than X records?

Luc*_*ess 7 amazon-kinesis

Is there a way to create a Kinesis consumer that has buffer limits, like this:

#Flush when buffer exceeds 100000 Amazon Kinesis records, 64 MB size limit or when time since last buffer exceeds 1 hour
bufferByteSizeLimit = 67108864 
bufferRecordCountLimit = 100000
bufferMillisecondsLimit = 3600000

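Basically, I want the IRecordProcessor to start processing only when a substantial amount of data has accumulated. I can't use the connector code above because I need the latest version of amazon-kinesis-client.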
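For reference, the three properties above are buffer settings from the Kinesis connector library, and the rule they express is "flush as soon as any one limit is reached". Below is a minimal Scala sketch of just that rule, independent of the connector library. `BufferLimits` and `shouldFlush` are hypothetical names; treating "exceeds" as `>=` and never flushing an empty buffer are assumptions made here, not something the quoted config specifies.

```scala
// Hypothetical mirror of the connector settings quoted above.
final case class BufferLimits(
    byteSizeLimit: Long = 67108864L,    // bufferByteSizeLimit: 64 MB
    recordCountLimit: Long = 100000L,   // bufferRecordCountLimit
    millisecondsLimit: Long = 3600000L  // bufferMillisecondsLimit: 1 hour
) {
  // Flush when ANY limit is reached; an empty buffer is never flushed
  // (both the >= comparison and the empty check are assumptions).
  def shouldFlush(bufferedBytes: Long, bufferedRecords: Long, millisSinceLastFlush: Long): Boolean =
    bufferedRecords > 0 && (
      bufferedBytes >= byteSizeLimit ||
        bufferedRecords >= recordCountLimit ||
        millisSinceLastFlush >= millisecondsLimit
    )
}
```

Because the limits are OR-ed, a trickle of small records is still flushed once an hour, while a burst is flushed as soon as it hits 100,000 records or 64 MB.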

Luc*_*ess 0

I ended up implementing my own solution:

  1. Keep a ConcurrentHashMap to hold the stream data
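      import java.util.concurrent.ConcurrentHashMap
      import java.util.concurrent.atomic.AtomicLong

      // RecordStore is the author's own record type (not shown)
      private val recsMap = new ConcurrentHashMap[String, List[RecordStore]]
      private val currByteSize = new AtomicLong(0L)
      private val currRecordCount = new AtomicLong(0L)
      private val currSeconds = new AtomicLong(0L)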
    
  2. Update the counters (size / time / record count)
  3. Flush the data when any counter reaches its limit
      recsMap.foreach(write2File())
      // clean up
      recsMap.remove(writtenRecs())
    
  4. Checkpoint and reset the counters
      // after checkpointing, reset the counters for the next batch
      currByteSize.set(0L)
      currRecordCount.set(0L)
      currSeconds.set(0L)
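The four steps above can be combined into a small, self-contained buffer. This is a sketch under assumptions, not the author's actual code: `RecordStore` is reduced to a hypothetical key/bytes pair, `RecordBuffer`, `add` and `drainIfReady` are illustrative names, and elapsed time is tracked as a last-flush timestamp instead of the answer's `currSeconds` counter. Both methods are `synchronized`, so the map and the counters are always cleared together.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the answer's RecordStore type.
final case class RecordStore(partitionKey: String, data: Array[Byte])

// Buffers records per partition key and releases them as one batch once a
// byte-size, record-count or elapsed-time limit is reached (illustrative names).
final class RecordBuffer(
    maxBytes: Long,
    maxRecords: Long,
    maxMillis: Long,
    startMillis: Long = System.currentTimeMillis()
) {
  // Step 1: storage and counters, as in the answer.
  private val recsMap = new ConcurrentHashMap[String, List[RecordStore]]
  private val currByteSize = new AtomicLong(0L)
  private val currRecordCount = new AtomicLong(0L)
  // Assumption: time is tracked as the timestamp of the last flush.
  private val lastFlushMillis = new AtomicLong(startMillis)

  // Step 2: store a record and update the size/count counters.
  def add(rec: RecordStore): Unit = synchronized {
    val existing = Option(recsMap.get(rec.partitionKey)).getOrElse(Nil)
    recsMap.put(rec.partitionKey, existing :+ rec)
    currByteSize.addAndGet(rec.data.length.toLong)
    currRecordCount.incrementAndGet()
  }

  // Steps 3-4 (minus the checkpoint, which the caller does after writing the
  // batch): if any limit is reached, return everything buffered, clear the
  // map and reset the counters; otherwise return None.
  def drainIfReady(nowMillis: Long = System.currentTimeMillis()): Option[Map[String, List[RecordStore]]] =
    synchronized {
      val count = currRecordCount.get()
      val elapsed = nowMillis - lastFlushMillis.get()
      val ready = count > 0 &&
        (currByteSize.get() >= maxBytes || count >= maxRecords || elapsed >= maxMillis)
      if (!ready) None
      else {
        var batch = Map.empty[String, List[RecordStore]]
        val it = recsMap.entrySet().iterator()
        while (it.hasNext) {
          val e = it.next()
          batch += (e.getKey -> e.getValue)
        }
        recsMap.clear()
        currByteSize.set(0L)
        currRecordCount.set(0L)
        lastFlushMillis.set(nowMillis)
        Some(batch)
      }
    }
}
```

Inside a record processor, one would call `add` for every incoming record and, when `drainIfReady` returns `Some(batch)`, write the batch out and only then checkpoint (in KCL 1.x, through `IRecordProcessorCheckpointer.checkpoint()`). Deferring the checkpoint this way means buffered but unflushed records are re-delivered after a worker failure instead of being lost. Note that the time limit is only evaluated when `drainIfReady` is called, so an idle shard does not flush on its own.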