小编max*_*dim的帖子

是否可以并行追加多个客户端的HDFS文件?

基本上整个问题都在标题中.我想知道是否可以同时从多台计算机上附加到位于HDFS上的文件?类似于存储由多个进程不断产生的事件流的东西.订单并不重要.

我记得听过谷歌技术演示文稿之一,GFS支持这样的追加功能,但尝试使用HDFS进行一些有限的测试(使用常规文件追加()或使用SequenceFile)似乎不起作用.

谢谢,

hadoop gfs hdfs

21
推荐指数
2
解决办法
1万
查看次数

来自stdin的wget或curl

我想在从stdin提供URL时下载网页.基本上一个进程连续生成stdout/file的URL,我想将它们管道输出到wget或curl.(如果您愿意,可以将其视为简单的网络爬虫).

这似乎工作正常:

tail 1.log | wget -i - -O - -q 
Run Code Online (Sandbox Code Playgroud)

但是当我使用'tail -f'并且它不再起作用时(缓冲或wget正在等待EOF?):

tail -f 1.log | wget -i - -O - -q
Run Code Online (Sandbox Code Playgroud)

任何人都可以使用wget,curl或任何其他标准Unix工具提供解决方案吗?理想情况下,我不想在循环中重新启动wget,只是让它继续运行下载URL.

unix curl stdin wget xargs

15
推荐指数
2
解决办法
8818
查看次数

处理Akka演员异常的最佳实践

我有以下任务,我的Java/Executors解决方案运行良好,但我想在Akka中实现相同的功能并寻找最佳实践建议.

问题:

从多个URL并行获取/解析数据,阻塞直到获取所有数据并返回聚合结果.应该重试错误(IOException等)达到一定次数.

到目前为止,我的实现非常简单 - 创建Fetcher actor,它知道应该获取哪些URL,它创建一堆Worker actor并发送它们URL,每个消息一个.完成特定的URL Worker后,将结果发送回Fetcher.Fetcher保持结果状态,工人无国籍.以下简化代码.

提取程序:

class Fetcher extends UntypedActor {
  private ActorRef worker;

  public void onReceive(Object message) throws Exception {
    if (message instanceof FetchMessage) {
      this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
              .withRouter(new RoundRobinPool(4)), "worker");
      for(URL u: urls) {
        this.worker.tell(new WorkUnit(u), getSelf());
      }
   }
   else if (message instanceof Result) {
     // accumulate results
   }
}
Run Code Online (Sandbox Code Playgroud)

工人:

class Worker extends UntypedActor {

  public void onReceive(Object message) throws Exception {
    if (message instanceof WorkUnit) {
      // fetch URL, parse etc
      // …
Run Code Online (Sandbox Code Playgroud)

java akka

9
推荐指数
1
解决办法
4041
查看次数

@Nullable和SonarQube'有条件执行的块应该可以访问'警告

包有以下package-info.java:

@ParametersAreNonnullByDefault
package foo;
import javax.annotation.ParametersAreNonnullByDefault;
Run Code Online (Sandbox Code Playgroud)

类具有以下方法:

private static String toIsoString(@Nullable Instant dateTime) {
  return dateTime == null ? null : dateTime.toString();
}
Run Code Online (Sandbox Code Playgroud)

SonarQube(版本6.2,SonarJava 4.14.0.11784)在其上发出以下警告(squid:S2583):

在此输入图像描述

我怎样才能说服SonarQube代码实际上是正确的?

有趣的是,Idea中的SonarLint插件(3.0.0.2041)不会生成相同的警告.

java nullable notnull sonarqube

9
推荐指数
1
解决办法
1176
查看次数

Spring WebFlux、安全性和请求正文

我需要使用请求正文的 HMAC 来保护使用 Spring Boot、WebFlux 和 Spring Security 实现的 REST API。稍微简化一下,在较高的层面上 - 请求附带具有请求正文的哈希值的标头,因此我必须读取标头,读取正文,计算正文的哈希值并与标头值进行比较。

我认为我应该实现,ServerAuthenticationConverter但到目前为止我能找到的所有示例都只查看请求标头,而不是正文,我不确定是否可以只读取正文,或者应该用缓存的正文包装/改变请求那么它可以被底层组件第二次消耗吗?

可以使用以下内容吗?

public class HttpHmacAuthenticationConverter implements ServerAuthenticationConverter {

    @Override
    public Mono<Authentication> convert(ServerWebExchange exchange) {
        exchange.getRequest().getBody()
                .next()
                .flatMap(dataBuffer -> {
                    try {
                        return Mono.just(StreamUtils.copyToString(dataBuffer.asInputStream(), StandardCharsets.UTF_8));
                    } catch (IOException e) {
                        return Mono.error(e); 
                    }
                })
                ...


Run Code Online (Sandbox Code Playgroud)

我收到来自 IDE 的警告copyToString不适当的阻塞方法调用

有什么指导方针或例子吗?

谢谢!

我也尝试过:

    @Override
    public Mono<Authentication> convert(ServerWebExchange exchange) {
        return Mono.justOrEmpty(exchange.getRequest().getHeaders().toSingleValueMap())
                .zipWith(exchange.getRequest().getBody().next()
                        .flatMap(dataBuffer -> Mono.just(dataBuffer.asByteBuffer().array()))
                )
                .flatMap(tuple -> create(tuple.getT1(), tuple.getT2()));

Run Code Online (Sandbox Code Playgroud)

但这不起作用 - 最后一行的 create() 方法中的代码永远不会执行。

spring spring-security spring-webflux

5
推荐指数
1
解决办法
2397
查看次数

正确使用 LoadbalanceRSocketClient 和 Spring 的 RSocketRequester

我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。

我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:

List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
 .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());   
Run Code Online (Sandbox Code Playgroud)

配置完成后,请求者将在计时器循环中重复使用,如下所示:

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)

它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:

java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …
Run Code Online (Sandbox Code Playgroud)

spring-messaging rsocket rsocket-java

3
推荐指数
1
解决办法
596
查看次数