我是Kinesis的新手.读出我发现的文档,我可以创建Kinesis Stream来从Producer获取数据.然后使用KCL将从Stream读取此数据以进一步处理.我理解如何通过实现IRecordProcessor来编写KCL应用程序.
然而,关于如何将数据放在Kinesis流上的第一阶段对我来说仍然不清楚.我们是否有一些确实需要实现的AWS API.
场景:我有一台服务器,可以从文件夹中的各种来源连续获取数据.每个文件夹都包含文本文件,其行包含用于更快分析工作的必需属性.我必须将所有这些数据推送到Kinesis Stream.
我需要一些代码,如下面的类putData方法将用于Kinesis流中
public class Put {
AmazonKinesisClient kinesisClient;
Put()
{
String accessKey = "My Access Key here" ;
String secretKey = "My Secret Key here" ;
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
kinesisClient = new AmazonKinesisClient(credentials);
kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
System.out.println("starting the Put Application");
}
public void putData(String fileContent,String session) throws Exception
{
final String myStreamName = "ClickStream";
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
String putData = fileContent;
putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
putRecordRequest.setPartitionKey("session"+session);
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
System.out.println("Successfully putrecord, …Run Code Online (Sandbox Code Playgroud)