我们使用Rebus作为sql server的队列系统.我们有几个不同类型的邮件收件人.每个消息都可以由某个类型的几个工作人员处理.
一条消息只应由一个工作人员处理/处理(第一个工作人员处理它).如果某个工作人员由于某种原因无法完成它,它会使用超时服务推迟该消息.
一条消息只应由一个工作人员处理/处理(第一个工作人员处理它).如果某个工作人员由于某种原因无法完成它,它会使用超时服务推迟该消息.
如果我已正确理解它,它将成为TimeoutRequest并放入
超时表.当重新运行时,它会在作为原始消息重新引入队列之前变为TimeoutReply.
我们遇到的问题是,当它成为TimeoutReply时,所有工作人员都会选择它并创建原始消息.当超时时,一条原始消息会变成几条消息(尽可能多的是工作人员).
我们的Rebus设置如下:
“服务器端”:
var adapter = new BuiltinContainerAdapter(); Configure.With(adapter) .Logging(l => l.Log4Net()) .Transport(t => t.UsesqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated()) .CreateBus() .Start(); return adapter;
“工人方”:
_adapter = new BuiltinContainerAdapter(); Configure.With(_adapter) .Logging(l => l.Log4Net()) .Transport(t => t.UsesqlServer(_connectionString,_inputQueue,"error") .EnsureTableIsCreated()) .Events(x => x.AfterMessage += ((bus,exception,message) => SendWorkerFinishedJob(exception,message))) .Events(x => x.BeforeMessage += (bus,message) => SignalWorkerStartedJob(message)) .Behavior(x => x.SetMaxRetriesFor<Exception>(0)) .Timeouts(x => x.StoreInsqlServer(_connectionString,"timeouts").EnsureTableIsCreated()) .CreateBus().Start(numberOfWorkers);
任何帮助解决问题或提供理解的人都非常感谢!
解决方法
我可以想象为什么你最终会有多个超时回复的唯一原因是因为每个工作者都充当超时管理器,并且它们似乎共享相同的存储.
这样,由于超时管理器在查询到期超时时不使用任何类型的锁定或其他任何东西,它们最终可能会抢到相同的超时时间,这反过来会导致多次超时回复 – 这是一个竞争条件,但它会没有注意到因为this SQL没有注意到是否实际删除了一行).
我建议你要么a)为工人使用单独的超时表(例如_inputQueue“.timeouts”),要么b)让所有工人使用外部超时管理器(即省略超时(x => …)东西并启动一个独立的专用超时管理器.
在你的场景中,我猜(a)是最简单的方法,因为它非常接近你现在所拥有的.
我确实更喜欢(b)我自己,通常每台机器有一个超时管理器来托管Rebus端点.
如果能解决您的问题,请告诉我.
另外,我很想知道sql传输是如何为你工作的:)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。