标签: amazon-kinesis

我为什么要使用Amazon Kinesis而不是SNS-SQS?

我有一个用例,会有数据流来,我不能以相同的速度消耗它,需要一个缓冲区.这可以使用SNS-SQS队列来解决.我开始知道Kinesis解决了同样的目的,那有什么区别?为什么我更喜欢(或不应该更喜欢)Kinesis?

amazon-sqs amazon-web-services amazon-kinesis

146
推荐指数
11
解决办法
6万
查看次数

application_(state:ACCEPTED)的应用程序报告永远不会结束Spark Submit(在YARN上使用Spark 1.2.0)

我正在运行kinesis plus spark应用程序 https://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html

我正在运行如下

ec2实例上的命令:

 ./spark/bin/spark-submit --class org.apache.spark.examples.streaming.myclassname --master yarn-cluster --num-executors 2 --driver-memory 1g --executor-memory 1g --executor-cores 1  /home/hadoop/test.jar 
Run Code Online (Sandbox Code Playgroud)

我在EMR上安装了火花.

EMR details
Master instance group - 1   Running MASTER  m1.medium   
1

Core instance group - 2 Running CORE    m1.medium
Run Code Online (Sandbox Code Playgroud)

我越来越低于INFO,它永远不会结束.

15/06/14 11:33:23 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
15/06/14 11:33:23 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
15/06/14 11:33:23 INFO …
Run Code Online (Sandbox Code Playgroud)

amazon-emr hadoop-yarn apache-spark amazon-kinesis

47
推荐指数
5
解决办法
4万
查看次数

AWS Kinesis中的分区键是什么?

我正在读书AWS Kinesis.在以下程序中,我将数据写入名为的流中TestStream.我运行了这段代码10次,在流中插入10条记录.

var params = {
    Data: 'More Sample data into the test stream ...',
    PartitionKey: 'TestKey_1',
    StreamName: 'TestStream'
};

kinesis.putRecord(params, function(err, data) {
   if (err) console.log(err, err.stack); // an error occurred
   else     console.log(data);           // successful response
});
Run Code Online (Sandbox Code Playgroud)

所有记录都已成功插入.partition key这里到底意味着什么?它在后台做什么?我阅读了它的文档,但不明白它的含义.

stream amazon-web-services node.js amazon-kinesis

28
推荐指数
2
解决办法
1万
查看次数

Amazon Kinesis和AWS Lambda重试

我对Amazon Kinesis很新,所以也许这只是我理解中的一个问题,但在AWS Lambda FAQ中它说:

每个分片严格序列化发送到您的AWS Lambda函数的Amazon Kinesis和DynamoDB Streams记录.这意味着如果将两个记录放在同一个分片中,Lambda保证在使用第二个记录调用Lambda函数之前,将使用第一个记录成功调用它.如果一个记录的调用超时,受到限制或遇到任何其他错误,Lambda将重试直到成功(或记录达到其24小时到期),然后再转到下一个记录.不保证跨不同分片的记录顺序,并且每个分片的处理并行发生.

我的问题是,如果由于某种原因某些格式错误的数据被生产者放入一个分片并且当Lambda函数选择它时它会出错,然后只是不断重试会发生什么?这意味着该特定分片的处理将被错误阻止24小时.

处理应用程序错误的最佳做法是将问题包装在自定义错误中,并将此错误与所有成功处理的记录一起发送到下游并让消费者处理它?当然,在一个不可恢复的错误导致程序像空指针一样崩溃的情况下,这仍然无济于事:我们将在接下来的24小时内再次回到阻塞重试循环.

amazon-web-services amazon-kinesis aws-lambda

22
推荐指数
2
解决办法
1万
查看次数

火花流检查点恢复非常非常缓慢

  • 目标:从Kinesis读取并通过火花流将数据以Parquet格式存储到S3.
  • 情况:应用程序最初运行正常,运行1小时的批次,平均处理时间少于30分钟.出于某种原因,我们可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远,而不是前进.我们尝试以1分钟的批处理间隔测试相同的东西,处理运行良好,批次完成需要1.2分钟.当我们从检查点恢复时,每批需要大约15分钟.
  • 注意:我们使用s3作为检查点使用1个执行器,每个执行器有19g内存和3个内核

附上截图:

首次运行 - 检查点恢复之前 在检查点之前 - 流媒体页面

在检查点之前 - 工作页面

在检查点之前 - 乔布斯Page2

试图从检查点恢复: 检查点之后 -  Streaming Page 检查点后 - 工作页面

Config.scala

object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}
Run Code Online (Sandbox Code Playgroud)

S3Basin.scala

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing

18
推荐指数
1
解决办法
1500
查看次数

Kinesis Stream和DynamoDB流之间的区别

他们似乎对我做同样的事情.任何人都可以向我解释这个区别吗?

amazon-kinesis amazon-dynamodb-streams

16
推荐指数
2
解决办法
8671
查看次数

我可以自动将换行添加到AWS Firehose记录吗?

我正在尝试使用以下设置配置Kinesis Analytics应用程序:

  • 输入流是Kinesis Firehose,它采用字符串化的JSON值
  • SQL是一个简单的直通(以后需要更复杂但是为了测试,它只是通过发送数据)
  • 输出流是第二个Kinesis Firehose,它将记录传送到S3存储桶

接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,希望每个JSON记录都在自己的生产线上.Firehose输出只会附加所有打破JSONSERDE的JSON记录.

可以将AWS Lambda数据格式化程序附加到输出流,但这看起来很昂贵.我想要的是使用换行符分割每条记录.

如果我没有使用Google Analytics应用程序,我会将换行符附加到每个Firehose记录中.在应用程序的SQL中没有办法做到这一点似乎很奇怪:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";
Run Code Online (Sandbox Code Playgroud)

是添加Lambda数据格式化程序的最佳答案吗?我真的很想避免这种情况.

amazon-kinesis amazon-kinesis-firehose

16
推荐指数
2
解决办法
4812
查看次数

AWS Lambda无法连接到RDS实例,但我可以在本地吗?

我正在尝试从lambda连接到我的RDS实例.我在本地写了lambda并在本地测试,一切都很好用.我部署到lambda,突然它不起作用.下面是我正在运行的代码,如果有帮助,我将通过kinesis流调用lambda.

'use strict';

exports.handler = (event, context, handlerCallback) => {
    console.log('Recieved request for kinesis events!');
    console.log(event);
    console.log(context);

    const connectionDetails = {
        host:     RDS_HOST,
        port:     5432,
        database: RDS_DATABASE,
        user:     RDS_USER,
        password: RDS_PASSWORD
    };

    const db = require('pg-promise')({promiseLib: require('bluebird')})(connectionDetails);

    db
            .tx(function () {
                console.log('Beginning query');

                return this.query("SELECT 'foobar'")
                           .then(console.log)
                           .catch(console.log)
                           .finally(console.log);
            })
            .finally(() => handlerCallback());
};
Run Code Online (Sandbox Code Playgroud)

以下是来自云监视的日志,如果它有帮助:

START RequestId: *********-****-****-****-********* Version: $LATEST 
2016-05-31T20:58:25.086Z    *********-****-****-****-*********  Recieved request for kinesis events! 
2016-05-31T20:58:25.087Z    *********-****-****-****-*********  { Records:  [ { kinesis: [Object], eventSource: 'aws:kinesis', eventVersion: '1.0', eventID: …
Run Code Online (Sandbox Code Playgroud)

javascript amazon-web-services amazon-rds amazon-kinesis aws-lambda

15
推荐指数
3
解决办法
8198
查看次数

读取Amazon Kinesis Firehose流写入s3的数据

我正在写Kinesis Firehose流的记录,最终由Amazon Kinesis Firehose写入S3文件.

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}
Run Code Online (Sandbox Code Playgroud)

数据写入S3看起来像:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}
Run Code Online (Sandbox Code Playgroud)

没有COMMA分离.

像Json阵列一样没有启动支架

[
Run Code Online (Sandbox Code Playgroud)

没有结束支持,如在Json阵列中

]
Run Code Online (Sandbox Code Playgroud)

我想读取这些数据获取ItemPurchase对象的列表.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
Run Code Online (Sandbox Code Playgroud)

读取此数据的正确方法是什么?

json amazon-s3 amazon-kinesis amazon-kinesis-firehose

14
推荐指数
3
解决办法
6045
查看次数

亚马逊Kinesis并保证订购

亚马逊声称他们的Kinesis流媒体产品保证了唱片订购.

它提供记录的顺序,以及以相同顺序读取和/或重放记录的能力(...)

Kinesis由Streams组成,Streams本身由一个或多个Shards组成.记录存储在这些碎片中.我们可以编写连接到Shard的消费者应用程序,并按照它们存储的顺序读取/重放记录.

但Kinesis能否开箱即用,为Stream本身订购,而无需向消费者推销订购逻辑?消费者如何从同一个Stream的多个Shards中读取记录,确保记录的读取顺序与它们添加到Stream中的顺序相同?

amazon amazon-web-services amazon-kinesis

14
推荐指数
2
解决办法
8922
查看次数