标签: system.reactive

如何从控制台输入创建 IObservable<string>

我尝试编写控制台可观察的内容,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

如果我使用Observable.Interval,它会订阅两次,并且一个输入有两个输出。如果我使用任何版本的FromConsole,我都会有一个订阅和一个阻塞的线程。

c# console system.reactive observable

1
推荐指数
1
解决办法
704
查看次数

在Reactive Extensions中实现String Split()运算符

我希望能够解释传入的字符并"拆分"它们(在这种情况下,通过空格字符).

var incomingCharacters = "This is a test".ToCharArray().ToObservable();

// Yields a sequence of words - "This", "is", "a", "test"
var incomingWords = incomingCharacters.Split(' ');
Run Code Online (Sandbox Code Playgroud)

我做了一个操作员来做这个,但我想知道是否有更好的方法?

public static IObservable<string> Split(this IObservable<char> incomingCharacters, char s)
{
    var wordSeq = Observable.Create<string>(observer =>
        {
            // Create an inner sequence to look for word separators; publish each word to the
            // "outer sequence" as it is found
            var innerSeq = incomingCharacters
                .Concat(Observable.Return(s))           // Enables the last word to be processed
                .Scan(new StringBuilder(), (builder, c) => …
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

0
推荐指数
1
解决办法
872
查看次数

Rx如何并行化长时间运行的任务?

我有以下代码片段,它枚举了某些xml的元素(从svn log --xml ...进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法.

var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

var xElements = xml.Descendants("path")
                   .ToObservable()
                   //.SubscribeOn(ThreadPoolScheduler.Instance) 
                   .Select(descendant => return LongRunning(descendant));
xElements
    //.SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(result => Console.WriteLine(result);

Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)

LongRunning方法并不重要,但在其中我记录了它运行的线程.让我们假设它运行一整秒.

我的问题是,取消评论任何SubscribeOn()一行都没有任何效果.调用LongRunning是顺序的,每隔一秒发生一次,在同一个线程上(尽管与主(初始)线程不同).

这是一个控制台应用程序.

我是Rx的新手.我错过了什么?

编辑:

在尝试了Lee Campbell的回答之后,我注意到了另一个问题.

Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);

var xElements = xml.Descendants("path").ToObservable()
    //.ObserveOn(Scheduler.CurrentThread)
    .SelectMany(descendant =>     
          Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
    .Subscribe(result => Console.WriteLine(
         "Result on: " + Thread.CurrentThread.ManagedThreadId));

[...]

string LongRunning(XElement el)
{
    Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
    DoWork();
    Console.WriteLine("Finished on Thread " …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing system.reactive

0
推荐指数
1
解决办法
1735
查看次数

使用TcpClient和Reactive Extensions从Stream读取连续字节流

请考虑以下代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length); …
Run Code Online (Sandbox Code Playgroud)

c# stream tcpclient reactive-programming system.reactive

0
推荐指数
1
解决办法
2410
查看次数

如何通过一个值"延迟"可观察序列?

在Reactive Extensions中,如何将一个可观察序列延迟一个值?例如:

original: 2 3 5 7 9   
delayed:    2 3 5 7
Run Code Online (Sandbox Code Playgroud)

要清楚,我想将序列延迟一步.这与延迟恒定时间不同.

.net c# system.reactive

0
推荐指数
1
解决办法
134
查看次数

C#.NET - 反应性扩展错误

我有一个C#4.0类库projet,其中我引用了Reactive extensions dlls(2.2.5版本).我正在收到如下所述的编译错误.你能告诉我吗?

C#代码:

var observable = System.Reactive.Linq.Observable.Empty<bool>();

            foreach (var modelParam in modelParams)
                observable = observable.Merge(modelParam.ObservePropertyChanged(p => p.IsDirty).Where(p => p));
Run Code Online (Sandbox Code Playgroud)

*错误522'System.IObservable'不包含'Where'的定义,也没有扩展方法'Where'可以找到接受类型'System.IObservable'的第一个参数(你是否缺少using指令或汇编引用?

错误508'System.IObservable'不包含'Where'的定义,也没有扩展方法'Where'可以找到接受类型'System.IObservable'的第一个参数(你是否缺少using指令或汇编引用?)*

提前致谢.

c# system.reactive

0
推荐指数
1
解决办法
222
查看次数

带有Rx的EventBus

您如何使用Rx编写类似于以下简单EventBus示例的内容?

public class EventBus
{
    private readonly Dictionary<Type, List<Action<Event>>> routes = new Dictionary<Type, List<Action<Event>>>();

    public void RegisterHandler<T>(Action<T> handler) where T : Event
    {
        List<Action<Event>> handlers;

        if (!this.routes.TryGetValue(typeof(T), out handlers))
        {
            handlers = new List<Action<Event>>();
            this.routes.Add(typeof(T), handlers);
        }

        handlers.Add(x => handler((T)x));
    }

    public void Publish<T>(T @event) where T : Event
    {
        List<Action<Event>> handlers;

        if (!this.routes.TryGetValue(@event.GetType(), out handlers))
        {
            return;
        }

        foreach (var handler in handlers)
        {
            var apply = handler;
            apply(@event);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

c# system.reactive

0
推荐指数
1
解决办法
1128
查看次数

类型Observable <Response>不能分配给Observable <List <Todo >>类型

我正在尝试按照此应用程序松散地构建一个应用程序,但我收到了一个TypeScript错误,我想要解释什么是错误的.据我所知,我正在做我应该做的事情.

这是我到目前为止编写的代码:

import {Injectable,Inject} from '@angular/core';
import {Http,Headers,URLSearchParams, Response} from '@angular/http';
import {List, Record} from 'immutable';
import {Observable} from "rxjs/Observable";


const TodoRecord = Record({
    id: 0,
    description: "",
    completed: false
});

export class Todo extends TodoRecord {

    id:number;
    description:string;
    completed: boolean;

    constructor(props: any) {
        super(props);
    }

}

@Injectable()

export class TodoBackendService {
    constructor(private http: Http){
        this.http = http;
    }

    getAllTodos(){
        return this.http.get("/todo");
    }

    saveTodo(newTodo: Todo): Observable<List<Todo>> {
        var headers = new Headers();
        headers.append("Content-Type", "application/json; chartset=utf-8");

        return …
Run Code Online (Sandbox Code Playgroud)

system.reactive rxjs typescript angular

0
推荐指数
1
解决办法
6292
查看次数

是否有像StartWith这样的扩展,但是对于可观察序列的结尾?

我想在我的可观察序列中添加一个额外的"关闭"项.是否有一个Reactive LINQ扩展,就像StartWith可观察序列的结尾一样?

这是我想要的近似值,虽然它似乎没有触发最后一个元素:

public static IObservable<TSource> EndWith<TSource>(this IObservable<TSource> source, TSource element)
{
    return source.Concat(Observable.Return(element));
}
Run Code Online (Sandbox Code Playgroud)

c# system.reactive rx.net

0
推荐指数
1
解决办法
184
查看次数

ObMable仍在SelectMany之后订阅

我为WPF应用程序设置了标准的reactive-ui路由,我有一个ViewModel可以实现的接口来提供标题信息.

public interface IHaveTitle
{
    IObservable<string> Title { get; }
}
Run Code Online (Sandbox Code Playgroud)

在一个视图模型中,我正在执行以下操作(用于演示目的):

public IObservable<string> Title => Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => DateTime.Now.ToLongTimeString());
Run Code Online (Sandbox Code Playgroud)

在我的主窗口屏幕中,我正在执行以下操作:

disposer(
    ViewModel.Router.CurrentViewModel
    .SelectMany(vm =>
        ((vm as IHaveTitle)?.Title.StartWith("") ?? 
            Observable.Return("")).Select(s => string.IsNullOrEmpty(s) ? vm.UrlPathSegment : $"{vm.UrlPathSegment} > {s}"))
    .ObserveOn(RxApp.MainThreadScheduler)
    .BindTo(this, w => w.Title));
Run Code Online (Sandbox Code Playgroud)

disposerAction<IDisposable>传递到this.WhenActivated扩展方法.

现在,当我浏览时,标题确实会改变以反映UrlPathSegment,而在主视图模型上,标题会更新以显示每5秒的时间.

然而,我所看到的问题是,即使我导航到不同的视图模型,主视图模型上的标题可观察仍然会导致标题上的更改.

我的问题是:我如何防止这种情况发生?为什么在我离开时它不会分离,因为我选择的是基于CurrentViewModel

c# wpf system.reactive reactiveui

0
推荐指数
1
解决办法
124
查看次数