标签: reactive-programming

为了演示功能反应式编程而编写了哪些Python库?

我们通过套接字服务器处理大量数据流,并且需要一种非阻塞的方式来管理回调以防止竞争条件.

最近我开始了解功能反应式编程是一种编程方法,解决方案正是我们所寻求的.

Haskell(反应香蕉),ClojureScript和Javascript(培根js)中有一些例子,但没有python的例子.是否有为Python启用功能反应式编程的库?如果没有任何图书馆,哪里有一个好的起点?写一个可能遇到的挑战是什么?

python asynchronous functional-programming reactive-programming

14
推荐指数
2
解决办法
3710
查看次数

反应式编程是否与函数式编程有关?

我想知道Reactive Programming如何与Functional-Programming联系起来.

大多数论文将"反应式编程"称为"功能反应式编程".

反应式编程是否可以在功能编程之外实现?

用功能语言编写反应式程序更容易吗?

java paradigms functional-programming scala reactive-programming

14
推荐指数
1
解决办法
3089
查看次数

Android优缺点:Event Bus和RxJava

我一直在我的应用程序中使用事件总线(即:greenrobot/EventBus).但我发现使用事件总线有一些缺点:

  • 链接任务执行很困难
  • 很多类来表示事件
  • 不太清晰的代码(嗯,它仍然可以跟踪,但不是很清楚)

我一直在研究处理这个问题的新技术.我读了很多关于RxJava的内容,并想知道它是不是一个解决方案.

所以关于RxJava的问题(基于我最近阅读的内容):

  • RxJava观察员可以随时注册吗?所以不只是在创建Observable时.使用EventBus这是可能的,我可以随时订阅,而不仅仅是在创建Observable时.
  • 如何处理两个或多个发布相同类型事件的发布者(例如:导航事件)?
  • 紧密耦合发布者和订阅者意味着我必须每次都明确指定发布者.所以我不仅要担心事件的类型,还要担心发起者.使用EventBus,我只需要担心事件的类型而不是发起者.

android reactive-programming event-bus rx-java

14
推荐指数
1
解决办法
4434
查看次数

使用Web应用程序中的RxJava Observables无法解决性能提升的不可解决性

我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API.

整个例子可以在Githug上找到

令人惊讶的是,结果表明,输出结果是:

  • 最好的:返回包含阻塞操作的Callable/的REST服务DeferredResult.

  • 还不错:阻止REST服务.

  • 最糟糕的:返回DeferredResult的REST服务,其结果由RxJava Observable设置.

这是我的Spring WebApp:

申请:

@SpringBootApplication
public class SpringNioRestApplication {

   @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringNioRestApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

SyncController:

@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data synchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public List<Data> getData(){ …
Run Code Online (Sandbox Code Playgroud)

java netflix reactive-programming rx-java

14
推荐指数
1
解决办法
3094
查看次数

如何使用RxJS Observables制作倒数计时器?

我正在努力使用Observables创建一个倒计时器,http: //reactivex.io/documentation/operators/timer.html上的例子似乎不起作用.在此特定示例中,与timerInterval相关的错误不是从计时器返回的Observable的函数.

我也一直在尝试其他方法,我提出的最好的方法是:

Observable.interval(1000).take(10).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)

这里的问题是它从0到10计数,我想要一个倒数计时器,例如10,9,8 ... 0.

我也尝试了这个但是timerObservable类型不存在

Observable.range(10, 0).timer(1000).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)

同样,它根本不产生任何输出.

Observable.range(10, 0).debounceTime(1000).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)

为了澄清我需要帮助ReactiveX的RxJS实现,而不是MircoSoft版本.

reactive-programming observable rxjs

14
推荐指数
3
解决办法
2万
查看次数

Web反应式编程 - 从HTTP客户端的角度来看,有哪些优势?

让我们假设控制器的这两种情况产生一些延迟的随机数:

1)Reactive Spring 5反应性应用:

@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
    return generateRandomNumbers(10, 500);
}

/**
 * Non-blocking randon number generator
 * @param amount - # of numbers to generate
 * @param delay - delay between each number generation in milliseconds
 * @return
 */
public Flux<Double> generateRandomNumbers(int amount, int delay){
    return Flux.range(1, amount)
               .delayMillis(delay)
               .map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)

2)传统的Spring MVC DeferredResult:

@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
    DeferredResult<Double[]> dr = new DeferredResult<Double[]>();

    CompletableFuture.supplyAsync(() -> {
        return generateRandomNumbers(10, 500);
    }).whenCompleteAsync((p1, p2) -> { …
Run Code Online (Sandbox Code Playgroud)

spring reactive-programming rx-java project-reactor

14
推荐指数
1
解决办法
2248
查看次数

RxJava 2.0 - 如何将Observable转换为Publisher

如何在RxJava版本2中将Observable转换为Publisher?

在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要.但是我怎么能在RxJava 2中做到这一点?

java reactive-programming reactive-streams reactivex rx-java2

14
推荐指数
1
解决办法
9249
查看次数

RxJava,一个可观察的多个订阅者:publish().autoConnect()

我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)

现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?

这是我理解它的方式,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发出物品
    [订阅者:1] [项目到EMIT:1,2,3]

  2. 向订户发出项目"1"
    [订阅者:1] [项目到EMIT:2,3]

  3. 有人订阅了我,当我完成后我将再次发出1,2,3
    [订阅者:1&2] [项目到EMIT:2,3,1,2,3]

  4. 向订户发出项目'2'
    [订阅者:1&2] [项目到EMIT:3,1,2,3]

  5. 向订户发出项目'3'
    [订阅者:1&2] …

reactive-programming rx-java

14
推荐指数
2
解决办法
1万
查看次数

如何使用rxpython超时长时间运行的程序?

假设我有一个长期运行的python函数,看起来像这样?

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x
Run Code Online (Sandbox Code Playgroud)

我希望能够设置超时1000ms.

所以我就像这样,通过上面的强烈计算创建一个可观察的并映射它.

a = Observable.repeat(1).map(lambda x: intns(x))
Run Code Online (Sandbox Code Playgroud)

现在,每个值发出的,如果超过1000毫秒我就越想尽快结束观察到,当我到达1000ms使用on_erroron_completed

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))
Run Code Online (Sandbox Code Playgroud)

上面的语句确实得到超时和调用on_error,但它继续完成计算强烈的计算,然后才返回到下一个语句.有没有更好的方法呢?

最后一个语句打印以下内容

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends
Run Code Online (Sandbox Code Playgroud)

这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果.

我想知道 …

python reactive-programming system.reactive rx-py

14
推荐指数
1
解决办法
374
查看次数

使用ssl的Spring 5 WebClient

我正在尝试查找WebClient使用的示例.我的目标是使用Spring 5 WebClient使用https和自签名证书查询REST服务

任何例子?

ssl spring self-signed reactive-programming spring-webflux

14
推荐指数
3
解决办法
1万
查看次数