我们通过套接字服务器处理大量数据流,并且需要一种非阻塞的方式来管理回调以防止竞争条件.
最近我开始了解功能反应式编程是一种编程方法,解决方案正是我们所寻求的.
Haskell(反应香蕉),ClojureScript和Javascript(培根js)中有一些例子,但没有python的例子.是否有为Python启用功能反应式编程的库?如果没有任何图书馆,哪里有一个好的起点?写一个可能遇到的挑战是什么?
python asynchronous functional-programming reactive-programming
我想知道Reactive Programming如何与Functional-Programming联系起来.
大多数论文将"反应式编程"称为"功能反应式编程".
反应式编程是否可以在功能编程之外实现?
用功能语言编写反应式程序更容易吗?
java paradigms functional-programming scala reactive-programming
我一直在我的应用程序中使用事件总线(即:greenrobot/EventBus).但我发现使用事件总线有一些缺点:
我一直在研究处理这个问题的新技术.我读了很多关于RxJava的内容,并想知道它是不是一个解决方案.
所以关于RxJava的问题(基于我最近阅读的内容):
我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API.
整个例子可以在Githug上找到
令人惊讶的是,结果表明,输出结果是:
最好的:返回包含阻塞操作的Callable/的REST服务DeferredResult.
还不错:阻止REST服务.
最糟糕的:返回DeferredResult的REST服务,其结果由RxJava Observable设置.
这是我的Spring WebApp:
申请:
@SpringBootApplication
public class SpringNioRestApplication {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
return executor;
}
public static void main(String[] args) {
SpringApplication.run(SpringNioRestApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
SyncController:
@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {
@Autowired
private DataService dataService;
@RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
@ApiOperation(value = "Gets data", notes="Gets data synchronously")
@ApiResponses(value={@ApiResponse(code=200, message="OK")})
public List<Data> getData(){ …Run Code Online (Sandbox Code Playgroud) 我正在努力使用Observables创建一个倒计时器,http: //reactivex.io/documentation/operators/timer.html上的例子似乎不起作用.在此特定示例中,与timerInterval相关的错误不是从计时器返回的Observable的函数.
我也一直在尝试其他方法,我提出的最好的方法是:
Observable.interval(1000).take(10).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)
这里的问题是它从0到10计数,我想要一个倒数计时器,例如10,9,8 ... 0.
我也尝试了这个但是timerObservable类型不存在
Observable.range(10, 0).timer(1000).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)
同样,它根本不产生任何输出.
Observable.range(10, 0).debounceTime(1000).subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)
为了澄清我需要帮助ReactiveX的RxJS实现,而不是MircoSoft版本.
让我们假设控制器的这两种情况产生一些延迟的随机数:
1)Reactive Spring 5反应性应用:
@GetMapping("/randomNumbers")
public Flux<Double> getReactiveRandomNumbers() {
return generateRandomNumbers(10, 500);
}
/**
* Non-blocking randon number generator
* @param amount - # of numbers to generate
* @param delay - delay between each number generation in milliseconds
* @return
*/
public Flux<Double> generateRandomNumbers(int amount, int delay){
return Flux.range(1, amount)
.delayMillis(delay)
.map(i -> Math.random());
}
Run Code Online (Sandbox Code Playgroud)
2)传统的Spring MVC DeferredResult:
@GetMapping("/randomNumbers")
public DeferredResult<Double[]> getReactiveRandomNumbers() {
DeferredResult<Double[]> dr = new DeferredResult<Double[]>();
CompletableFuture.supplyAsync(() -> {
return generateRandomNumbers(10, 500);
}).whenCompleteAsync((p1, p2) -> { …Run Code Online (Sandbox Code Playgroud) 如何在RxJava版本2中将Observable转换为Publisher?
在第一个版本中,我们有https://github.com/ReactiveX/RxJavaReactiveStreams项目,它完全符合我的需要.但是我怎么能在RxJava 2中做到这一点?
java reactive-programming reactive-streams reactivex rx-java2
我正在玩rxJava/rxAndroid,并且有一些非常基本的东西不像我期望的那样.我有一个可观察的和两个订阅者:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
Run Code Online (Sandbox Code Playgroud)
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Run Code Online (Sandbox Code Playgroud)
现在,我知道我可以通过使用避免重复计数,publish().autoConnect()但我试图首先理解这种默认行为.每当有人订阅observable时,它就会开始发出数字序列.我明白了.因此,当Subscriber 1连接时它开始发射物品.Subscriber 2马上连接,为什么不能获得价值呢?
这是我理解它的方式,从可观察的角度来看:
有人订阅了我,我应该开始发出物品
[订阅者:1] [项目到EMIT:1,2,3]
向订户发出项目"1"
[订阅者:1] [项目到EMIT:2,3]
有人订阅了我,当我完成后我将再次发出1,2,3
[订阅者:1&2] [项目到EMIT:2,3,1,2,3]
向订户发出项目'2'
[订阅者:1&2] [项目到EMIT:3,1,2,3]
向订户发出项目'3'
[订阅者:1&2] …
假设我有一个长期运行的python函数,看起来像这样?
import random
import time
from rx import Observable
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x
Run Code Online (Sandbox Code Playgroud)
我希望能够设置超时1000ms.
所以我就像这样,通过上面的强烈计算创建一个可观察的并映射它.
a = Observable.repeat(1).map(lambda x: intns(x))
Run Code Online (Sandbox Code Playgroud)
现在,每个值发出的,如果超过1000毫秒我就越想尽快结束观察到,当我到达1000ms使用on_error或on_completed
a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))
Run Code Online (Sandbox Code Playgroud)
上面的语句确实得到超时和调用on_error,但它继续完成计算强烈的计算,然后才返回到下一个语句.有没有更好的方法呢?
最后一个语句打印以下内容
8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends
Run Code Online (Sandbox Code Playgroud)
这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果.
我想知道 …
我正在尝试查找WebClient使用的示例.我的目标是使用Spring 5 WebClient使用https和自签名证书查询REST服务
任何例子?