cyk*_*ath 21 java asynchronous redis jedis lettuce
我正在使用redis与Akka,所以我不需要阻止调用.生菜具有内置的异步未来调用.但是Jedis是Redis的推荐客户.如果我以正确的方式使用它们,有人可以告诉我.如果是这样哪一个更好.
JEDIS 我使用静态Jedis连接池来获取con并使用Akka将来的回调来处理结果.我关心的是当我使用另一个线程(可调用)来获取线程最终将阻塞结果的结果时.虽然生菜可能有一些更有效的方法来做到这一点.
private final class OnSuccessExtension extends OnSuccess<String> {
private final ActorRef senderActorRef;
private final Object message;
@Override
public void onSuccess(String valueRedis) throws Throwable {
log.info(getContext().dispatcher().toString());
senderActorRef.tell((String) message, ActorRef.noSender());
}
public OnSuccessExtension(ActorRef senderActorRef,Object message) {
this.senderActorRef = senderActorRef;
this.message=message;
}
}
ActorRef senderActorRef = getSender(); //never close over a future
if (message instanceof String) {
Future<String> f =akka.dispatch.Futures.future(new Callable<String>() {
public String call() {
String result;
try(Jedis jedis=JedisWrapper.redisPool.getResource()) {
result = jedis.get("name");
}
return result;
}
}, ex);
f.onSuccess(new OnSuccessExtension(senderActorRef,message), ex);
}
Run Code Online (Sandbox Code Playgroud)
生菜
ExecutorService executorService = Executors.newFixedThreadPool(10);
public void onReceive(Object message) throws Exception {
ActorRef senderActorRef = getSender(); //never close over a future
if (message instanceof String) {
final RedisFuture<String> future = lettuce.connection.get("name");
future.addListener(new Runnable() {
final ActorRef sender = senderActorRef;
final String msg =(String) message;
@Override
public void run() {
try {
String value = future.get();
log.info(value);
sender.tell(message, ActorRef.noSender());
} catch (Exception e) {
}
}
}, executorService);
Run Code Online (Sandbox Code Playgroud)
如果生菜是Async调用的更好选择.那么我应该在生产环境中使用什么类型的执行程序.如果可能,我可以使用Akka调度程序作为Letture future调用的执行上下文.
mp9*_*1de 46
你的问题没有一个答案,因为它取决于你.
Jedis和生菜都是成熟的客户.要完成Java客户端列表,还有Redisson,它添加了另一层抽象(Collection/Queue/Lock/...接口而不是原始Redis命令).
这在很大程度上取决于您与客户的合作方式.通常,Redis在数据访问方面是单线程的,因此通过并发获得的唯一好处是将协议和I/O工作卸载到不同的线程.这对于莴苣和Redisson来说并不完全正确,因为它们在引擎盖下使用netty(netty将一个套接字通道绑定到特定的事件循环线程).
使用Jedis,您一次只能使用一个连接.这与Akka actor模型很好地相关,因为一个actor实例一次只被一个线程占用.
另一方面,您需要与处理特定actor的线程一样多的Jedis连接.如果您开始在不同的actor之间共享Jedis连接,则可以选择连接池,也可以为每个actor实例提供专用的Jedis连接.请记住,您需要自己处理重新连接(一旦Redis连接断开).
使用Redisson和生菜,如果你愿意,你可以获得透明的重新连接(这是生菜的默认值,不确定Redisson).
通过使用莴苣和Redisson,您可以在所有参与者之间共享一个连接,因为它们是线程安全的.在两种情况下,您不能共享一个生菜连接:
MULTI/ EXEC,因为你会将不同的操作与事务混合在一起,这当然是你不想这样做的事情)Jedis没有异步接口,因此您需要自己完成此操作.这是可行的,我做了类似于MongoDB的事情,将I/O部分卸载/解耦给其他actor.您可以使用代码中的方法,但是您不需要提供自己的执行程序服务,因为您在runnable侦听器中执行非阻塞操作.
使用生菜4.0,您将获得Java 8支持(由于CompletionStage接口,这在异步API方面更好),您甚至可以使用RxJava(反应式编程)来实现并发性.
生菜对你的并发模型没有意见.它允许您根据需要使用它,除了Java 6/7 的普通Future/ ListenableFutureAPI和Guava不是很好用.
HTH,马克
尝试一下Redisson框架。它提供异步 API 以及通过与 Project Reactor 和 RxJava3 库集成支持的反应流 API。
异步API使用示例:
RedissonClient client = Redisson.create(config);
RMap<String, String> map = client.getMap("myMap");
// implements CompletionStage interface
RFuture<String> future = map.get("myKey");
future.whenComplete((res, exception) -> {
// ...
});
Run Code Online (Sandbox Code Playgroud)
Reactive Streams API 与 Project Reactor lib 使用示例:
RedissonReactiveClient client = Redisson.createReactive(config);
RMapReactive<String, String> map = client.getMap("myMap");
Mono<String> resp = map.get("myKey");
Run Code Online (Sandbox Code Playgroud)
带有 RxJava2 lib 的 Reactive Streams API 使用示例:
RedissonRxClient client = Redisson.createRx(config);
RMapRx<String, String> map = client.getMap("myMap");
Flowable<String> resp = map.get("myKey");
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15061 次 |
| 最近记录: |