我正在使用RabbitMQ处理其他人的项目,并且在出列数据和丢失数据方面遇到了麻烦.
我发布时,数据全部作为字符串存在,并且它也正确地存在于RabbitMQ队列中.当我关闭数据时,数据的数据就像用户ID一样,但其余部分都消失了.我在整个代码中都看了一下,我对RabbitMQ正在发生的事情表示相当肯定,当我出局时它就会发生.任何帮助将不胜感激.谢谢.
这是发布之前的代码.
private bool sendJobToMQ(EncodeJobModel job,string p_correlation_id,string p_request_routing_key) { JavaScriptSerializer ser = new JavaScriptSerializer(); StringBuilder sb_job = new StringBuilder(); ser.Serialize(job,sb_job); string rpc_reply_queue; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = HOST_NAME; factory.VirtualHost = VHOST_NAME; factory.UserName = USERNAME; factory.Password = PASSWORD; IConnection rabconn = factory.CreateConnection(); IModel sender_channel = rabconn.CreateModel(); try { sender_channel.ExchangeDeclare(EXCHANGE_NAME,ExchangeType.Direct,true,false,null); } catch (Exception err) { logger.Error("Error Declaring Exchange " + EXCHANGE_NAME + ": " + err.ToString()); return false; } try { sender_channel.QueueDeclare(REQUEST_QUEUE,null); } catch (Exception err) { logger.Error("Error QueueDeclare (" + REQUEST_QUEUE + " true,null): " + err.ToString()); return false; } try { sender_channel.QueueBind(REQUEST_QUEUE,EXCHANGE_NAME,REQUEST_ROUTING_KEY,null); } catch (Exception err) { logger.Error("Error QueueBind (" + REQUEST_QUEUE + " -> " + EXCHANGE_NAME + " " + REQUEST_ROUTING_KEY + ",null): " + err.ToString()); return false; } //rpc_reply_queue = sender_channel.QueueDeclare("rq_" + job.encodejob_id.ToString(),null); //////bind the rpc reply queue to the exchange via a routing key (I appended _routingkey to signify this) //sender_channel.QueueBind(rpc_reply_queue,rpc_reply_queue + "_routingkey"); //// Not sure what the props object is for yet but you can try to pass null in the mean time - Steve "Apeshit" Han BasicProperties props = new BasicProperties(); props.CorrelationId = p_correlation_id; //props.ReplyTo = rpc_reply_queue; try { sender_channel.Basicpublish(EXCHANGE_NAME,props,Encoding.UTF8.GetBytes(sb_job.ToString())); }
以及出列的代码.
queueingBasicConsumer consumer = new queueingBasicConsumer(p_channel); string consumerTag = p_channel.BasicConsume(p_queue,consumer); if (_is_console && Environment.UserInteractive) Console.WriteLine("Listening..."); while (m_Listen) { try { //get the properties of the message,including the ReplyTo queue,to which we can append '_routingkey' (designated by me),to reply with messages BasicDeliverEventArgs e; Object message; if (!consumer.Queue.Dequeue(4000,out message)) { // we do not wait to indefinitely block on waiting for the queue // if nothing in queue continue loop iteration and wait again continue; } // cast as necessary back to BasicDeliverEventArgs e = (BasicDeliverEventArgs)message; IBasicProperties props = e.BasicProperties; //get the Correlation ID sent by the client to track the job string client_correlation_id = props.CorrelationId; // I left out the reply_to field in the wizard,it can be set back in ApiEncodeServiceDefault - Steve "Smurfing Smurf" Han //string reply_to = props.ReplyTo; //get the body of the request byte[] body = e.Body; string body_result = Encoding.UTF8.GetString(body); bool redelivered = e.Redelivered;
e.Body字符串缺少数据.
解决方法
如果你没有任何消息,为什么继续
最好阻止,直到你收到一条消息,否则这个过程没有意义(没有数据工作?)
试试这样
最好阻止,直到你收到一条消息,否则这个过程没有意义(没有数据工作?)
试试这样
queueingBasicConsumer consumer = new queueingBasicConsumer(channel); channel.BasicConsume(queueName,null,consumer); while (m_Listen) { try { RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs) consumer.Queue.Dequeue(); IBasicProperties props = e.BasicProperties; byte[] body = e.Body; // ... process the message channel.BasicAck(e.DeliveryTag,false); } catch (OperationInterruptedException ex) { // The consumer was removed,either through // channel or connection closure,or through the // action of IModel.BasicCancel(). break; }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。