我有以下API:
IObservable<IList<sqlDataRecord>> WritetoDBAndGetFailedSource(sqlConnection conn,IList<sqlDataRecord> batch)
它试图将批处理写入数据库.如果失败,则返回整个批处理,否则返回的observable为空.
我也有一个产生批次的来源:
IObservable<IList<sqlDataRecord>> GetDataSource(string filePath,int bufferThreshold)
现在,我可以像这样组合它们:
var FailedBatchesSource = GetDataSource(filePath,1048576) .Select(batch => WritetoDBAndGetFailedSource(conn,batch)) .Merge(100);
这将编写所有批次(最多100个并发)并返回可观察到的失败批次.
我真正想要的是在一段暂停之后将失败的批次送回批次的来源,可能是在原始来源仍在生产批次时.当然,我可以这样写:
var FailedBatchesSource = GetDataSource(filePath,batch)) .Merge(100) .Select(batch => WritetoDBAndGetFailedSource(conn,batch)) .Merge(100);
但当然,这是错误的,因为:
>这打破了在再次处理失败的批次之前暂停的要求.
>它可能会为数据库生成100多个并发写入请求.
>这就像解开一个带有未知迭代次数的for循环 – 没有效果.
一旦我收集了所有的失败并在循环中重新开始,我也可以打破可观察的monad:
var src = GetDataSource(filePath,1048576); for (;;) { var Failed = await src .Select(batch => WritetoDBAndGetFailedSource(conn,batch)) .Merge(100) .ToList(); if (Failed.Count == 0) { break; } src = Failed.ToObservable(); }
但是我想知道我是否可以在可观察的monad中保持更好的状态.
解决方法
我认为这可能会成功
public static IObservable<T> ProcessAll<T>(this IObservable<T> source,Func<T,IObservable<T>> processor,int mergeCount,TimeSpan failureDelay) { return Observable.Create<T>( observer => { var Failed = new Subject<T>(); return source.Merge(Failed) .Select(processor) .Merge(mergeCount) .Delay(failureDelay) .Subscribe(Failed.OnNext,observer.OnError,observer.OnCompleted); }); }
并像这样使用它:
GetDataSource(filePath,1048576) .ProcessAll(batch => WritetoDBAndGetFailedSource(conn,batch),100,TimeSpan.FromMilliseconds(500)) .Subscribe();
ProcessAll是一个可怕的名字,但它是星期五晚上,我想不出更好的名字.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。