我尝试编写控制台可观察的内容,如下例所示,但它不起作用。订阅存在一些问题。如何解决这些问题?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new TaskCompletionSource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
如果我使用Observable.Interval,它会订阅两次,并且一个输入有两个输出。如果我使用任何版本的FromConsole,我都会有一个订阅和一个阻塞的线程。
我希望能够解释传入的字符并"拆分"它们(在这种情况下,通过空格字符).
var incomingCharacters = "This is a test".ToCharArray().ToObservable();
// Yields a sequence of words - "This", "is", "a", "test"
var incomingWords = incomingCharacters.Split(' ');
Run Code Online (Sandbox Code Playgroud)
我做了一个操作员来做这个,但我想知道是否有更好的方法?
public static IObservable<string> Split(this IObservable<char> incomingCharacters, char s)
{
var wordSeq = Observable.Create<string>(observer =>
{
// Create an inner sequence to look for word separators; publish each word to the
// "outer sequence" as it is found
var innerSeq = incomingCharacters
.Concat(Observable.Return(s)) // Enables the last word to be processed
.Scan(new StringBuilder(), (builder, c) => …Run Code Online (Sandbox Code Playgroud) 我有以下代码片段,它枚举了某些xml的元素(从svn log --xml ...进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法.
var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
var xElements = xml.Descendants("path")
.ToObservable()
//.SubscribeOn(ThreadPoolScheduler.Instance)
.Select(descendant => return LongRunning(descendant));
xElements
//.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(result => Console.WriteLine(result);
Console.ReadKey();
Run Code Online (Sandbox Code Playgroud)
该LongRunning方法并不重要,但在其中我记录了它运行的线程.让我们假设它运行一整秒.
我的问题是,取消评论任何SubscribeOn()一行都没有任何效果.调用LongRunning是顺序的,每隔一秒发生一次,在同一个线程上(尽管与主(初始)线程不同).
这是一个控制台应用程序.
我是Rx的新手.我错过了什么?
编辑:
在尝试了Lee Campbell的回答之后,我注意到了另一个问题.
Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);
var xElements = xml.Descendants("path").ToObservable()
//.ObserveOn(Scheduler.CurrentThread)
.SelectMany(descendant =>
Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
.Subscribe(result => Console.WriteLine(
"Result on: " + Thread.CurrentThread.ManagedThreadId));
[...]
string LongRunning(XElement el)
{
Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
DoWork();
Console.WriteLine("Finished on Thread " …Run Code Online (Sandbox Code Playgroud) 请考虑以下代码:
internal class Program
{
private static void Main(string[] args)
{
var client = new TcpClient();
client.ConnectAsync("localhost", 7105).Wait();
var stream = client.GetStream();
var observable = stream.ReadDataObservable().Repeat();
var s = from d in observable.Buffer(4)
let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
let b = observable.Take(headerLength)
select b.ToEnumerable().ToArray();
s.Subscribe(a => Console.WriteLine("{0}", a));
Console.ReadLine();
}
}
public static class Extensions
{
public static IObservable<byte> ReadDataObservable(this Stream stream)
{
return Observable.Defer(async () =>
{
var buffer = new byte[1024];
var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length); …Run Code Online (Sandbox Code Playgroud) 在Reactive Extensions中,如何将一个可观察序列延迟一个值?例如:
original: 2 3 5 7 9
delayed: 2 3 5 7
Run Code Online (Sandbox Code Playgroud)
要清楚,我想将序列延迟一步.这与延迟恒定时间不同.
我有一个C#4.0类库projet,其中我引用了Reactive extensions dlls(2.2.5版本).我正在收到如下所述的编译错误.你能告诉我吗?
C#代码:
var observable = System.Reactive.Linq.Observable.Empty<bool>();
foreach (var modelParam in modelParams)
observable = observable.Merge(modelParam.ObservePropertyChanged(p => p.IsDirty).Where(p => p));
Run Code Online (Sandbox Code Playgroud)
错误508'System.IObservable'不包含'Where'的定义,也没有扩展方法'Where'可以找到接受类型'System.IObservable'的第一个参数(你是否缺少using指令或汇编引用?)*
提前致谢.
您如何使用Rx编写类似于以下简单EventBus示例的内容?
public class EventBus
{
private readonly Dictionary<Type, List<Action<Event>>> routes = new Dictionary<Type, List<Action<Event>>>();
public void RegisterHandler<T>(Action<T> handler) where T : Event
{
List<Action<Event>> handlers;
if (!this.routes.TryGetValue(typeof(T), out handlers))
{
handlers = new List<Action<Event>>();
this.routes.Add(typeof(T), handlers);
}
handlers.Add(x => handler((T)x));
}
public void Publish<T>(T @event) where T : Event
{
List<Action<Event>> handlers;
if (!this.routes.TryGetValue(@event.GetType(), out handlers))
{
return;
}
foreach (var handler in handlers)
{
var apply = handler;
apply(@event);
}
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试按照此应用程序松散地构建一个应用程序,但我收到了一个TypeScript错误,我想要解释什么是错误的.据我所知,我正在做我应该做的事情.
这是我到目前为止编写的代码:
import {Injectable,Inject} from '@angular/core';
import {Http,Headers,URLSearchParams, Response} from '@angular/http';
import {List, Record} from 'immutable';
import {Observable} from "rxjs/Observable";
const TodoRecord = Record({
id: 0,
description: "",
completed: false
});
export class Todo extends TodoRecord {
id:number;
description:string;
completed: boolean;
constructor(props: any) {
super(props);
}
}
@Injectable()
export class TodoBackendService {
constructor(private http: Http){
this.http = http;
}
getAllTodos(){
return this.http.get("/todo");
}
saveTodo(newTodo: Todo): Observable<List<Todo>> {
var headers = new Headers();
headers.append("Content-Type", "application/json; chartset=utf-8");
return …Run Code Online (Sandbox Code Playgroud) 我想在我的可观察序列中添加一个额外的"关闭"项.是否有一个Reactive LINQ扩展,就像StartWith可观察序列的结尾一样?
这是我想要的近似值,虽然它似乎没有触发最后一个元素:
public static IObservable<TSource> EndWith<TSource>(this IObservable<TSource> source, TSource element)
{
return source.Concat(Observable.Return(element));
}
Run Code Online (Sandbox Code Playgroud) 我为WPF应用程序设置了标准的reactive-ui路由,我有一个ViewModel可以实现的接口来提供标题信息.
public interface IHaveTitle
{
IObservable<string> Title { get; }
}
Run Code Online (Sandbox Code Playgroud)
在一个视图模型中,我正在执行以下操作(用于演示目的):
public IObservable<string> Title => Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => DateTime.Now.ToLongTimeString());
Run Code Online (Sandbox Code Playgroud)
在我的主窗口屏幕中,我正在执行以下操作:
disposer(
ViewModel.Router.CurrentViewModel
.SelectMany(vm =>
((vm as IHaveTitle)?.Title.StartWith("") ??
Observable.Return("")).Select(s => string.IsNullOrEmpty(s) ? vm.UrlPathSegment : $"{vm.UrlPathSegment} > {s}"))
.ObserveOn(RxApp.MainThreadScheduler)
.BindTo(this, w => w.Title));
Run Code Online (Sandbox Code Playgroud)
凡disposer在Action<IDisposable>传递到this.WhenActivated扩展方法.
现在,当我浏览时,标题确实会改变以反映UrlPathSegment,而在主视图模型上,标题会更新以显示每5秒的时间.
然而,我所看到的问题是,即使我导航到不同的视图模型,主视图模型上的标题可观察仍然会导致标题上的更改.
我的问题是:我如何防止这种情况发生?为什么在我离开时它不会分离,因为我选择的是基于CurrentViewModel?
system.reactive ×10
c# ×9
.net ×1
angular ×1
console ×1
observable ×1
reactiveui ×1
rx.net ×1
rxjs ×1
stream ×1
tcpclient ×1
typescript ×1
wpf ×1