代码如下(第一段:启动消费任务):
/// <summary>
/// Starts the consumer worker(s) for <paramref name="queue"/> on the thread pool and
/// waits, without blocking a thread, until every worker has finished its setup.
/// Any failure is written to the console instead of being propagated.
/// </summary>
/// <param name="item">Box information handed through to <c>MessageWorkItemCallback</c>.</param>
/// <param name="queue">Name of the queue to consume from; also echoed to the console.</param>
/// <param name="phase">Phase value handed through to <c>MessageWorkItemCallback</c>.</param>
/// <remarks>
/// The method stays <c>async void</c> so existing fire-and-forget callers keep compiling.
/// Because the caller cannot observe an exception escaping an async void method, all
/// work that may throw is kept inside the try block.
/// </remarks>
private static async void RunMq(BoxInfo item, string queue, int phase)
{
    // Tag in the range 1..999 that is passed to the worker for its log output.
    int rallyNumber = new Random().Next(1, 1000);

    try
    {
        // Everything runs on the thread pool, as before, so the caller's thread is never
        // used for connection access or consumer setup.
        await Task.Run(async () =>
        {
            Console.WriteLine(queue);

            const int asyncCount = 1;
            var tasks = new List<Task<bool>>(asyncCount);
            var connection = RabbitMQFactory.SharedConnection;

            for (int i = 1; i <= asyncCount; i++)
            {
                tasks.Add(Task.Run(() => MessageWorkItemCallback(connection, rallyNumber, queue, item, phase)));
            }

            // Awaiting (instead of Task.WaitAll) frees the pool thread while the workers
            // run and rethrows the underlying exception rather than an AggregateException,
            // so the message logged below names the real cause.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"消费异常:{ex.Message}");
    }
}
第二段(消息消费回调):
/// <summary>
/// Opens a channel on the supplied connection, declares <paramref name="queue"/> and
/// registers an event-driven consumer on it. The method returns as soon as the consumer
/// is registered; messages are handled afterwards by the <c>Received</c> callback.
/// </summary>
/// <param name="state">The connection to use; must be a non-null <c>IConnection</c>.</param>
/// <param name="rallyNumber">Identifier printed alongside each short message that is logged.</param>
/// <param name="queue">Name of the queue to declare and consume from.</param>
/// <param name="item">Handed through to <c>TreatData.ConsumeData</c> for large payloads.</param>
/// <param name="phase">Handed through to <c>TreatData.ConsumeData</c> for large payloads.</param>
/// <returns>
/// <c>true</c> when the consumer was registered; <c>false</c> when setup failed, in which
/// case the reason has been written to the console.
/// </returns>
private static bool MessageWorkItemCallback(object state, int rallyNumber, string queue, BoxInfo item, int phase)
{
    IConnection connection = state as IConnection;
    if (connection == null)
    {
        // Report the real problem instead of letting it surface as a NullReferenceException.
        Console.WriteLine($"{nameof(state)} is not an IConnection; consumer for queue '{queue}' was not started.");
        return false;
    }

    IModel channel = null;
    try
    {
        // Do not create the channel with `using (channel = connection.CreateModel())`:
        // the consumer keeps using the channel after this method returns, so on the
        // success path its cleanup is left to RabbitMQ.
        channel = connection.CreateModel();
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        channel.QueueDeclare(queue, false, false, false, null);

        var consumer = new EventingBasicConsumer(channel);

        // Attach the handler BEFORE BasicConsume. Deliveries can start as soon as the
        // consumer is registered; one raised while no handler is attached would never be
        // acked, and with prefetchCount 1 no further message would be delivered.
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();

            // NOTE(review): 1028 is an unexplained threshold (1024 intended?) - confirm.
            if (body.Length > 1028)
            {
                TreatData.ConsumeData(body, item, phase);
            }
            else
            {
                // Reuse the copy made above rather than copying the payload a second time.
                string message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"随机ID: {rallyNumber} Received {message}");
            }

            // NOTE(review): if the processing above throws, this ack is skipped and the
            // message stays unacknowledged - decide on a nack/requeue policy.
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
        return true;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);

        // Setup failed, so no consumer will ever use this channel; release it here
        // rather than leaking it on the shared connection.
        channel?.Dispose();
        return false;
    }
}
123123