有没有办法创建具有缓冲区限制的Kinesis使用者?喜欢这里:
#Flush when buffer exceeds 100000 Amazon Kinesis records, 64 MB size limit or when time since last buffer exceeds 1 hour
bufferByteSizeLimit = 67108864
bufferRecordCountLimit = 100000
bufferMillisecondsLimit = 3600000
Run Code Online (Sandbox Code Playgroud)
基本上,我想IRecordProcessor只在有大量数据时才开始.我无法使用上面的连接器代码,因为我需要最新版本的amazon-kinesis-client.
我最终实施了自己的解决方案。
ConcurrentHashMap来存储流数据
private val recsMap = new ConcurrentHashMap[String, List[RecordStore]]
private val currByteSize = new AtomicLong(0L)
private val currRecordCount = new AtomicLong(0L)
private val currSeconds = new AtomicLong(0L)
Run Code Online (Sandbox Code Playgroud) recsMap.foreach(write2File())
// clean up
recsMap.remove(writtenRecs())
Run Code Online (Sandbox Code Playgroud) // reset counters
currByteSize.getAndSet(value)
currRecordCount.getAndSet(value)
currSeconds.getAndSet(value)
Run Code Online (Sandbox Code Playgroud)| 归档时间: |
|
| 查看次数: |
65 次 |
| 最近记录: |