标签: reactive-programming

为什么IObservable <T> .First()阻止?

我一直在努力试探最近的.NET Reactive Extensions,但是已经触及了一些概念墙:我无法弄清楚为什么IObservable.First()会阻塞.

我有一些示例代码看起来像这样:

var a = new ListItem(a);
var b = new ListItem(b);
var c = new ListItem(c);
var d = new ListItem(d);

var observableList = new List<ListItem> { a,b,c,d }.ToObservable();

var itemA = observableList.First();

// Never reached
Assert.AreEqual(expectedFoo, itemA.Foo);
Run Code Online (Sandbox Code Playgroud)

我期待发生的事情是itemA在引用上等于a并且能够访问其成员等.相反,发生的是First()块和Assert.AreEqual()从未到达的块.

现在,我知道在使用Rx时,代码应该Subscribe()IObservables,所以很可能我在这里做错了.但是,根据各种方法签名,不可能执行以下任一操作:

observableList.First().Subscribe(item => Assert.AreEqual(expectedFoo, item));
Run Code Online (Sandbox Code Playgroud)

要么

observableList.Subscribe(SomeMethod).First() // This doesn't even make sense, right?
Run Code Online (Sandbox Code Playgroud)

我错过了什么?

c# reactive-programming system.reactive

10
推荐指数
2
解决办法
772
查看次数

如何有条件地缓冲RACSignal值?

我正在研究一些通过websockets与远程API交互的代码.我的数据层负责建立和监控websocket连接.它还包含应用程序可用于排队要发送的websocket消息的方法.应用程序代码不应该负责检查websocket连接的状态,即"即发即忘".

理想情况下,我希望数据层的功能如下:

  • 当数据层没有与websocket endpoint(self.isConnected == NO)的连接时,消息在内部缓冲.
  • 当连接变为可用(self.isConnected == YES)时,立即发送缓冲的消息,并立即发送任何后续消息.

这是我能够提出的:

#import "RACSignal+Buffering.h"

@implementation RACSignal (Buffering)

- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {

        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        NSMutableArray* bufferedValues = [[NSMutableArray alloc] init];
        __block BOOL buffering = NO;

        void (^bufferHandler)() = ^{
            if (!buffering)
            {
                for (id val in bufferedValues)
                {
                    [subscriber sendNext:val];
                }

                [bufferedValues removeAllObjects];
            }
        };

        RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* shouldBuffer) {

            buffering = shouldBuffer.boolValue;
            bufferHandler();

        }];

        if (bufferDisposable)
        {
            [disposable …
Run Code Online (Sandbox Code Playgroud)

objective-c reactive-programming ios reactive-cocoa

10
推荐指数
1
解决办法
1673
查看次数

RxJava - 终止无限流

我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.

作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable<String>.为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.

但是当我可以使用Ob终止Observer时takeUntil,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.

这是Observable<String>代码:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming rx-java

10
推荐指数
1
解决办法
3503
查看次数

使用Reactive Extensions进行数据库轮询

我必须及时查询数据库以了解遗留系统的状态.我想过围绕一个查询包装Observable,但我不知道正确的方法.

基本上,它将是每5秒相同的查询.但我担心我将不得不面对这些问题:

  • 如果执行查询需要10秒钟怎么办?如果仍在处理上一个查询,我不想执行任何新查询.
  • 此外,应该有一个超时.如果当前查询在例如20秒之后未执行,则应记录信息性消息并应发送新的尝试(相同的查询).

额外细节:

  • 查询只是一个SELECT返回带有状态代码列表的数据集(工作,出错).
  • Observable序列将始终采用从查询接收的最新数据,例如Switch扩展方法.
  • 我想将数据库查询(lenghty操作)包装成一个Task,但我不确定它是否是最好的选择.

我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子的,看过Lee Campbell的Rx简介.

.net c# reactive-programming system.reactive

10
推荐指数
1
解决办法
3583
查看次数

如何使用Reactor的StepVerifier来验证Mono是否为空?

StepVerifier用来测试值:

@Test
public void testStuff() {
    Thing thing = new Thing();
    Mono<Thing> result = Mono.just(thing);
    StepVerifier.create(result).consumeNextWith(r -> {
        assertEquals(thing, r);
    }).verifyComplete();
}
Run Code Online (Sandbox Code Playgroud)

我现在要做的是测试单声道中没有项目.像这样:

@Test
public void testNoStuff() {
    Mono<Thing> result = Mono.empty();
    StepVerifier.create(result)... // what goes here?
}
Run Code Online (Sandbox Code Playgroud)

我想测试Mono实际上是空的.我怎么做?

java unit-testing reactive-programming project-reactor

10
推荐指数
2
解决办法
3123
查看次数

如何在反应性Spring数据中应用分页?

在Spring Data中,我们PagingAndSortingRepository继承了它CrudRepository.在反应式Spring数据中,我们只有 ReactiveSortingRepository继承自ReactiveCrudRepository.我们怎么能以反应的方式进行分页呢?例如,我们将来能够做到这一点ReactivePagingAndSortingRepository吗?

reactive-programming spring-data spring-data-mongodb

9
推荐指数
2
解决办法
7325
查看次数

如何在 Dart 中测试流

如何在 dart 中测试流?我有这个代码:

test('words are reading sequentially correct', () {
  WordTrackerInterface wordTracker = WordTracker.byContent('0 1 2');
  wordTracker.setWordsCountPerChunk(1);
  var stream = wordTracker.nextWordStringStream();

  expect(
      stream,
      emitsInOrder(List<Word>.generate(
          6, (i) => i > 2 ? Word('') : Word(i.toString()))));

  for (int i = 0; i < 6; i++) {
    wordTracker.nextWord();
  }
});
Run Code Online (Sandbox Code Playgroud)

我需要测试该成员数据Word::content这是一个String等于在所提供的emitsInOrder

类似于以下内容的流:

expect(
    stream,
    emitsInOrder(List<Word>.generate(
        6, (i) => i > 2 ? Word('') : Word(i.toString()))),
    expect((Word actual, Word expected) {
  return actual.content == expected.content;
}));
Run Code Online (Sandbox Code Playgroud)

reactive-programming dart

9
推荐指数
3
解决办法
5788
查看次数

使用 RxSwift 在 combineLatest 中超过 8 个参数

有没有办法在 RxSwift 中的 combineLatest 中放置 8 个以上的 observable?

这是我目前的实现:

let registerUserParameters = Observable.combineLatest(
        firstname.asObservable(),
        lastname.asObservable(),
        email.asObservable(),
        mobile.asObservable(),
        addresses.asObservable(),
        cities.asObservable(),
        pword.asObservable(),
        confirm_pword.asObservable(),
        instagramid.asObservable(),
        facebookid.asObservable()) {
            ($0, $1, $2, $3, $4, $5, $6, $7, $8, $9)
    }
Run Code Online (Sandbox Code Playgroud)

我发现 combineLatest 根据其功能最多只接受 8 个参数。

public static func combineLatest<O1, O2, O3, O4, O5, O6, O7, O8>(_ source1: O1, _ source2: O2, _ source3: O3, _ source4: O4, _ source5: O5, _ source6: O6, _ source7: O7, _ source8: O8, resultSelector: @escaping (O1.E, O2.E, O3.E, O4.E, …
Run Code Online (Sandbox Code Playgroud)

reactive-programming ios swift reactive

9
推荐指数
1
解决办法
1513
查看次数

Flutter ListView 不会使用 setState() 刷新 UI,尽管 itemCount 和附加列表正确更新

我看到很多人有非常相似的问题,但我尝试的任何方法都不起作用。

语境

我有一个最喜欢的想法列表。每当我单击ideaItem 内的按钮时,它都应该从列表中删除。

问题

当我删除任何ideaItem时,屏幕上的最后一个总是被删除,而不是我点击的那个。My FavoriteIdeasListView 似乎正确更新了项目计数,这意味着附加的列表有效,但 UI 没有重绘ideaItems。

我的尝试

  • 一开始我直接在ideaItem上有删除功能,我读到我应该做一个VoidCallback并处理List本身的删除,所以它会注意到变化。它没有用

  • 我还尝试使用 Stream Builder,因此该流会通知 ListView 进行刷新。它也没有用

  • 我一直尝试调用 SetState 并且它不会重新加载,它只会在初始状态的开头构建列表。


    class FavoritesList extends StatefulWidget {
      FavoritesList({Key key}) : super(key: key);

      @override
      _FavoritesListState createState() => _FavoritesListState();
    }

    class _FavoritesListState extends State<FavoritesList> {

      List<Idea> _favorites = [];

      @override
      void initState() {
        super.initState();
        favoritesInitialState();
      }

      Future <void> favoritesInitialState() async {
        List<Idea> ideas = await IdeasDB.db.ideas();
        setState(() {
          _favorites = ideas;
        });
      }

      @override
      Widget build(BuildContext context) {
        return Scaffold(
            appBar: AppBar(
                title: Text('Favorites')), …
Run Code Online (Sandbox Code Playgroud)

listview reactive-programming dart flutter

9
推荐指数
1
解决办法
3289
查看次数

如何从 ServerResponse 获取字符串形式的正文进行测试?

假设我们有一个实例o.s.w.reactive.function.server.ServerResponse.

获取其主体内容的正确方法是什么,换句话说,如何实现fetchBodyAsString功能?

test(){
  ServerResponse response = getResponseFromService("mock data");

  String body = fetchBodyAsString(response);

  assertEquals("hello", body);
}
Run Code Online (Sandbox Code Playgroud)

您能否详细说明一下为什么ServerResponse所有内容都有方法(cookies(), headers(), statusCode()),但响应正文却没有?我想应该有一种方法可以用方法获取主体 writeTo(),尽管如何使用它绝对是模糊的。

java spring server-response reactive-programming spring-webflux

9
推荐指数
2
解决办法
1万
查看次数