我有一个用例,会有数据流来,我不能以相同的速度消耗它,需要一个缓冲区.这可以使用SNS-SQS队列来解决.我开始知道Kinesis解决了同样的目的,那有什么区别?为什么我更喜欢(或不应该更喜欢)Kinesis?
我正在运行kinesis plus spark应用程序 https://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html
我正在运行如下
ec2实例上的命令:
./spark/bin/spark-submit --class org.apache.spark.examples.streaming.myclassname --master yarn-cluster --num-executors 2 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/hadoop/test.jar
Run Code Online (Sandbox Code Playgroud)
我在EMR上安装了火花.
EMR details
Master instance group - 1 Running MASTER m1.medium
1
Core instance group - 2 Running CORE m1.medium
Run Code Online (Sandbox Code Playgroud)
我越来越低于INFO,它永远不会结束.
15/06/14 11:33:23 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
15/06/14 11:33:23 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
15/06/14 11:33:23 INFO …Run Code Online (Sandbox Code Playgroud) 我正在读书AWS Kinesis.在以下程序中,我将数据写入名为的流中TestStream.我运行了这段代码10次,在流中插入10条记录.
var params = {
Data: 'More Sample data into the test stream ...',
PartitionKey: 'TestKey_1',
StreamName: 'TestStream'
};
kinesis.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});
Run Code Online (Sandbox Code Playgroud)
所有记录都已成功插入.partition key这里到底意味着什么?它在后台做什么?我阅读了它的文档,但不明白它的含义.
我对Amazon Kinesis很新,所以也许这只是我理解中的一个问题,但在AWS Lambda FAQ中它说:
每个分片严格序列化发送到您的AWS Lambda函数的Amazon Kinesis和DynamoDB Streams记录.这意味着如果将两个记录放在同一个分片中,Lambda保证在使用第二个记录调用Lambda函数之前,将使用第一个记录成功调用它.如果一个记录的调用超时,受到限制或遇到任何其他错误,Lambda将重试直到成功(或记录达到其24小时到期),然后再转到下一个记录.不保证跨不同分片的记录顺序,并且每个分片的处理并行发生.
我的问题是,如果由于某种原因某些格式错误的数据被生产者放入一个分片并且当Lambda函数选择它时它会出错,然后只是不断重试会发生什么?这意味着该特定分片的处理将被错误阻止24小时.
处理应用程序错误的最佳做法是将问题包装在自定义错误中,并将此错误与所有成功处理的记录一起发送到下游并让消费者处理它?当然,在一个不可恢复的错误导致程序像空指针一样崩溃的情况下,这仍然无济于事:我们将在接下来的24小时内再次回到阻塞重试循环.
附上截图:
Config.scala
object Config {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext = {
this.sc
}
def getSqlContext(): HiveContext = {
this.sqlContext
}
}
Run Code Online (Sandbox Code Playgroud)
S3Basin.scala
object S3Basin {
def main(args: Array[String]): Unit = {
Kinesis.startStreaming(s3basinFunction _)
}
def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …Run Code Online (Sandbox Code Playgroud) amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing
他们似乎对我做同样的事情.任何人都可以向我解释这个区别吗?
我正在尝试使用以下设置配置Kinesis Analytics应用程序:
接下来,我将使用Hive + JSONSERDE导入S3存储桶的内容,希望每个JSON记录都在自己的生产线上.Firehose输出只会附加所有打破JSONSERDE的JSON记录.
我可以将AWS Lambda数据格式化程序附加到输出流,但这看起来很昂贵.我想要的是使用换行符分割每条记录.
如果我没有使用Google Analytics应用程序,我会将换行符附加到每个Firehose记录中.在应用程序的SQL中没有办法做到这一点似乎很奇怪:
CREATE OR REPLACE STREAM "STREAM_OUT" (
a VARCHAR(4),
b VARCHAR(4),
c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "STREAM_OUT"
SELECT STREAM
"a",
"b",
"c"
FROM "SOURCE_SQL_STREAM_001";
Run Code Online (Sandbox Code Playgroud)
是添加Lambda数据格式化程序的最佳答案吗?我真的很想避免这种情况.
我正在尝试从lambda连接到我的RDS实例.我在本地写了lambda并在本地测试,一切都很好用.我部署到lambda,突然它不起作用.下面是我正在运行的代码,如果有帮助,我将通过kinesis流调用lambda.
'use strict';
exports.handler = (event, context, handlerCallback) => {
console.log('Recieved request for kinesis events!');
console.log(event);
console.log(context);
const connectionDetails = {
host: RDS_HOST,
port: 5432,
database: RDS_DATABASE,
user: RDS_USER,
password: RDS_PASSWORD
};
const db = require('pg-promise')({promiseLib: require('bluebird')})(connectionDetails);
db
.tx(function () {
console.log('Beginning query');
return this.query("SELECT 'foobar'")
.then(console.log)
.catch(console.log)
.finally(console.log);
})
.finally(() => handlerCallback());
};
Run Code Online (Sandbox Code Playgroud)
以下是来自云监视的日志,如果它有帮助:
START RequestId: *********-****-****-****-********* Version: $LATEST
2016-05-31T20:58:25.086Z *********-****-****-****-********* Recieved request for kinesis events!
2016-05-31T20:58:25.087Z *********-****-****-****-********* { Records: [ { kinesis: [Object], eventSource: 'aws:kinesis', eventVersion: '1.0', eventID: …Run Code Online (Sandbox Code Playgroud) javascript amazon-web-services amazon-rds amazon-kinesis aws-lambda
我正在写Kinesis Firehose流的记录,最终由Amazon Kinesis Firehose写入S3文件.
我的记录对象看起来像
ItemPurchase {
String personId,
String itemId
}
Run Code Online (Sandbox Code Playgroud)
数据写入S3看起来像:
{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}
Run Code Online (Sandbox Code Playgroud)
没有COMMA分离.
像Json阵列一样没有启动支架
[
Run Code Online (Sandbox Code Playgroud)
没有结束支持,如在Json阵列中
]
Run Code Online (Sandbox Code Playgroud)
我想读取这些数据获取ItemPurchase对象的列表.
List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
Run Code Online (Sandbox Code Playgroud)
读取此数据的正确方法是什么?
亚马逊声称他们的Kinesis流媒体产品保证了唱片订购.
它提供记录的顺序,以及以相同顺序读取和/或重放记录的能力(...)
Kinesis由Streams组成,Streams本身由一个或多个Shards组成.记录存储在这些碎片中.我们可以编写连接到Shard的消费者应用程序,并按照它们存储的顺序读取/重放记录.
但Kinesis能否开箱即用,为Stream本身订购,而无需向消费者推销订购逻辑?消费者如何从同一个Stream的多个Shards中读取记录,确保记录的读取顺序与它们添加到Stream中的顺序相同?
amazon-kinesis ×10
amazon-s3 ×2
apache-spark ×2
aws-lambda ×2
amazon ×1
amazon-emr ×1
amazon-rds ×1
amazon-sqs ×1
hadoop-yarn ×1
javascript ×1
json ×1
node.js ×1
stream ×1