Apache Spark Kinesis 集成:已连接,但未收到任何记录

pko*_*pac 5 apache-spark amazon-kinesis spark-streaming

tldr; 无法使用 Kinesis Spark Streaming 集成,因为它不接收任何数据。

  1. 测试流已设置,nodejs 应用程序每秒发送 1 个简单记录。
  2. 标准 Spark 1.5.2 集群设置主节点和工作节点(4 核),环境中带有 docker-compose、AWS 凭证
  3. spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar 下载并添加到类路径
  4. job.pyjob.jar(只是阅读和打印)提交。
  5. 一切似乎都很好,但没有收到任何记录。

KCL Worker 线程有时会说“正在睡觉……” - 它可能会被悄悄地破坏(我检查了我能找到的所有 stderr,但没有任何提示)。也许吞下了 OutOfMemoryError ......但我对此表示怀疑,因为每秒 1 条记录的数量。

    -------------------------------------
    时间:1448645109000 毫秒
    -------------------------------------

    15/11/27 17:25:09 INFO JobScheduler:完成作业流作业 1448645109000 ms.0 从作业集时间 1448645109000 ms
    15/11/27 17:25:09 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 102
    15/11/27 17:25:09 INFO JobScheduler:总延迟:0.002 秒,时间 1448645109000 毫秒(执行:0.001 秒)
    15/11/27 17:25:09 信息块管理器:删除 RDD 102
    15/11/27 17:25:09 INFO KinesisInputDStream:在 NewClass.java:25 of time 1448645109000 ms 处的 createStream 中删除 RDD KinesisBackedBlockRDD[102] 块
    15/11/27 17:25:09 INFO ReceivedBlockTracker:删除批次 ArrayBuffer(1448645107000 毫秒)
    15/11/27 17:25:09 INFO InputInfoTracker:删除旧的批处理元数据:1448645107000 毫秒
    15/11/27 17:25:10 INFO JobScheduler:添加时间为 1448645110000 毫秒的作业
    15/11/27 17:25:10 INFO JobScheduler:从作业集时间 1448645110000 ms.0 开始作业流作业 1448645110000 ms.0
    -------------------------------------
    时间:1448645110000 毫秒
    -------------------------------------
          <----- 一些数据预计会出现在这里!
    15/11/27 17:25:10 INFO JobScheduler:完成作业流作业 1448645110000 ms.0 从作业集时间 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler:总延迟:0.003 秒,时间 1448645110000 毫秒(执行:0.001 秒)
    15/11/27 17:25:10 信息 KinesisBackedBlockRDD:从持久性列表中删除 RDD 103
    15/11/27 17:25:10 INFO KinesisInputDStream:在 NewClass.java:25 of time 1448645110000 ms 处的 createStream 中删除 RDD KinesisBackedBlockRDD[103] 块
    15/11/27 17:25:10 信息块管理器:删除 RDD 103
    15/11/27 17:25:10 INFO ReceivedBlockTracker:删除批次 ArrayBuffer(1448645108000 毫秒)
    15/11/27 17:25:10 INFO InputInfoTracker:删除旧的批处理元数据:1448645108000 毫秒
    15/11/27 17:25:11 INFO JobScheduler:添加时间为 1448645111000 毫秒的作业
    15/11/27 17:25:11 INFO JobScheduler:从作业集时间 1448645111000 ms.0 开始作业流作业 1448645111000 ms.0

请让我知道任何提示,我真的很想使用 Spark 进行实时分析......除了这个不接收数据的小细节:) 似乎没问题。

PS:我觉得奇怪的是,Spark 以某种方式忽略了我的存储级别(内存和磁盘 2)和检查点间隔(20,000 毫秒)的设置

    15/11/27 17:23:26 信息 KinesisInputDStream:metadataCleanupDelay = -1
    15/11/27 17:23:26 信息 KinesisInputDStream:滑动时间 = 1000 毫秒
    15/11/27 17:23:26 信息 KinesisInputDStream:存储级别 = StorageLevel(假、假、假、假、1)
    15/11/27 17:23:26 信息 KinesisInputDStream:检查点间隔 = null
    15/11/27 17:23:26 信息 KinesisInputDStream:记住持续时间 = 1000 毫秒
    15/11/27 17:23:26 INFO KinesisInputDStream:初始化并验证 org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

源代码(Java):

    公共类新类{

        公共静态无效主(字符串 [] args){
            SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
                    ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
                    InitialPositionInStream.LATEST,
                    新的持续时间(20000),
                    StorageLevel.MEMORY_AND_DISK_2()
            );
            kinesisStream.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }

Python 代码(之前尝试过 pprinting 并发送到 MongoDB):

    从 pyspark.streaming.kinesis 导入 KinesisUtils, InitialPositionInStream
    从 pyspark 导入 SparkContext,StorageLevel
    从 pyspark.streaming 导入 StreamingContext
    从 sys 导入 argv

    sc = SparkContext(appName="webassist-test")
    ssc = StreamingContext(sc, 5)

    流 = KinesisUtils.createStream(ssc,
         "应用名",
         “测试”,
         "https://kinesis.us-west-1.amazonaws.com",
         "us-west-1",
         InitialPositionInStream.LATEST,
         5、
         StorageLevel.MEMORY_AND_DISK_2)

    流.pprint()
    ssc.start()
    ssc.awaitTermination()

注意:我还尝试将数据发送到 MongoDB,stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))但不将其粘贴到此处,因为您需要一个 MongoDB 实例并且它与问题无关 - 输入中已经没有任何记录。

还有一件事 - KCL 从不提交。对应的 DynamoDB 如下所示:

租用密钥检查点租用计数器租用所有者ownerSwitchesSinceCheckpoint
shardId-000000000000 最新 614 本地主机:d92516 ... 8

用于提交的命令:

spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
Run Code Online (Sandbox Code Playgroud)

在 MasterUI 中,我可以看到:

 Input Rate
   Receivers: 1 / 1 active
   Avg: 0.00 events/sec
 KinesisReceiver-0
   Avg: 0.00 events/sec
...
 Completed Batches (last 76 out of 76)
Run Code Online (Sandbox Code Playgroud)

谢谢你的帮助!

Mig*_*lvo 2

过去在连接 Kinesis 时,我遇到过 Spark Streaming 中没有显示记录活动的问题。

我会尝试这些事情来获得更多反馈/来自 Spark 的不同行为:

  1. 确保使用foreachRDDprintsaveas等输出操作强制评估DStream转换操作。

  2. 创建流或清除现有流时,使用“Kinesis 应用程序名称”参数的新名称在 DynamoDB 中创建一个新的 KCL 应用程序。

  3. 创建流时在 TRIM_HORIZON 和 LATEST 之间切换初始位置。

  4. 当您尝试这些更改时,请重新启动上下文。

添加代码后编辑: 也许我遗漏了一些明显的东西,但我无法发现您的源代码有任何问题。您是否有 n+1 个 cpu 运行此应用程序(n 是 Kinesis 分片的数量)?

如果您运行 KCL 应用程序(Java/Python/...)从 docker 实例中的分片中读取数据,它是否有效?也许您的网络配置有问题,但我希望有一些错误消息指出它。

如果这足够重要/您有一点时间,您可以在 docker 实例中快速实现 kcl reader,并允许您与 Spark 应用程序进行比较。一些网址:

Python

爪哇

Python示例

另一种选择是在不同的集群中运行 Spark Streaming 应用程序并进行比较。

PS:我目前在不同的集群中使用 Spark Streaming 1.5.2 和 Kinesis,它按预期处理记录/显示活动。