pko*_*pac 5 apache-spark amazon-kinesis spark-streaming
tldr; 无法使用 Kinesis Spark Streaming 集成,因为它不接收任何数据。
spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar 下载并添加到类路径job.py或job.jar(只是阅读和打印)提交。KCL Worker 线程有时会说“正在睡觉……” - 它可能会被悄悄地破坏(我检查了我能找到的所有 stderr,但没有任何提示)。也许吞下了 OutOfMemoryError ......但我对此表示怀疑,因为每秒 1 条记录的数量。
-------------------------------------
时间:1448645109000 毫秒
-------------------------------------
15/11/27 17:25:09 INFO JobScheduler:完成作业流作业 1448645109000 ms.0 从作业集时间 1448645109000 ms
15/11/27 17:25:09 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 102
15/11/27 17:25:09 INFO JobScheduler:总延迟:0.002 秒,时间 1448645109000 毫秒(执行:0.001 秒)
15/11/27 17:25:09 信息块管理器:删除 RDD 102
15/11/27 17:25:09 INFO KinesisInputDStream:在 NewClass.java:25 of time 1448645109000 ms 处的 createStream 中删除 RDD KinesisBackedBlockRDD[102] 块
15/11/27 17:25:09 INFO ReceivedBlockTracker:删除批次 ArrayBuffer(1448645107000 毫秒)
15/11/27 17:25:09 INFO InputInfoTracker:删除旧的批处理元数据:1448645107000 毫秒
15/11/27 17:25:10 INFO JobScheduler:添加时间为 1448645110000 毫秒的作业
15/11/27 17:25:10 INFO JobScheduler:从作业集时间 1448645110000 ms.0 开始作业流作业 1448645110000 ms.0
-------------------------------------
时间:1448645110000 毫秒
-------------------------------------
<----- 一些数据预计会出现在这里!
15/11/27 17:25:10 INFO JobScheduler:完成作业流作业 1448645110000 ms.0 从作业集时间 1448645110000 ms
15/11/27 17:25:10 INFO JobScheduler:总延迟:0.003 秒,时间 1448645110000 毫秒(执行:0.001 秒)
15/11/27 17:25:10 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 103
15/11/27 17:25:10 INFO KinesisInputDStream:在 NewClass.java:25 of time 1448645110000 ms 处的 createStream 中删除 RDD KinesisBackedBlockRDD[103] 块
15/11/27 17:25:10 信息块管理器:删除 RDD 103
15/11/27 17:25:10 INFO ReceivedBlockTracker:删除批次 ArrayBuffer(1448645108000 毫秒)
15/11/27 17:25:10 INFO InputInfoTracker:删除旧的批处理元数据:1448645108000 毫秒
15/11/27 17:25:11 INFO JobScheduler:添加时间为 1448645111000 毫秒的作业
15/11/27 17:25:11 INFO JobScheduler:从作业集时间 1448645111000 ms.0 开始作业流作业 1448645111000 ms.0
请让我知道任何提示,我真的很想使用 Spark 进行实时分析......除了这个不接收数据的小细节:) 似乎没问题。
PS:我觉得奇怪的是,Spark 以某种方式忽略了我的存储级别(内存和磁盘 2)和检查点间隔(20,000 毫秒)的设置
15/11/27 17:23:26 信息 KinesisInputDStream:metadataCleanupDelay = -1
15/11/27 17:23:26 信息 KinesisInputDStream:滑动时间 = 1000 毫秒
15/11/27 17:23:26 信息 KinesisInputDStream:存储级别 = StorageLevel(假、假、假、假、1)
15/11/27 17:23:26 信息 KinesisInputDStream:检查点间隔 = null
15/11/27 17:23:26 信息 KinesisInputDStream:记住持续时间 = 1000 毫秒
15/11/27 17:23:26 INFO KinesisInputDStream:初始化并验证 org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6
源代码(Java):
公共类新类{
公共静态无效主(字符串 [] args){
SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
InitialPositionInStream.LATEST,
新的持续时间(20000),
StorageLevel.MEMORY_AND_DISK_2()
);
kinesisStream.print();
ssc.start();
ssc.awaitTermination();
}
}
Python 代码(之前尝试过 pprinting 并发送到 MongoDB):
从 pyspark.streaming.kinesis 导入 KinesisUtils, InitialPositionInStream
从 pyspark 导入 SparkContext,StorageLevel
从 pyspark.streaming 导入 StreamingContext
从 sys 导入 argv
sc = SparkContext(appName="webassist-test")
ssc = StreamingContext(sc, 5)
流 = KinesisUtils.createStream(ssc,
"应用名",
“测试”,
"https://kinesis.us-west-1.amazonaws.com",
"us-west-1",
InitialPositionInStream.LATEST,
5、
StorageLevel.MEMORY_AND_DISK_2)
流.pprint()
ssc.start()
ssc.awaitTermination()
注意:我还尝试将数据发送到 MongoDB,stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))但不将其粘贴到此处,因为您需要一个 MongoDB 实例并且它与问题无关 - 输入中已经没有任何记录。
还有一件事 - KCL 从不提交。对应的 DynamoDB 如下所示:
租用密钥检查点租用计数器租用所有者ownerSwitchesSinceCheckpoint shardId-000000000000 最新 614 本地主机:d92516 ... 8
用于提交的命令:
spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
Run Code Online (Sandbox Code Playgroud)
在 MasterUI 中,我可以看到:
Input Rate
Receivers: 1 / 1 active
Avg: 0.00 events/sec
KinesisReceiver-0
Avg: 0.00 events/sec
...
Completed Batches (last 76 out of 76)
Run Code Online (Sandbox Code Playgroud)
谢谢你的帮助!
过去在连接 Kinesis 时,我遇到过 Spark Streaming 中没有显示记录活动的问题。
我会尝试这些事情来获得更多反馈/来自 Spark 的不同行为:
确保使用foreachRDD、print、saveas等输出操作强制评估DStream转换操作。
创建流或清除现有流时,使用“Kinesis 应用程序名称”参数的新名称在 DynamoDB 中创建一个新的 KCL 应用程序。
创建流时在 TRIM_HORIZON 和 LATEST 之间切换初始位置。
当您尝试这些更改时,请重新启动上下文。
添加代码后编辑: 也许我遗漏了一些明显的东西,但我无法发现您的源代码有任何问题。您是否有 n+1 个 cpu 运行此应用程序(n 是 Kinesis 分片的数量)?
如果您运行 KCL 应用程序(Java/Python/...)从 docker 实例中的分片中读取数据,它是否有效?也许您的网络配置有问题,但我希望有一些错误消息指出它。
如果这足够重要/您有一点时间,您可以在 docker 实例中快速实现 kcl reader,并允许您与 Spark 应用程序进行比较。一些网址:
另一种选择是在不同的集群中运行 Spark Streaming 应用程序并进行比较。
PS:我目前在不同的集群中使用 Spark Streaming 1.5.2 和 Kinesis,它按预期处理记录/显示活动。