C# Rabbitmq 多个队列多个线程消费

C# Rabbitmq 多个队列多个线程消费

猿掌柜
2023-07-21 / 1 评论 / 13 阅读 / 正在检测是否收录...

代码如下

private async static void RunMq(BoxInfo item, string queue,int phase)
        {
            Random random = new Random();
            int rallyNumber = random.Next(1, 1000);
            await Task.Run(() =>{
                try
                {
                    Console.WriteLine(queue);
                    int asyncCount = 1;
                    List<Task<bool>> tasks = new List<Task<bool>>();
                    var connection = RabbitMQFactory.SharedConnection;
                    for (int i = 1; i <= asyncCount; i++)
                    {
                        tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber, queue, item, phase)));
                    }
                    Task.WaitAll(tasks.ToArray());
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"消费异常:{ex.Message}");
                }
                
            });
            
        }

第二段

private static bool MessageWorkItemCallback(object state, int rallyNumber,string queue, BoxInfo item, int phase)
        {
            bool syncResult = false;
            IModel channel = null;
            try
            {
                IConnection connection = state as IConnection;
                //不能使用using (channel = connection.CreateModel())来创建信道,让RabbitMQ自动回收channel。
                channel = connection.CreateModel();
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                channel.QueueDeclare(queue, false, false, false, null);
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    if (body.Length > 1028)
                    {
                        TreatData.ConsumeData(body, item, phase);
                    }
                    else
                    {
                        string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"随机ID: {rallyNumber} Received  {message}");
                    }
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                syncResult = true;
            }
            catch (Exception ex)
            {
                syncResult = false;
                Console.WriteLine(ex.Message);
            }
            return syncResult;
        }
2

评论 (1)

取消
  1. 头像
    游客 作者
    Linux · Google Chrome

    123123

    回复