基本上整个问题都在标题中.我想知道是否可以同时从多台计算机上附加到位于HDFS上的文件?类似于存储由多个进程不断产生的事件流的东西.订单并不重要.
我记得听过谷歌技术演示文稿之一,GFS支持这样的追加功能,但尝试使用HDFS进行一些有限的测试(使用常规文件追加()或使用SequenceFile)似乎不起作用.
谢谢,
我想在从stdin提供URL时下载网页.基本上一个进程连续生成stdout/file的URL,我想将它们管道输出到wget或curl.(如果您愿意,可以将其视为简单的网络爬虫).
这似乎工作正常:
tail 1.log | wget -i - -O - -q
Run Code Online (Sandbox Code Playgroud)
但是当我使用'tail -f'并且它不再起作用时(缓冲或wget正在等待EOF?):
tail -f 1.log | wget -i - -O - -q
Run Code Online (Sandbox Code Playgroud)
任何人都可以使用wget,curl或任何其他标准Unix工具提供解决方案吗?理想情况下,我不想在循环中重新启动wget,只是让它继续运行下载URL.
我有以下任务,我的Java/Executors解决方案运行良好,但我想在Akka中实现相同的功能并寻找最佳实践建议.
问题:
从多个URL并行获取/解析数据,阻塞直到获取所有数据并返回聚合结果.应该重试错误(IOException等)达到一定次数.
到目前为止,我的实现非常简单 - 创建Fetcher actor,它知道应该获取哪些URL,它创建一堆Worker actor并发送它们URL,每个消息一个.完成特定的URL Worker后,将结果发送回Fetcher.Fetcher保持结果状态,工人无国籍.以下简化代码.
提取程序:
class Fetcher extends UntypedActor {
private ActorRef worker;
public void onReceive(Object message) throws Exception {
if (message instanceof FetchMessage) {
this.worker = context().actorOf(SpringExtension.SpringExtProvider.get(actorSystem).props("Worker")
.withRouter(new RoundRobinPool(4)), "worker");
for(URL u: urls) {
this.worker.tell(new WorkUnit(u), getSelf());
}
}
else if (message instanceof Result) {
// accumulate results
}
}
Run Code Online (Sandbox Code Playgroud)
工人:
class Worker extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof WorkUnit) {
// fetch URL, parse etc
// …Run Code Online (Sandbox Code Playgroud) 包有以下package-info.java:
@ParametersAreNonnullByDefault
package foo;
import javax.annotation.ParametersAreNonnullByDefault;
Run Code Online (Sandbox Code Playgroud)
类具有以下方法:
private static String toIsoString(@Nullable Instant dateTime) {
return dateTime == null ? null : dateTime.toString();
}
Run Code Online (Sandbox Code Playgroud)
SonarQube(版本6.2,SonarJava 4.14.0.11784)在其上发出以下警告(squid:S2583):
我怎样才能说服SonarQube代码实际上是正确的?
有趣的是,Idea中的SonarLint插件(3.0.0.2041)不会生成相同的警告.
我需要使用请求正文的 HMAC 来保护使用 Spring Boot、WebFlux 和 Spring Security 实现的 REST API。稍微简化一下,在较高的层面上 - 请求附带具有请求正文的哈希值的标头,因此我必须读取标头,读取正文,计算正文的哈希值并与标头值进行比较。
我认为我应该实现,ServerAuthenticationConverter但到目前为止我能找到的所有示例都只查看请求标头,而不是正文,我不确定是否可以只读取正文,或者应该用缓存的正文包装/改变请求那么它可以被底层组件第二次消耗吗?
可以使用以下内容吗?
public class HttpHmacAuthenticationConverter implements ServerAuthenticationConverter {
@Override
public Mono<Authentication> convert(ServerWebExchange exchange) {
exchange.getRequest().getBody()
.next()
.flatMap(dataBuffer -> {
try {
return Mono.just(StreamUtils.copyToString(dataBuffer.asInputStream(), StandardCharsets.UTF_8));
} catch (IOException e) {
return Mono.error(e);
}
})
...
Run Code Online (Sandbox Code Playgroud)
我收到来自 IDE 的警告copyToString:不适当的阻塞方法调用
有什么指导方针或例子吗?
谢谢!
我也尝试过:
@Override
public Mono<Authentication> convert(ServerWebExchange exchange) {
return Mono.justOrEmpty(exchange.getRequest().getHeaders().toSingleValueMap())
.zipWith(exchange.getRequest().getBody().next()
.flatMap(dataBuffer -> Mono.just(dataBuffer.asByteBuffer().array()))
)
.flatMap(tuple -> create(tuple.getT1(), tuple.getT2()));
Run Code Online (Sandbox Code Playgroud)
但这不起作用 - 最后一行的 create() 方法中的代码永远不会执行。
我试图了解LoadbalanceRSocketClient SpringBoot 应用程序 ( RSocketRequester)上下文中的正确配置和使用模式。
我有两个 RSocket 服务器后端(SpringBoot、RSocket 消息传递)RSocketRequester在客户端上运行和配置,如下所示:
List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
HttpClient httpClient = HttpClient.create()
.baseUrl(url)
.secure(ssl ->
ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}
// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
.setupRoute("/connect")
.setupData("test")
//.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
.transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());
Run Code Online (Sandbox Code Playgroud)
配置完成后,请求者将在计时器循环中重复使用,如下所示:
@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
requester.route("/foo").data(Data).send().block();
}
Run Code Online (Sandbox Code Playgroud)
它工作 - 客户端启动,连接到其中一台服务器并将消息推送到它。如果我终止客户端连接的服务器,客户端会在下一个计时器事件中重新连接到另一台服务器。如果我再次启动第一个服务器并杀死第二个服务器,客户端将不再连接,并且在客户端观察到以下异常:
java.util.concurrent.CancellationException: Pool is exhausted
at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) …Run Code Online (Sandbox Code Playgroud)