目前我的应用程序在IO中定期被阻止,输出非常低.我使用一些命令来跟踪进程.
通过使用jstack我发现该应用程序挂在FileOutputStream.writeBytes.
通过使用strace -f -c -p pid收集系统调用信息,我发现了.对于正常情况,它有futex和write syscalls.但是当它变得不正常时,只有futex系统调用.该应用程序一直在调用futex,但都失败并抛出ETIMEDOUT,就像这样:
<futex resumed> =-1 ETIMEDOUT (Connecton timed out)
futex(Ox7f823, FUTEX_WAKE_PRIVATE,1)=0
futex(Ox7f824, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME) =-1<unfinished>
<futex resumed> =-1 ETIMEDOUT (Connecton timed out)
futex(Ox7f823, FUTEX_WAKE_PRIVATE,1)=0
futex(Ox7f824, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME) =-1<unfinished>
Run Code Online (Sandbox Code Playgroud)
此问题会定期发生,并持续数小时或数小时,并再次恢复正常.
顺便说一句,当在IO中被阻塞时,echo 3>/proc/sys/vm/drop_caches总是使它暂时正常.我用Google搜索并找到了一些类似的proleam,如下所示.
有关我的系统的一些信息.操作系统:Redhat 6.1,核心版本2.6.31
JDK:1.7.0_05
CPU:X5650,24个
内存:24GB和48GB
我想输出火花和火花流到卡夫卡一次.但正如文档所说的那样
"输出操作(如foreachRDD)至少有一次语义,也就是说,如果发生工作失败,转换后的数据可能会被多次写入外部实体."
要进行事务更新,spark建议使用批处理时间(在foreachRDD中可用)和RDD的分区索引来创建标识符.该标识符唯一地标识流应用程序中的blob数据.代码如下:
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val **uniqueId** = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
Run Code Online (Sandbox Code Playgroud)
但是,如何使用UNIQUEID卡夫卡,使事务提交.
谢谢