我一直在努力试探最近的.NET Reactive Extensions,但是已经触及了一些概念墙:我无法弄清楚为什么IObservable.First()会阻塞.
我有一些示例代码看起来像这样:
var a = new ListItem(a);
var b = new ListItem(b);
var c = new ListItem(c);
var d = new ListItem(d);
var observableList = new List<ListItem> { a,b,c,d }.ToObservable();
var itemA = observableList.First();
// Never reached
Assert.AreEqual(expectedFoo, itemA.Foo);
Run Code Online (Sandbox Code Playgroud)
我期待发生的事情是itemA在引用上等于a并且能够访问其成员等.相反,发生的是First()块和Assert.AreEqual()从未到达的块.
现在,我知道在使用Rx时,代码应该Subscribe()是IObservables,所以很可能我在这里做错了.但是,根据各种方法签名,不可能执行以下任一操作:
observableList.First().Subscribe(item => Assert.AreEqual(expectedFoo, item));
Run Code Online (Sandbox Code Playgroud)
要么
observableList.Subscribe(SomeMethod).First() // This doesn't even make sense, right?
Run Code Online (Sandbox Code Playgroud)
我错过了什么?
我正在研究一些通过websockets与远程API交互的代码.我的数据层负责建立和监控websocket连接.它还包含应用程序可用于排队要发送的websocket消息的方法.应用程序代码不应该负责检查websocket连接的状态,即"即发即忘".
理想情况下,我希望数据层的功能如下:
self.isConnected == NO)的连接时,消息在内部缓冲.self.isConnected == YES)时,立即发送缓冲的消息,并立即发送任何后续消息.这是我能够提出的:
#import "RACSignal+Buffering.h"
@implementation RACSignal (Buffering)
- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer
{
return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
NSMutableArray* bufferedValues = [[NSMutableArray alloc] init];
__block BOOL buffering = NO;
void (^bufferHandler)() = ^{
if (!buffering)
{
for (id val in bufferedValues)
{
[subscriber sendNext:val];
}
[bufferedValues removeAllObjects];
}
};
RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* shouldBuffer) {
buffering = shouldBuffer.boolValue;
bufferHandler();
}];
if (bufferDisposable)
{
[disposable …Run Code Online (Sandbox Code Playgroud) 我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.
作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable<String>.为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.
但是当我可以使用Ob终止Observer时takeUntil,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.
这是Observable<String>代码:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) …Run Code Online (Sandbox Code Playgroud) 我必须及时查询数据库以了解遗留系统的状态.我想过围绕一个查询包装Observable,但我不知道正确的方法.
基本上,它将是每5秒相同的查询.但我担心我将不得不面对这些问题:
额外细节:
SELECT返回带有状态代码列表的数据集(工作,出错).我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子的,看过Lee Campbell的Rx简介.
我StepVerifier用来测试值:
@Test
public void testStuff() {
Thing thing = new Thing();
Mono<Thing> result = Mono.just(thing);
StepVerifier.create(result).consumeNextWith(r -> {
assertEquals(thing, r);
}).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)
我现在要做的是测试单声道中没有项目.像这样:
@Test
public void testNoStuff() {
Mono<Thing> result = Mono.empty();
StepVerifier.create(result)... // what goes here?
}
Run Code Online (Sandbox Code Playgroud)
我想测试Mono实际上是空的.我怎么做?
在Spring Data中,我们PagingAndSortingRepository继承了它CrudRepository.在反应式Spring数据中,我们只有
ReactiveSortingRepository继承自ReactiveCrudRepository.我们怎么能以反应的方式进行分页呢?例如,我们将来能够做到这一点ReactivePagingAndSortingRepository吗?
如何在 dart 中测试流?我有这个代码:
test('words are reading sequentially correct', () {
WordTrackerInterface wordTracker = WordTracker.byContent('0 1 2');
wordTracker.setWordsCountPerChunk(1);
var stream = wordTracker.nextWordStringStream();
expect(
stream,
emitsInOrder(List<Word>.generate(
6, (i) => i > 2 ? Word('') : Word(i.toString()))));
for (int i = 0; i < 6; i++) {
wordTracker.nextWord();
}
});
Run Code Online (Sandbox Code Playgroud)
我需要测试该成员数据Word::content这是一个String等于在所提供的emitsInOrder。
类似于以下内容的流:
expect(
stream,
emitsInOrder(List<Word>.generate(
6, (i) => i > 2 ? Word('') : Word(i.toString()))),
expect((Word actual, Word expected) {
return actual.content == expected.content;
}));
Run Code Online (Sandbox Code Playgroud) 有没有办法在 RxSwift 中的 combineLatest 中放置 8 个以上的 observable?
这是我目前的实现:
let registerUserParameters = Observable.combineLatest(
firstname.asObservable(),
lastname.asObservable(),
email.asObservable(),
mobile.asObservable(),
addresses.asObservable(),
cities.asObservable(),
pword.asObservable(),
confirm_pword.asObservable(),
instagramid.asObservable(),
facebookid.asObservable()) {
($0, $1, $2, $3, $4, $5, $6, $7, $8, $9)
}
Run Code Online (Sandbox Code Playgroud)
我发现 combineLatest 根据其功能最多只接受 8 个参数。
public static func combineLatest<O1, O2, O3, O4, O5, O6, O7, O8>(_ source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, _ source6: O6, _ source7: O7, _ source8: O8, resultSelector: @escaping (O1.E, O2.E, O3.E, O4.E, …Run Code Online (Sandbox Code Playgroud) 我看到很多人有非常相似的问题,但我尝试的任何方法都不起作用。
我有一个最喜欢的想法列表。每当我单击ideaItem 内的按钮时,它都应该从列表中删除。
当我删除任何ideaItem时,屏幕上的最后一个总是被删除,而不是我点击的那个。My FavoriteIdeasListView 似乎正确更新了项目计数,这意味着附加的列表有效,但 UI 没有重绘ideaItems。
一开始我直接在ideaItem上有删除功能,我读到我应该做一个VoidCallback并处理List本身的删除,所以它会注意到变化。它没有用
我还尝试使用 Stream Builder,因此该流会通知 ListView 进行刷新。它也没有用
我一直尝试调用 SetState 并且它不会重新加载,它只会在初始状态的开头构建列表。
class FavoritesList extends StatefulWidget {
FavoritesList({Key key}) : super(key: key);
@override
_FavoritesListState createState() => _FavoritesListState();
}
class _FavoritesListState extends State<FavoritesList> {
List<Idea> _favorites = [];
@override
void initState() {
super.initState();
favoritesInitialState();
}
Future <void> favoritesInitialState() async {
List<Idea> ideas = await IdeasDB.db.ideas();
setState(() {
_favorites = ideas;
});
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Favorites')), …Run Code Online (Sandbox Code Playgroud) 假设我们有一个实例o.s.w.reactive.function.server.ServerResponse.
获取其主体内容的正确方法是什么,换句话说,如何实现fetchBodyAsString功能?
test(){
ServerResponse response = getResponseFromService("mock data");
String body = fetchBodyAsString(response);
assertEquals("hello", body);
}
Run Code Online (Sandbox Code Playgroud)
您能否详细说明一下为什么ServerResponse所有内容都有方法(cookies(), headers(), statusCode()),但响应正文却没有?我想应该有一种方法可以用方法获取主体 writeTo(),尽管如何使用它绝对是模糊的。
java spring server-response reactive-programming spring-webflux
java ×3
c# ×2
dart ×2
ios ×2
.net ×1
flutter ×1
listview ×1
objective-c ×1
reactive ×1
rx-java ×1
spring ×1
spring-data ×1
swift ×1
unit-testing ×1