我正在运行kinesis plus spark应用程序 https://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html
我正在运行如下
ec2实例上的命令:
./spark/bin/spark-submit --class org.apache.spark.examples.streaming.myclassname --master yarn-cluster --num-executors 2 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/hadoop/test.jar
Run Code Online (Sandbox Code Playgroud)
我在EMR上安装了火花.
EMR details
Master instance group - 1 Running MASTER m1.medium
1
Core instance group - 2 Running CORE m1.medium
Run Code Online (Sandbox Code Playgroud)
我越来越低于INFO,它永远不会结束.
15/06/14 11:33:23 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
15/06/14 11:33:23 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
15/06/14 11:33:23 INFO …Run Code Online (Sandbox Code Playgroud) 我是新手,并试图在版本为1.3.1的Amazon集群上安装spark.当我做
SparkConf sparkConfig = new SparkConf().setAppName("SparkSQLTest").setMaster("local[2]");
Run Code Online (Sandbox Code Playgroud)
它对我有用,但我知道这是为了测试目的我可以设置本地[2]
当我尝试使用群集模式时,我将其更改为
SparkConf sparkConfig = new SparkConf().setAppName("SparkSQLTest").setMaster("spark://localhost:7077");
Run Code Online (Sandbox Code Playgroud)
有了这个我得到低于错误
试图与无法访问的远程地址关联[akka.tcp:// sparkMaster @ localhost:7077].地址现在被封闭了5000毫秒,所有发送到此地址的消息都将被发送到死信.原因:连接被拒绝15/06/10 15:22:21 INFO client.AppClient $ ClientActor:连接到master akka.tcp:// sparkMaster @ localhost:7077/user/Master ..
有人可以让我如何设置主网址.
我有Kinesis Stream捕获的事件.我希望将所有事件放在S3上的特定文件夹结构中.我想创建一个带有日期戳的文件夹,就像6月15日的所有事件应该进入该文件夹一样,6月16日之后新文件夹应该来挑选事件等等.
作为Kinesis的新手,我正在使用文档,我发现有连接器框架,其中S3Emitter与配置一起用于选择需要发出数据的S3位置.但是有人可以建议我如何维护文件夹结构在日期文件夹中捕获事件日期?
我试图了解如何部署使用Kinesis客户端库(KCL)构建的Amazon Kinesis客户端应用程序.
我找到了这个,但它只说明了
部署Amazon Kinesis应用程序时,您可以按照自己的最佳实践将代码部署到Amazon EC2实例.例如,您可以将Amazon Kinesis应用程序添加到您的某个Amazon EC2 AMI中.
这并没有给我更广泛的了解.
这些示例使用Ant脚本来运行Java程序.这是最好的做法吗?
另外,我甚至在运行EC2实例之前就明白了,我需要确定
有人可以在此添加更多细节吗?
嗨我正在按照hashicorp/terraform给出的步骤执行以下活动
# Get latest master branch's dependencies staged in local $GOPATH
git checkout master
git pull
godep restore -v
# Make your way to the dependency in question and checkout the target ref
pushd $GOPATH/src/github.com/some/dependency
git checkout [latest]
# Head back to Terraform on a feature branch and update the dependncy to the
# version currently in your $GOPATH
popd
git checkout my-feature-branch
godep update github.com/...
Run Code Online (Sandbox Code Playgroud)
在此之后,我可以看到我的Godep.json文件已更新,但我没有看到供应商文件夹中的更改.它仍指向旧.好吧,我正在寻找来自供应商的emr支持,因为我正在更新go-aws-sdk,这是最新的go-aws-sdk.当我调用go update github.com/...它修改了godep.json而不是vendor文件夹.有人可以让我知道原因.谢谢
我在 EMR 上运行 spark 1.4.1。我正在尝试使用 10 个具有 122G 和 16 核内存的节点来处理 EMR 上的大量数据。一段时间后,我低于例外情况。
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: Executor is not registered (appId=application_1439264479594_0002, execId=17)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:105)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
有人可以告诉我我应该如何调试这个。谢谢
我是Kinesis的新手.读出我发现的文档,我可以创建Kinesis Stream来从Producer获取数据.然后使用KCL将从Stream读取此数据以进一步处理.我理解如何通过实现IRecordProcessor来编写KCL应用程序.
然而,关于如何将数据放在Kinesis流上的第一阶段对我来说仍然不清楚.我们是否有一些确实需要实现的AWS API.
场景:我有一台服务器,可以从文件夹中的各种来源连续获取数据.每个文件夹都包含文本文件,其行包含用于更快分析工作的必需属性.我必须将所有这些数据推送到Kinesis Stream.
我需要一些代码,如下面的类putData方法将用于Kinesis流中
public class Put {
AmazonKinesisClient kinesisClient;
Put()
{
String accessKey = "My Access Key here" ;
String secretKey = "My Secret Key here" ;
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
kinesisClient = new AmazonKinesisClient(credentials);
kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
System.out.println("starting the Put Application");
}
public void putData(String fileContent,String session) throws Exception
{
final String myStreamName = "ClickStream";
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
String putData = fileContent;
putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
putRecordRequest.setPartitionKey("session"+session);
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
System.out.println("Successfully putrecord, …Run Code Online (Sandbox Code Playgroud) 我在https://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html上构建了 KCL 加 Spark 基础
我在 EMR 上运行它(通过 bootstrap 安装 Spark)。我已经在流 SparkTest 上创建并进行了测试,其工作正常。我观察到没有创建 DynamoDB。我已经删除了stream和cluster。第二天,我再次创建了同名的 Kinesis Steam,并使用新启动的集群部署了我的代码。现在我得到了
5/06/12 08:17:28 ERROR worker.InitializeTask: Caught exception:
com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49551532098093284204238000035066183240246145871536717826 used in GetShardIterator on shard shardId-000000000000 in stream sparkTest under account 618673372431 is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: 770ef875-10db-11e5-b24b-af6f372168ae)
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1078)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClie
Run Code Online (Sandbox Code Playgroud)
我无法理解为什么会这样。如果我创建新的运动流然后进行工作,它会再次工作。是 Kinesis 的问题吗?
还有一个线程是使用https://github.com/awslabs/amazon-kinesis-connectors/issues/8进行的 ,但是我没有使用 kinesis 应用程序名称并使用以下命令创建流
KinesisUtils.createStream(
jssc, streamName, endpointUrl, kinesisCheckpointInterval, …Run Code Online (Sandbox Code Playgroud) 你好,我正在使用 pywebhdfs python lib。我通过调用并尝试在 HDFS 上创建文件来连接 EMR。我遇到了异常,这似乎与我正在执行的操作无关,因为我在这里没有达到任何连接限制。是因为 webhdfs 的工作原理吗
from pywebhdfs.webhdfs import PyWebHdfsClient
hdfs = PyWebHdfsClient(host='myhost',port='50070', user_name='hadoop')
my_data = '01010101010101010101010101010101'
my_file = 'user/hadoop/data/myfile.txt'
hdfs.create_file(my_file, my_data)
Run Code Online (Sandbox Code Playgroud)
抛出:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='masterDNS', port=50070): 超过 url 的最大重试次数:/webhdfs/v1/user/hadoop/data/myfile.txt?op=CREATE&user.name=hadoop (由NewConnectionError(': 无法建立新连接: [Errno 115] 操作正在进行中',))
我试图找出可以将数据写入kinesis的托管服务.用这种方式我需要我的消息应该至少有一个交付到kinesis流.是建议还是好主意使用SQS写入Kinesis.我正在寻找可以水平扩展的解决方案.
apache-spark ×3
amazon-emr ×2
emr ×2
amazon-s3 ×1
amazon-sqs ×1
go ×1
godeps ×1
hadoop ×1
hadoop-yarn ×1
java ×1
rabbitmq ×1
terraform ×1
webhdfs ×1