我正在寻找缓存地图中的信息,所以我不必保持,例如,击中数据库.在Java中我会使用Google集合的优秀MapMaker并设置一个到期时间,以保持缓存尽可能新鲜,以及softValues,以减少内存使用量.然后我有一个函数来计算当前未缓存的键的值.
MapMaker().softValues
.expireAfterWrite(10, TimeUnit.MINUTES)
.makeComputingMap(Function(...));
Run Code Online (Sandbox Code Playgroud)
在Scala中执行此操作的最佳方法是什么?
我正在将多个Observable压缩在一起,然后以导致Observable的方式对它们进行转换:
final Observable<Observable<M>> result = Observable.zip(obs1, obs2, transformFunc);
Run Code Online (Sandbox Code Playgroud)
我希望能做的是:
final Observable<M> result = Observable.flatZip(obs1, obs2, transformFunc);
Run Code Online (Sandbox Code Playgroud)
这是最干净的方法,因为flatZip不存在(也许我应该提交一个).目前我不得不将结果平面映射到自身.
我正在寻找将一个人转换ListenableFuture<Iterable<A>>成一系列个人的最好方法ListenableFutures.这是我正在寻找的那种方法签名:
public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
final ListenableFuture<Iterable<A>> elements,
final Function<A, B> func,
final ListeningExecutorService executor
);
Run Code Online (Sandbox Code Playgroud)
显然,如果我回来ListenableFuture<Iterable<ListenableFuture<B>>>,我可以做到,但我觉得我应该能够分裂并运行它并保持其异步性.
这是我到目前为止的代码,但你会注意到最后的讨厌.get(),这会破坏事物.如果我的事情过于复杂,请原谅.
public class CallableFunction<I, O> implements Callable<O>{
private final I input;
private final Function<I, O> func;
public CallableFunction(I input, Function<I, O> func) {
this.input = input;
this.func = func;
}
@Override public O call() throws Exception {
return func.apply(input);
}
}
public <A, B> Iterable<ListenableFuture<B>> splitAndRun(
final ListenableFuture<Iterable<A>> elements,
final Function<A, B> func,
final …Run Code Online (Sandbox Code Playgroud) 鉴于 Hystrix 进入维护模式,我一直致力于将(相当大的)代码库迁移到 Resilience4j。
我在 Hystrix 中大量使用以下模式:
new HystrixCommand<SomeReturnValue>(DependencyKeys.DEPENDENCY) {
@Override
protected SomeReturnValue run() {
return someExpensiveCall();
}
}
.observe()
Run Code Online (Sandbox Code Playgroud)
我想用 Resilience4j 复制 Hystrix 的一些功能。
到目前为止,我有以下语法来连接外部调用:
resilience.single(DependencyKeys.DEPENDENCY, this::someExpensiveCall);
Run Code Online (Sandbox Code Playgroud)
其中Resilience类提供single方法:
public <T> Single<T> single(ResilienceKey key, Callable<T> callable) {
return Completable.complete()
.subscribeOn(Schedulers.computation())
.observeOn(configuration.scheduler(key))
.andThen(Single.defer(() -> Single.fromCallable(callable)
.lift(CircuitBreakerOperator.of(configuration.circuitBreaker(key)))
.lift(RateLimiterOperator.of(configuration.rateLimiter(key)))
.lift(BulkheadOperator.of(configuration.bulkhead(key)))
))
.observeOn(Schedulers.computation());
}
Run Code Online (Sandbox Code Playgroud)
在断路和在不同线程池上运行代码方面,这看起来如何更好地类似于 Hystrix,但以更理智的方式。我真的不喜欢用 just 来启动链,这样我就可以在实际的可调用对象被包装之前Completable.complete()强制使用 a 。observeOn
虽然我知道有几种方法可以做到这一点,但我最感兴趣的是找到最惯用和最实用的Scala方法.
给出以下陈腐的例子:
case class User(id: String)
val users = List(User("1"), User("2"), User("3"), User("4"))
Run Code Online (Sandbox Code Playgroud)
什么是创建user.id - > User的不可变查找Map的最佳方法,以便我可以通过user.id执行快速查找.
在Java中,我可能会使用Google-Collection的Maps.uniqueIndex,尽管我不太关心它的独特属性.
是否可以在Scala中获取泛型类的类型名称?我知道在类型擦除的Java中这是不可能的,但我希望Scala是一个不同的情况.
目前我必须做类似的事情:
trait Model
case class User(id: String) extends Model
def fromMap[M<:Model : Manifest](data: Map[String, String], modelType: String) = {
modelType match {
case "user" => User(data.get("id").get)
}
}
val user = fromMap[User](Map("id" -> "id1"), "user")
Run Code Online (Sandbox Code Playgroud)
显然,如果我可以在不必将其传入的情况下解决"用户"问题会更容易.
是否可以拦截对System.out.print*和System.err.print*(在Java中)的调用并为它们添加时间戳?不用担心,我们使用通常的日志记录框架,但偶尔会有一些sys.out漏出来,知道它何时发生会很好,所以我们可以把它绑定到正确的日志文件.
我有一个Observable发出文件行(从GCS读取许多GB).
return StringObservable.byLine(
Observable.using(
() -> storage.get(blobId).reader(),
reader -> Observable.create(
new OnSubscribeReadChannel(reader, 64 * 1024)
),
ReadChannel::close
)
)
Run Code Online (Sandbox Code Playgroud)
每行产生多个(在某些情况下很多)调用各种DB,所有这些都包含在Hystrix命令中.显然这些线最终压倒了Hystrix命令,电路开始打开,每个人都有糟糕的一天.
这大致就是我正在做的事情:
readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
.map(this::deserializeLine)
.flatMap(this::addDataToObjectFromSomeDb)
.flatMap(this::writeObj)
.map(Set::size)
.reduce(0, (a, b) -> a + b)
.toBlocking().single()
Run Code Online (Sandbox Code Playgroud)
有没有办法可以应用一些背压,或限制一次处理的线数?
我有一个非常标准的 API 分页问题,您可以通过一些简单的递归来处理。这是一个虚构的例子:
public Observable<List<Result>> scan() {
return scanPage(Optional.empty(), ImmutableList.of());
}
private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
return this.scanner.scan(startKey, LIMIT)
.flatMap(page -> {
if (!page.getLastKey().isPresent()) {
return Observable.just(results);
}
return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
.addAll(results)
.addAll(page.getResults())
.build()
);
});
}
Run Code Online (Sandbox Code Playgroud)
但这显然会创建一个庞大的调用堆栈。如何强制执行此操作但维护 Observable 流?
这是一个命令式阻塞示例:
public List<Result> scan() {
Optional<String> startKey = Optional.empty();
final ImmutableList.Builder<Result> results = ImmutableList.builder();
do {
final Page page = this.scanner.scan(startKey);
startKey = page.getLastKey();
results.addAll(page.getResults());
} while (startKey.isPresent());
return results.build();
}
Run Code Online (Sandbox Code Playgroud) java ×5
observable ×3
rx-java ×3
scala ×3
asynchronous ×2
collections ×2
hystrix ×2
concurrency ×1
future ×1
generics ×1
guava ×1
java-8 ×1
pagination ×1
resilience4j ×1
rx-java2 ×1
system.out ×1