我通过Observable监控股票报价流,我认为它符合某种条件,例如 -
Observable
.Empty<Quote>
.Where(q => q.Price > watchPrice)
.Subscribe(q => { // do stuff } );
Run Code Online (Sandbox Code Playgroud)
现在,在"做东西"的时候,我理想的是得到最后3个"q"来自where子句,所以有点像BufferWithCount(),但每个条目都包含Subscribe()包含最后3个条目.这样我就可以保存导致条件评估的报价更改的快照.
伪大理石图 -
in - a b c d e f
out - a ba cba dcb edc fde
Run Code Online (Sandbox Code Playgroud)
任何想法都赞赏
我想通过RX创建一个心跳,我似乎无法找到一种方法来创建一个Observable无限脉冲的方法.
这甚至可能吗?
让我先解释一下我想要实现的目标.
假设我从事件流中传入了以下数据
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
Run Code Online (Sandbox Code Playgroud)
当我订阅数据源时,我希望得到以下结果
"ok:michael"
"ok"
"begin:events 1:232 2:343 end:events"
"error:dfljsdf"
"error:fjkdjslf"
"ok"
Run Code Online (Sandbox Code Playgroud)
基本上,我希望得到以ok或错误开头的数据以及开始和结束之间的数据.
到目前为止我试过这个..
var data = new string[] {
"hello",
"Using",
"ok:michael",
"ok",
"begin:events",
"1:232",
"2:343",
"end:events",
"error:dfljsdf",
"fdl",
"error:fjkdjslf",
"ok"
};
var dataStream = Observable.Generate(
data.GetEnumerator(),
e => e.MoveNext(),
e => e,
e => e.Current.ToString(),
e => TimeSpan.FromSeconds(0.1));
var onelineStream = from d in …Run Code Online (Sandbox Code Playgroud) Rx中Subject的真实生活用途是什么?我在101个样本中找到了一个样本,但我认为这并没有让我清楚地知道应该在哪里应用它.请在任何一篇文章中展示一些简单的样本,以展示Rx中的主题作品.我在Windows手机的上下文中使用它.
我很难为自己找出这个因此在这里问.我有一个应用程序,它在远程计算机上安装客户端,客户端通过套接字报告.
我试图创建一个保持计时器.这意味着客户端将每1分钟向服务器发送一个问候消息.如果服务器在保持定时器内没有从客户端获取hello数据包,则客户端将被删除.我可以用一个客户端做得很好......但对于多个我无法理解它.我正在考虑创建一个线程foreach连接到服务器的新客户端.然后在该线程内部启动一个保持计时器..但我如何区分客户端与线程,以及如何重置保持计时器如果从客户端收到一个hello数据包.
我想为每个hello数据包创建一个新线程并停止旧数据包.但我不知道这将是多么密集的cpu
如果你不明白我的问题,请说出来,并试着解释你不理解的任何事情.
哪种解决方案更好?解决方案A)每次打开hello数据包时启动一个新线程并停止旧线程?(每40秒) - 来自100个客户
解决方案B)在此处插入更好的可扩展解决方案.
解决方案C)我可以访问我动态创建的线程内的计时器吗? - 我在所有动态创建的线程上都有唯一的线程名称.
public void _clientholdtimer()
{
holdtimer = new Timer();
holdtimer.Interval = SECtoMS(Settings.Default.Hold);
holdtimer.Elapsed += holdtimer_Elapsed;
}
Run Code Online (Sandbox Code Playgroud)
当我从客户端收到一个与线程同名的hello数据包时重置它?
解决方案D)将定时器存储在字典中公共字典timersDict = new Dictionary(); 并且循环在dict中找到关于hello数据包接收重置该定时器的引用.
foreach(var item in timersDict)
{
if(item.Key == stripip(client.RemoteEndPoint.ToString()))
TimerReset(item.Value);
}
Run Code Online (Sandbox Code Playgroud)
解
设置远程主机应用程序,每隔3秒向服务器发送一条"hello"消息.在服务器上创建一个计时器,每1秒发生一次经过的事件.设置一个静态值来保存一个int,它决定了,在我认为远程机器死了多久之前,我选择了9秒.
当服务器收到hello消息时,它将时间戳存储在列表中,并带有客户端的ip地址.
在timer_elapsed事件上
void holdtimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
try
{
if (server.Clients.GetAllItems().Count != 0)
{
foreach (IScsServerClient _client in server.Clients.GetAllItems())
{
string ipaddr = stripip(_client.RemoteEndPoint.ToString());
bool containsentry = _clientlist.Any(item => item.ipadress == ipaddr);
if (containsentry)
{ …Run Code Online (Sandbox Code Playgroud) 我正在开发一个代码,每隔30秒轮询一次Bitstamp交换代码.这是我的代码:
public IObservable<string> Stream(Uri way, WebClient wc)
{
Func<IObserver<string>, Task> Fun = async Observer =>
{
var res = await wc.DownloadStringTaskAsync(way);
Observer.OnNext(value: res);
};
return Observable.Create<string>(Fun);
}
public IObservable<string> GetDelay(int secs)
{
var exe = TimeSpan.FromSeconds(secs);
return Observable.Empty<string>("x").Delay(exe);
}
Stream(new Uri("https://bitstamp.net/api/ticker"), new WebClient { }).Concat(GetDelay(30))
.Repeat(5).Subscribe(res => Debug.WriteLine("got result: {0}", res));
Run Code Online (Sandbox Code Playgroud)
问题是WebClient(并且HttpClient)也会在第一次调用后返回缓存的结果,可以通过相同的时间戳看到:
got result: {"high": "690.00", "last": "645.10", "timestamp": "1387715532" ... }
got result: {"high": "690.00", "last": "645.10", "timestamp": "1387715532" ... }
...
即使在关闭网络后,它们也会正常返回结果,所以很明显它们会将其缓存到某处.添加类似"?cache = random"的内容不起作用,因为Bitstamp上的ticker不允许使用请求参数.设置 …
caching webclient httprequest system.reactive windows-phone-8
当我订阅有时抛出异常的方法时,我会得到2个不同的行为.如果我在中间连接LINQ方法,那么订阅就会被处理掉,另外一点也不是,为什么呢?
void main(){
var numbersSubject=new Subject<int>();
numbersSubject.subscribe(throwMethod); // 1,2,3,4,6,7,8,9,10
// numbersSubject.select(num=>num).subscribe(throwMethod); // 1,2,3,4
for(int i=0;i<10;i++)
{
try{
numbersSubject.OnNext(i);
}catch{}
}
}
void throwMethod(int num)
{
if(num==5)
throw new Exception();
Console.writeLine(i);
}
Run Code Online (Sandbox Code Playgroud) 我从旧示例(Reactive UI 4)获得此代码:
StartAsyncCommand = new ReactiveCommand();
StartAsyncCommand.RegisterAsyncAction(_ =>
{
Progress = 0;
var exe = Enumerable.Range(0, 10).Select(x =>
{
Thread.Sleep(100);
return x;
}).ToObservable();
exe.Subscribe(x =>Progress += 10);
});
Run Code Online (Sandbox Code Playgroud)
它工作正常,UI不会阻止,按钮被禁用,直到进度达到100%.
当我迁移到版本6时,我尝试了许多方法来实现相同的功能而没有成功.
这些是我的尝试:
1)使用 CreateAsyncObservable
GenerateCommand = ReactiveCommand.CreateAsyncObservable(canGenerate, x => DoSomething());
public IObservable<Unit> DoSomething()
{
var exe = Enumerable.Range(0, 10).Select(
x =>
{
Thread.Sleep(200);
return x;
}).ToObservable().ObserveOnDispatcher().SubscribeOn(NewThreadScheduler.Default);
exe.Subscribe(x =>
Progress += 10
);
return Observable.Return(new Unit());
}
Run Code Online (Sandbox Code Playgroud)
它工作但按钮不会禁用.
2)有 CreateAsyncTask
GenerateCommand = ReactiveCommand.CreateAsyncTask(canGenerate, x => DoSomething());
public async Task<Unit> DoSomething()
{ …Run Code Online (Sandbox Code Playgroud) 我有以下代码:
class Program
{
static void Main(string[] args)
{
var watcher = new SNotifier(DumpToConsole);
watcher.StartQueue();
Console.ReadLine();
}
private static void DumpToConsole(IList<Timestamped<int>> currentCol)
{
Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
Console.WriteLine("holder has: {0}", currentCol.Count);
}
}
Run Code Online (Sandbox Code Playgroud)
SNotifier:
public class SNotifier
{
private BlockingCollection<int> _holderQueue;
private readonly Action<IList<Timestamped<int>>> _dumpAction;
public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
{
PopulateListWithStartValues();
_dumpAction = dumpAction;
}
public void StartQueue()
{
PopulateQueueOnDiffThread();
var observableCollection = _holderQueue.ToObservable();
var myCollectionTimestamped = observableCollection.Timestamp();
var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));
using (bufferedTimestampedCollection.Subscribe(_dumpAction)) …Run Code Online (Sandbox Code Playgroud) 我一直在尝试使用Rx和可观察集合实现一个简单的生产者 - 消费者模式.我还需要能够轻松限制用户数量.我在并行扩展中看到过很多对LimitedConcurrencyLevelTaskScheduler的引用,但我似乎无法使用多个线程.
我想我做的很傻,所以我希望有人可以解释一下.在下面的单元测试中,我希望使用多个(2)线程来使用阻塞集合中的字符串.我究竟做错了什么?
[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
ConcurrentBag<int> _threadIds= new ConcurrentBag<int>();
[TestMethod]
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
// Setup the command queue for processing combinations
var commandQueue = new BlockingCollection<string>();
var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);
commandQueue.GetConsumingEnumerable()
.ToObservable(scheduler)
.Subscribe(Go, ex => { throw ex; });
var iterationCount = 100;
for (int i = 0; i < iterationCount; i++)
{
commandQueue.Add(string.Format("string {0}", i));
}
commandQueue.CompleteAdding();
while (!commandQueue.IsCompleted)
{
Thread.Sleep(100); …Run Code Online (Sandbox Code Playgroud) c# concurrency observablecollection parallel-extensions system.reactive
system.reactive ×10
c# ×7
.net ×1
asynchronous ×1
caching ×1
concurrency ×1
httprequest ×1
reactiveui ×1
timer ×1
webclient ×1
wpf ×1