微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – Reactive框架消除了Observable.Create中的while循环

我正在使用一个可观察的字节流,从网络出来,我想把它带到一层抽象.格式有两个字节,包含下一条消息的长度.我想很好地适应反应框架.到目前为止,我感觉不太正确,所以我想知道我可能错过了什么技巧来消除这里的while循环.

这是我想到的概念:

public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
    return Observable.Create((IObserver<Stream>o)=>{
        bool done = false;
        var byteStream = byteStreamArg.Do(
            b => { },(ex) => { done = true; },() => { done = true; });
        while (!done)
        {
            var size = byteStream.Take(2).
                           Aggregate(0,(n,b) => (n << 8) + b).Single();
            var buf = byteStream.Skip(2).Take(size);
            var stream = new MemoryStream(buf.ToEnumerable().ToArray());
            if (!done)
            {
                o.OnNext(stream);
            }
        }
        return (() => {});
    });
}

解决方法

IObservable在这里有点奇怪 – 记住你正在返回一个“未来流列表” – 我实际上只是返回一个Stream,或者可能是IObservable< byte []>,其中每个数组代表一条消息.或者做得更好,并返回IObservable< ParsedMessage>

此外,你的While循环使这非同步并且行为奇怪.怎么样更像这样的东西:

public static IObservable<System.IO.Stream> GetToplevelStreams(IObservable<byte> byteStream)
{
    return Observable.Create((IObserver<System.IO.Stream> o) =>
    {
        int? size1=null;
        int? size=null;
        var buf = new MemoryStream();
        var subscription = byteStream.Subscribe(v =>
        {
            if (!size1.HasValue)
            {
                size1 = ((int)v) << 8;
            }
            else if (!size.HasValue)
            {
                size = size1.Value + v;
            }
            else
            {
                buf.WriteByte(v);
            }
            if (size.HasValue && buf.Length == size)
            {
                buf.Position = 0;
                o.OnNext(buf);
                buf.SetLength(0);
                size1 = null;
                size = null;
            }

        },(ex)=>o.OnError(ex),()=>o.OnCompleted());
        return () => subscription.dispose();
    });
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐