微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – 如何使用Reactive Extensions对状态进行轮询?

使用Reactive( Database polling with Reactive Extensions)进行数据库轮询已经有了一个很好的问题

我有一个类似的问题,但有一个转折:我需要将前一个结果中的值提供给下一个请求.基本上,我想对此进行调查:

interface ResultSet<T>
{
   int? CurrentAsOfHandle {get;}
   IList<T> Results {get;}
}

Task<ResultSet<T>> GetNewResultsAsync<T>(int? prevIoUsRequestHandle);

我们的想法是返回自上一个请求以来的所有新项目

>每分钟我都想调用GetNewResultsAsync
>我想将前一次调用的CurrentAsOf作为参数传递给prevIoUsRequest参数
>下一次调用GetNewResultsAsync实际上应该在前一次调用后一分钟发生

基本上,有一个更好的方法

return Observable.Create<IMessage>(async (observer,cancellationToken) =>
{
    int? currentVersion = null;

    while (!cancellationToken.IsCancellationRequested)
    {
        MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion);

        currentVersion = messageResultSet.CurrentVersionHandle;

        foreach (IMessage resultMessage in messageResultSet.Messages)
            observer.OnNext(resultMessage);

        await Task.Delay(TimeSpan.FromMinutes(1),cancellationToken);
    }
});

另请注意,此版本允许在等待下一次迭代时收集messageResultSet(例如,我想也许我可以使用Scan将先前的结果集对象传递到下一次迭代)

解决方法

您的问题基本上减少到:有一个带签名的扫描功能

IObservable<TAccumulate> Scan<TSource,TAccumulate>(this IObservable<TSource> source,TAccumulate initialValue,Func<TAccumulate,TSource,TAccumulate> accumulator)

但你需要类似的东西

IObservable<TAccumulate> Scan<TSource,IObservable<TAccumulate>> accumulator)

…累加器函数返回一个observable,Scan函数自动减少它以传入下一个调用.

这是一个穷人的扫描功能实现:

public static IObservable<TAccumulate> MyScan<TSource,TAccumulate> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue,s))
            .SelectMany(m => _source.MyScan(m,accumulator).StartWith(m))
        );
}

鉴于此,我们可以稍微改变一下以结合减少功能

public static IObservable<TAccumulate> MyObservableScan<TSource,IObservable<TAccumulate>> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue,s))
            .SelectMany(async o => (await o.LastOrDefaultAsync())
                .Let(m => _source
                    .MyObservableScan(m,accumulator)
                    .StartWith(m)
                )
            )
            .Merge()
        );
}

//Wrapper to accommodate easy Task -> Observable transformations
public static IObservable<TAccumulate> MyObservableScan<TSource,Task<TAccumulate>> accumulator)
{
    return source.MyObservableScan(initialValue,(a,s) => Observable.FromAsync(() => accumulator(a,s)));  
}

//Function to prevent re-evaluation in functional scenarios
public static U Let<T,U>(this T t,Func<T,U> selector)
{
    return selector(t);
}

现在我们有了这个花哨的MyObservableScan操作符,我们可以相对轻松地解决您的问题:

var o = Observable.Interval(TimeSpan.FromMinutes(1))
    .MyObservableScan<long,ResultSet<string>>(null,(r,_) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle))

请注意,在测试中我注意到如果累加器Task / Observable函数的时间超过源中的间隔,则observable将终止.我不知道为什么.如果有人可以纠正,那就很有必要了.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐