Ibr*_*jar 10 .net c# reactive-programming system.reactive
我试图在具有10000多条记录的数据库表上实现即时搜索.
搜索文本框内的文本发生更改时搜索开始,当搜索框变空时,我想调用另一种加载所有数据的方法.
此外,如果用户更改搜索字符串,而正在加载另一个搜索的结果,则应停止加载这些结果以支持新搜索.
我像下面的代码一样实现它,但我想知道是否有更好或更清晰的方法来使用Rx(Reactive Extension)运算符,我觉得在第一个observable的subscribe方法中创建第二个observable比声明性的,对于if语句也是如此.
var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Select(evt =>
        {
            var txtbox = evt.Sender as TextBox;
            return txtbox.Text;
        }
    );
searchStream
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(searchTerm =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm);
            foundParties
                .ToObservable(Scheduler.Default)
                .TakeUntil(searchStream)
                .Buffer(500)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(searchResults =>
                    {
                        this.parties.AddRange(searchResults);
                        this.partyBindingSource.ResetBindings(false);
                    }
                    , innerEx =>
                    {
                    }
                    , () => { }
                );
        }
        , ex =>
        {
        }
        , () =>
        {
        }
    );
该SearchByNameAndNotes方法只是IEnumerable<Party>通过从数据读取器读取数据来返回使用SQLite.
Jam*_*rld 18
我想你想要这样的东西.编辑:从你的评论,我看到你有一个同步存储库API - 我将保留异步版本,然后添加一个同步版本.内联注释:
异步存储库接口可能是这样的:
public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}
然后我将查询重构为:
var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text) // better to select on the UI thread
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    // placement of this is important to avoid races updating the UI
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        // I like to use Do to make in-stream side-effects explicit
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    // This is "the money" part of the answer:
    // Don't subscribe, just project the search term
    // into the query...
    .Select(searchTerm =>
    {
        long partyCount;
        var foundParties = string.IsNullOrEmpty(searchTerm)
            ? partyRepository.GetAllAsync(out partyCount)
            : partyRepository.SearchByNameAndNotesAsync(searchTerm);
        // I assume the intention of the Buffer was to load
        // the data into the UI in batches. If so, you can use Buffer from nuget
        // package Ix-Main like this to get IEnumerable<T> batched up
        // without splitting it up into unit sized pieces first
        return foundParties
            // this ToObs gets us into the monad
            // and returns IObservable<IEnumerable<Party>>
            .ToObservable()
            // the ToObs here gets us into the monad from
            // the IEnum<IList<Party>> returned by Buffer
            // and the SelectMany flattens so the output
            // is IObservable<IList<Party>>
            .SelectMany(x => x.Buffer(500).ToObservable())
            // placement of this is again important to avoid races updating the UI
            // erroneously putting it after the Switch is a very common bug
            .ObserveOn(SynchronizationContext.Current); 
    })
    // At this point we have IObservable<IObservable<IList<Party>>
    // Switch flattens and returns the most recent inner IObservable,
    // cancelling any previous pending set of batched results
    // superceded due to a textbox change
    // i.e. the previous inner IObservable<...> if it was incomplete
    // - it's the equivalent of your TakeUntil, but a bit neater
    .Switch() 
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });
同步存储库接口可能是这样的:
public interface IPartyRepository
{
    IEnumerable<Party> GetAll(out long partyCount);
    IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}
就个人而言,我不建议像这样同步存储库接口.为什么?它通常会执行IO,因此您将浪费地阻塞线程.
你可能会说客户端可以从后台线程调用,或者你可以将他们的调用包装在一个任务中 - 但这不是我认为正确的方法.
无论如何,接受上述,一种实现的方式是这样的(当然它大部分类似于异步版本所以我只注释了差异):
var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })       
    .Select(searchTerm =>
        // Here we wrap the synchronous repository into an
        // async call. Note it's simply not enough to call
        // ToObservable(Scheduler.Default) on the enumerable
        // because this can actually still block up to the point that the
        // first result is yielded. Doing as we have here,
        // we guarantee the UI stays responsive
        Observable.Start(() =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);
            return foundParties;
        }) // Note you can supply a scheduler, default is Scheduler.Default
        .SelectMany(x => x.Buffer(500).ToObservable())
        .ObserveOn(SynchronizationContext.Current))
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });