标签: system.reactive

我如何从IObservable获取历史记录?

我通过Observable监控股票报价流,我认为它符合某种条件,例如 -

Observable
.Empty<Quote>
.Where(q => q.Price > watchPrice)
.Subscribe(q => { // do stuff } );
Run Code Online (Sandbox Code Playgroud)

现在,在"做东西"的时候,我理想的是得到最后3个"q"来自where子句,所以有点像BufferWithCount(),但每个条目都包含Subscribe()包含最后3个条目.这样我就可以保存导致条件评估的报价更改的快照.

伪大理石图 -

in  -  a     b     c     d     e     f
out -  a     ba    cba   dcb   edc   fde
Run Code Online (Sandbox Code Playgroud)

任何想法都赞赏

c# system.reactive

1
推荐指数
1
解决办法
174
查看次数

是否有可能永远重复的RX观察

我想通过RX创建一个心跳,我似乎无法找到一种方法来创建一个Observable无限脉冲的方法.

这甚至可能吗?

.net c# system.reactive

1
推荐指数
1
解决办法
764
查看次数

Rx中基于数据的缓冲

让我先解释一下我想要实现的目标.

假设我从事件流中传入了以下数据

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };
Run Code Online (Sandbox Code Playgroud)

当我订阅数据源时,我希望得到以下结果

"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"
Run Code Online (Sandbox Code Playgroud)

基本上,我希望得到以ok或错误开头的数据以及开始和结束之间的数据.

到目前为止我试过这个..

var data = new string[] { 
                "hello", 
                "Using", 
                "ok:michael", 
                "ok", 
                "begin:events", 
                "1:232", 
                "2:343", 
                "end:events", 
                "error:dfljsdf",
                "fdl", 
                "error:fjkdjslf",
                "ok"  
            };



            var dataStream = Observable.Generate(
                                data.GetEnumerator(), 
                                e => e.MoveNext(), 
                                e => e, 
                                e => e.Current.ToString(), 
                                e => TimeSpan.FromSeconds(0.1));         

            var onelineStream = from d in …
Run Code Online (Sandbox Code Playgroud)

system.reactive

1
推荐指数
1
解决办法
377
查看次数

Rx中的主题<T>

Rx中Subject的真实生活用途是什么?我在101个样本中找到了一个样本,但我认为这并没有让我清楚地知道应该在哪里应用它.请在任何一篇文章中展示一些简单的样本,以展示Rx中的主题作品.我在Windows手机的上下文中使用它.

reactive-programming system.reactive windows-phone-7

1
推荐指数
1
解决办法
179
查看次数

保持定时器类型线程

我很难为自己找出这个因此在这里问.我有一个应用程序,它在远程计算机上安装客户端,客户端通过套接字报告.

我试图创建一个保持计时器.这意味着客户端将每1分钟向服务器发送一个问候消息.如果服务器在保持定时器内没有从客户端获取hello数据包,则客户端将被删除.我可以用一个客户端做得很好......但对于多个我无法理解它.我正在考虑创建一个线程foreach连接到服务器的新客户端.然后在该线程内部启动一个保持计时器..但我如何区分客户端与线程,以及如何重置保持计时器如果从客户端收到一个hello数据包.

我想为每个hello数据包创建一个新线程并停止旧数据包.但我不知道这将是多么密集的cpu

如果你不明白我的问题,请说出来,并试着解释你不理解的任何事情.

哪种解决方案更好?解决方案A)每次打开hello数据包时启动一个新线程并停止旧线程?(每40秒) - 来自100个客户

解决方案B)在此处插入更好的可扩展解决方案.

解决方案C)我可以访问我动态创建的线程内的计时器吗? - 我在所有动态创建的线程上都有唯一的线程名称.

public void _clientholdtimer()
    {          
    holdtimer = new Timer();
    holdtimer.Interval = SECtoMS(Settings.Default.Hold);
    holdtimer.Elapsed += holdtimer_Elapsed;          
    }
Run Code Online (Sandbox Code Playgroud)

当我从客户端收到一个与线程同名的hello数据包时重置它?

解决方案D)将定时器存储在字典中公共字典timersDict = new Dictionary(); 并且循环在dict中找到关于hello数据包接收重置该定时器的引用.

foreach(var item in timersDict)
                            {
                                if(item.Key == stripip(client.RemoteEndPoint.ToString()))
                                    TimerReset(item.Value);
                            }   
Run Code Online (Sandbox Code Playgroud)

设置远程主机应用程序,每隔3秒向服务器发送一条"hello"消息.在服务器上创建一个计时器,每1秒发生一次经过的事件.设置一个静态值来保存一个int,它决定了,在我认为远程机器死了多久之前,我选择了9秒.

当服务器收到hello消息时,它将时间戳存储在列表中,并带有客户端的ip地址.

在timer_elapsed事件上

void holdtimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
    {

        try
        {
            if (server.Clients.GetAllItems().Count != 0)
            {
                foreach (IScsServerClient _client in server.Clients.GetAllItems())
                {
                    string ipaddr = stripip(_client.RemoteEndPoint.ToString());
                    bool containsentry = _clientlist.Any(item => item.ipadress == ipaddr);
                    if (containsentry)
                    { …
Run Code Online (Sandbox Code Playgroud)

c# multithreading timer system.reactive

1
推荐指数
1
解决办法
457
查看次数

WebClient/HttpClient缓存问题

我正在开发一个代码,每隔30秒轮询一次Bitstamp交换代码.这是我的代码:

public IObservable<string> Stream(Uri way, WebClient wc)
    {
        Func<IObserver<string>, Task> Fun = async Observer =>
        {
            var res = await wc.DownloadStringTaskAsync(way);
            Observer.OnNext(value: res);
        };

        return Observable.Create<string>(Fun);
    }

public IObservable<string> GetDelay(int secs)
    {
        var exe = TimeSpan.FromSeconds(secs);
        return Observable.Empty<string>("x").Delay(exe);
    }

Stream(new Uri("https://bitstamp.net/api/ticker"), new WebClient { }).Concat(GetDelay(30))
    .Repeat(5).Subscribe(res => Debug.WriteLine("got result: {0}", res));
Run Code Online (Sandbox Code Playgroud)

问题是WebClient(并且HttpClient)也会在第一次调用后返回缓存的结果,可以通过相同的时间戳看到:

got result: {"high": "690.00", "last": "645.10", "timestamp": "1387715532" ... }
got result: {"high": "690.00", "last": "645.10", "timestamp": "1387715532" ... }
...

即使在关闭网络后,它们也会正常返回结果,所以很明显它们会将其缓存到某处.添加类似"?cache = random"的内容不起作用,因为Bitstamp上的ticker不允许使用请求参数.设置 …

caching webclient httprequest system.reactive windows-phone-8

1
推荐指数
2
解决办法
2303
查看次数

如果中间有linq方法,则rx处理异常订阅

当我订阅有时抛出异常的方法时,我会得到2个不同的行为.如果我在中间连接LINQ方法,那么订阅就会被处理掉,另外一点也不是,为什么呢?

void main(){
  var numbersSubject=new Subject<int>();

  numbersSubject.subscribe(throwMethod);   // 1,2,3,4,6,7,8,9,10
  // numbersSubject.select(num=>num).subscribe(throwMethod);   // 1,2,3,4

  for(int i=0;i<10;i++)
  {
    try{
      numbersSubject.OnNext(i);
    }catch{}
  }
}

void throwMethod(int num)
{
   if(num==5)
       throw new Exception();
   Console.writeLine(i);
}
Run Code Online (Sandbox Code Playgroud)

c# reactive-programming system.reactive

1
推荐指数
1
解决办法
188
查看次数

Reactive UI 6:如何在版本6中实现版本4的相同功能

我从旧示例(Reactive UI 4)获得此代码:

StartAsyncCommand = new ReactiveCommand();
        StartAsyncCommand.RegisterAsyncAction(_ =>
        {
            Progress = 0;
            var exe = Enumerable.Range(0, 10).Select(x =>
                            {
                                Thread.Sleep(100);
                                return x;
                            }).ToObservable();
            exe.Subscribe(x =>Progress += 10);
        });
Run Code Online (Sandbox Code Playgroud)

它工作正常,UI不会阻止,按钮被禁用,直到进度达到100%.

当我迁移到版本6时,我尝试了许多方法来实现相同的功能而没有成功.

这些是我的尝试:

1)使用 CreateAsyncObservable

GenerateCommand = ReactiveCommand.CreateAsyncObservable(canGenerate, x => DoSomething());

public IObservable<Unit> DoSomething()
    {
        var exe = Enumerable.Range(0, 10).Select(
            x =>
            {
                Thread.Sleep(200);
                return x;
            }).ToObservable().ObserveOnDispatcher().SubscribeOn(NewThreadScheduler.Default);
        exe.Subscribe(x =>
                Progress += 10
            );
        return Observable.Return(new Unit());
    }
Run Code Online (Sandbox Code Playgroud)

它工作但按钮不会禁用.

2)有 CreateAsyncTask

GenerateCommand = ReactiveCommand.CreateAsyncTask(canGenerate, x => DoSomething());

public async Task<Unit> DoSomething()
    { …
Run Code Online (Sandbox Code Playgroud)

c# wpf asynchronous system.reactive reactiveui

1
推荐指数
1
解决办法
669
查看次数

Observable不响应阻塞集合在不同的线程上发生了变化

我有以下代码:

class Program
{
    static void Main(string[] args)
    {
        var watcher = new SNotifier(DumpToConsole);
        watcher.StartQueue();

        Console.ReadLine();
    }

    private static void DumpToConsole(IList<Timestamped<int>> currentCol)
    {
        Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
        Console.WriteLine("holder has: {0}", currentCol.Count);
    }
}
Run Code Online (Sandbox Code Playgroud)

SNotifier:

public class SNotifier
{
    private BlockingCollection<int> _holderQueue;
    private readonly Action<IList<Timestamped<int>>> _dumpAction;

    public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
    {
        PopulateListWithStartValues();
        _dumpAction = dumpAction;
    }

    public void StartQueue()
    {
        PopulateQueueOnDiffThread();

        var observableCollection = _holderQueue.ToObservable();

        var myCollectionTimestamped = observableCollection.Timestamp();
        var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));

        using (bufferedTimestampedCollection.Subscribe(_dumpAction)) …
Run Code Online (Sandbox Code Playgroud)

c# multithreading system.reactive observer-pattern

1
推荐指数
1
解决办法
690
查看次数

与有限订阅者同时订阅可观察集合的简单方法

我一直在尝试使用Rx和可观察集合实现一个简单的生产者 - 消费者模式.我还需要能够轻松限制用户数量.我在并行扩展中看到过很多对LimitedConcurrencyLevelTask​​Scheduler的引用,但我似乎无法使用多个线程.

我想我做的很傻,所以我希望有人可以解释一下.在下面的单元测试中,我希望使用多个(2)线程来使用阻塞集合中的字符串.我究竟做错了什么?

[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
    private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
    ConcurrentBag<int> _threadIds= new ConcurrentBag<int>();

    [TestMethod]
    public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
    {

        // Setup the command queue for processing combinations
        var commandQueue = new BlockingCollection<string>();

        var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
        var scheduler = new TaskPoolScheduler(taskFactory);

        commandQueue.GetConsumingEnumerable()
            .ToObservable(scheduler)
            .Subscribe(Go, ex => { throw ex; });

        var iterationCount = 100;
        for (int i = 0; i < iterationCount; i++)
        {
            commandQueue.Add(string.Format("string {0}", i));
        }
        commandQueue.CompleteAdding();

        while (!commandQueue.IsCompleted)
        {
            Thread.Sleep(100); …
Run Code Online (Sandbox Code Playgroud)

c# concurrency observablecollection parallel-extensions system.reactive

1
推荐指数
1
解决办法
1699
查看次数