标签: amazon-kinesis-kpl

Amazon Kinesis KPL 与 AWS SDK 的优缺点

场景是我将向 kinesis 流写入大量数据(每天 TB 级)。我想知道哪种方法是实现高写入吞吐量的更好方法。我正在为生产者客户考虑以下两种选择。

选项 1:使用 Kinesis 生产者库 (KPL)。

或者

选项 2:AWS 开发工具包 (api)。

我知道 KPL 是在 aws sdk 之上使用的抽象,所以它基本上归结为(KPL with AWS-SDK)或只是 AWS-SDK。根据我的研究,在我看来,AWS-SDK 不提供将多条记录聚合到单个 put 中的能力,而 KPL 确实支持这种聚合(如果这是错误的,请纠正我)。

PutRecords(来自 Kinesis Data Streams API)和 KPL(使用聚合)都提供了高写入吞吐量,问题是这两个选项中哪个更好,为什么?简而言之,有兴趣知道在将数据写入 kinesis 流方面哪个会更快,一旦将其写入流,我就不关心它是如何读取的。也有兴趣了解两种情况下的重试机制差异和异步写入性能。

java amazon-kinesis amazon-kinesis-kpl

9
推荐指数
1
解决办法
1430
查看次数

使用java.lang.AbstractMethodError在cloudera上失败的spark kinesis

下面是我的POM文件.我正在用aws kinesis写一个火花流

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
     <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

我在Cloudera 5.10的火花程序运行期间面临异常

17/04/27 05:34:04 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 58.0 (TID 179, hadoop1.local, executor 5): java.lang.AbstractMethodError
at org.apache.spark.Logging$class.log(Logging.scala:50)
at org.apache.spark.streaming.kinesis.KinesisCheckpointer.log(KinesisCheckpointer.scala:39)
at org.apache.spark.Logging$class.logDebug(Logging.scala:62)
at org.apache.spark.streaming.kinesis.KinesisCheckpointer.logDebug(KinesisCheckpointer.scala:39)
at org.apache.spark.streaming.kinesis.KinesisCheckpointer.startCheckpointerThread(KinesisCheckpointer.scala:119)
at org.apache.spark.streaming.kinesis.KinesisCheckpointer.<init>(KinesisCheckpointer.scala:50)
at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2000)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2000)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

这在EMR4.4上运行完全正常但是CDH失败了.任何建议

apache-spark amazon-kinesis cloudera-cdh spark-streaming amazon-kinesis-kpl

5
推荐指数
1
解决办法
460
查看次数

如何从 Amazon connect 创建的 Kinesis Video 流中以正确的格式获取音频文件?

我正在尝试为 Amazon connect 中的呼叫中心设置语音邮件功能。我已将 Start Media 流块添加到流程中。我还添加了由 Kinesis 流触发的 Lambda 函数。我的想法不是做实时串流,但串流完成后下载文件。

import json
import base64
import boto3
import time

def lambda_handler(event,context):
    # print(event)
    record = base64.b64decode(event["Records"][0]["kinesis"]["data"]).decode('utf-8')
    record_obj = json.loads(record)
    print('record', record)
    bucket = 'yyyy'
    key = 'streams/sample123.raw'
    # try:
    s3_client = boto3.client('s3',region_name='us-east-1')
    kinesis_client_1 = boto3.client('kinesisvideo',region_name='us-east-1')
    get_ep = kinesis_client_1.get_data_endpoint(StreamARN='arn:aws:kinesisvideo:us-east-1:237980099910:stream/xxxx/1580972532224',APIName='GET_MEDIA_FOR_FRAGMENT_LIST')
    t = get_ep['DataEndpoint']
    print(t)
    kinesis_client_2= boto3.client('kinesis-video-archived-media',region_name='us-east-1',endpoint_url=t)
    response = kinesis_client_2.list_fragments(StreamName='xxxx',
        # MaxResults=123,
        # NextToken='string',
        FragmentSelector={
            'FragmentSelectorType': 'SERVER_TIMESTAMP',
            'TimestampRange': {
                'StartTimestamp': '2020-02-07T05:21:30Z',
                'EndTimestamp': '2020-02-07T05:22:08Z'
            }
        }
        )

    fragments_list = res = [ sub['FragmentNumber'] for …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services amazon-kinesis amazon-kinesis-kpl amazon-kinesis-agent amazon-connect

5
推荐指数
0
解决办法
667
查看次数

如何在 Kotlin 中将数据类转换为 ByteBuffer?

我正在尝试使用 Kinesis,它需要字节缓冲区格式的数据。到目前为止,我看到的所有示例都是用 Java 编写的,并传递简单的字符串。任何人都可以知道如何将 kotlin 数据类转换为 bytebuffer 吗?

例如数据类 abc ( var a: Long, var b: String, var c: Double )

bytebuffer kotlin aws-sdk amazon-kinesis-kpl

5
推荐指数
2
解决办法
1万
查看次数

Kinesis vs KPL vs KCL

这是一个有点浅层次的问题。然而,我对这三项服务感到困惑。

据我了解,KPL 生成快速数据,而 KCL 消耗 Kinesis 生成的快速数据。但是,我不明白的是,如果 KPL 和 KCL 组成这一对,我们需要 AWS Kinesis 做什么?

另一种看待方式:如果 AWS Kinesis 可以生成快速数据并且 KCL 可以使用它,那么我们需要 KPL 做什么?

任何澄清的答案将不胜感激。

amazon-kinesis amazon-kinesis-kpl amazon-kinesis-firehose

5
推荐指数
1
解决办法
4696
查看次数

通过设置 API 网关为 Amazon Kinesis 调用 REST API

我正在尝试发送 HTTP Post 请求以将记录放入 Amazon Kinesis Stream。有多种方法(Kinesis 客户端、KPL、将 AWS 网关设置为 Kinesis 代理)。

我看到了关于 Kinesis PutRecord API http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html 的文档

POST / HTTP/1.1
Host: kinesis.<region>.<domain>
Content-Length: <PayloadSizeBytes>
User-Agent: <UserAgentString>
Content-Type: application/x-amz-json-1.1
Authorization: <AuthParams>
Connection: Keep-Alive 
X-Amz-Date: <Date>
X-Amz-Target: Kinesis_20131202.PutRecord
{
  "StreamName": "exampleStreamName",
  "Data": "XzxkYXRhPl8x",
  "PartitionKey": "partitionKey"
}
Run Code Online (Sandbox Code Playgroud)

是否可以将上述 HTTP POST 请求发送到 PutRecord,而无需按照此链接中的说明设置 Amazon API 网关:http ://docs.aws.amazon.com/apigateway/latest/developerguide/use-custom-authorizer 。 html#call-api-with-api-gateway-custom-authorization

KPL 和 Kinesis Client 必须以某种方式在内部使用 HTTP POST 到 PutRecord,因此必须有一种方法可以这样做。不幸的是,我在网上找不到任何资源。

amazon-kinesis amazon-kinesis-kpl

4
推荐指数
1
解决办法
4940
查看次数

AWS Kinesis 如何限制写入吞吐量?

AWS Kinesis 的写入吞吐量相当低,为 1000 次写入/秒和 1MB/写入秒。Kinesis 如何强制执行此限制?如果我尝试在一秒钟内执行 1500 次写入,那么额外的 500 次写入是否会被放入某种队列中,还是会直接失败?

amazon-web-services amazon-kinesis amazon-kinesis-kpl

3
推荐指数
1
解决办法
1万
查看次数

使用 PutRecords 将多条记录加载到 Kinesis - 如何在发生故障时仅重新发送失败的记录?

我\xe2\x80\x99m 使用 Lambda 将数据记录加载到 Kinesis 中,并且经常想要添加最多 500K 条记录,我将这些记录批量分成 500 条,并使用 Boto 的方法将put_records它们发送到 Kinesis。我有时会看到由于超出允许的吞吐量而导致的失败。

\n\n

发生这种情况时重试的最佳方法是什么?理想情况下,我不想在数据流中出现重复的消息,所以我不想简单地重新发送所有 500 条记录,但我很难知道如何仅重试失败的消息。该方法的响应put_records似乎非常有用。

\n\n

我可以依赖响应记录列表的顺序与我传递给 putRecords 的列表的顺序相同吗?

\n\n

我知道我可以增加分片数量,但我\xe2\x80\x99d 希望显着增加将数据加载到此 Kinesis 流的并行 Lambda 函数的数量。我们计划根据源系统对数据进行分区,我不能保证多个函数不会将数据写入同一个分片并超出允许的吞吐量。因此,我认为增加分片不会消除重试策略的需要。

\n\n

或者,有人知道 KPL 是否会自动为我处理这个问题?

\n

amazon-web-services amazon-kinesis aws-lambda amazon-kinesis-kpl

3
推荐指数
1
解决办法
5039
查看次数