我需要使用Java使用Spark从DynamoDB表中获取数据.它与用户的访问密钥和密钥一起正常工作:
final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", tableName);
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("dynamodb.awsAccessKeyId", accessKey);
jobConf.set("dynamodb.awsSecretAccessKey", secretKey);
jobConf.set("dynamodb.endpoint", endpoint);
Run Code Online (Sandbox Code Playgroud)
我需要使用AWS假定角色和STS(至少出于安全原因)使用spark从DynamoDB中获取数据.可能吗?我发现,有可能使用假定角色访问AWS S3火花(https://issues.apache.org/jira/browse/HADOOP-12537,https://hadoop.apache.org/docs/current3/hadoop- aws/tools/hadoop-aws/index.html),但没有找到类似DynamoDB的想法.
为了接收STS临时凭证,我使用以下代码:
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest()
.withRoleArn(roleArn) // arn:aws:iam::XXXXXXX:role/assume-role-DynamoDB-ReadOnly
.withDurationSeconds(3600)
.withRoleSessionName("assumed-role-session");
AssumeRoleResult assumeResult = stsClient.assumeRole(assumeRequest);
Credentials credentials = assumeResult.getCredentials();
Run Code Online (Sandbox Code Playgroud)
调用credentials.getAccessKeyId(), credentials.getSecretAccessKey() and credentials.getSessionToken()返回生成的临时凭证.有了这些凭据,我成功地可以使用java aws sdk AmazonDynamoDBClient(非火花方法)从DynamoDB获取数据.
火花有可能吗?spark是否允许使用以下内容:
jobConf.set("dynamodb.awsSessionToken”, sessionToken)?
我们应该永远调用processorContext.commit()在Processor执行由自己呢?我的意思是commit在计划的Punctuator实现或内部process方法中调用方法。
我们应该在哪些用例中这样做,我们是否需要这样做?这个问题与 Kafka DSL withtransform()和 Processor API 有关。
似乎 Kafka Streams 自己处理它,调用 processorContext.commit()也不能保证它会立即完成。
我们使用Kafka Streams来消费,处理和产生消息,而在PROD env上,我们面临着多个主题的错误:
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app]
Offset commit failed on partition xxx-1 at offset 13920:
The request timed out.[]
Run Code Online (Sandbox Code Playgroud)
对于负载较小的主题,这些错误很少发生,但是对于负载较高(和峰值)的主题,每个主题每天都会发生数十次错误。主题具有多个分区(例如10个)。似乎此问题不会影响数据处理(尽管有性能),因为在引发异常(即使对于相同的偏移量可能是多个错误)之后,使用者随后重新读取消息并成功处理它。
我看到此错误消息由于PR而出现在kafka-clients版本中,但是在同一用例的早期版本中(在消费者上),类似的消息()被记录为级别。对于我来说,将日志级别更新为这种用例的警告会更合乎逻辑。1.0.0kafka-clientsErrors.REQUEST_TIMED_OUTOffset commit for group {} failed: {}debug
如何解决这个问题?可能是根本原因?也许更改使用者属性或分区设置可以帮助摆脱此类问题。
我们使用以下实现来创建Kafka流:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);
Run Code Online (Sandbox Code Playgroud)
我们的Kafka消费者设置:
bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000 # also …Run Code Online (Sandbox Code Playgroud) 我有一个用于多个“单页应用程序”的S3存储桶,并为此存储桶启用了静态网站托管。存储桶具有以下文件夹和文件(firstApp和secondApp是文件夹,对应于不同的SPA):
firstApp
index.html
*.js & *.css files
secondApp
index.html
*.js & *.css files
Run Code Online (Sandbox Code Playgroud)
我还在S3存储桶的前面设置了CloudFront,默认的根对象设置为index.html。现在cloud-front-url/firstApp/显示器index.html从firstApp SPA和cloud-front-url/secondApp显示index.html来自secondApp。
我需要index.html根据URL路径将所有不正确的请求重定向到针对特定应用程序的不同请求,例如,如果有人请求cloud-front-url/firstApp/non-existing-path,则应将其index.html从firstApp 重定向至对第二应用程序的类似行为。
我知道如何设置CloudFront将所有404错误重定向到一个index.html:
select distribution > Error Pages > Create custom error response:
* Http error code: 404
* Error Caching Minimum TTL (seconds) : 0
* Customize response: Yes
* Response Page Path : /firstApp/index.html
* HTTP Response Code: 200
Run Code Online (Sandbox Code Playgroud)
CloudFront是否可以index.html根据请求的路径将404错误重定向到其他错误(重定向到index.htmlfirstApp或secondApp文件夹中)?
redirect amazon-s3 amazon-web-services amazon-cloudfront single-page-application
我检查了所有类似的帖子,但仍然找不到解决方案。
问题是测试类中不存在必需的请求部分“文件” 。
我想上传一个文件并将其保存到数据库中。这是我的休息控制器@RestController:
@PostMapping(value = "/upload")
public ResponseEntity<LogoDto> uploadLogo(@RequestParam("file") MultipartFile multipartFile) {
return ResponseEntity.ok(logoService.createLogo(multipartFile));
}
Run Code Online (Sandbox Code Playgroud)
和我的测试课:
@Test
public void createLogo2() throws Exception {
String toJsonLogoDto = new Gson().toJson(logoDto);
MockMultipartFile file = new MockMultipartFile("path", "url", MediaType.APPLICATION_JSON_VALUE, image);
LogoDto response = LogoDataTest.validLogoDto();
Mockito.when(logoServiceMock.createLogo(Mockito.any(MultipartFile.class))).thenReturn(response);
mockMvc.perform(MockMvcRequestBuilders.multipart("/brand-icon/upload")
.file(file)
.content(MediaType.APPLICATION_JSON_VALUE)
.contentType(MediaType.APPLICATION_JSON_VALUE)
.characterEncoding(CharEncoding.UTF_8))
.andDo(MockMvcResultHandlers.print())
.andExpect(MockMvcResultMatchers.status().isOk());
}
Run Code Online (Sandbox Code Playgroud)
我的application.yml看起来像这样:
spring:
servlet:
multipart:
enabled: true
max-file-size: 2MB
max-request-size: 10MB
Run Code Online (Sandbox Code Playgroud)
我尝试在我的@PostMapping中添加消耗;尝试设置每个 MediaTypes.. 仍然会出现错误。
我感谢您的所有回答。
spring spring-mvc spring-boot spring-restcontroller spring-rest
apache-kafka ×2
java ×2
amazon-s3 ×1
apache-spark ×1
aws-sdk ×1
hadoop ×1
redirect ×1
spring ×1
spring-boot ×1
spring-mvc ×1
spring-rest ×1