VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • C#队列学习笔记:RabbitMQ安装及使用(2)

void Main(string[] args) { #region Hello World //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定basicProperties) channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
复制代码
复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //5.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //6.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    //8.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码

    四、Exchange

    上面的示例,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。

    那消费者如何才能发送消息到多个消息队列呢?

    RabbitMQ提供了Exchange,它类似于路由器的功能,对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但是Exchange是如何知道将消息附加到哪个队列或者直接忽略的呢?这些其实是由Exchange Type来定义的。关于Exchange的图文介绍,请看上一篇《C#队列学习笔记:RabbitMQ基础知识》,此处仅提供示例代码。

    4.1、fanout

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;fanout类型无需指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码
复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定fanout类型exchange,fanout类型无需指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: "");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码

    4.2、direct

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;direct类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码
复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定direct类型exchange,direct类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码

    4.3、topic

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.构建byte消息数据包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
                        //6.发送数据包(指定exchange;topic类型必须指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码
复制代码
    class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.实例化连接工厂
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立连接
            using (var connection = factory.CreateConnection())
            {
                //3.创建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名称。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //5.声明队列(随机生成队列名称)
                    var queueName = channel.QueueDeclare().QueueName;
                    //绑定队列到指定topic类型exchange,topic类型必须指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast");
                    //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.构造消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //7.绑定消息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.发送消息确认信号(手动消息确认)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.启动消费者(noAck: false 启用消息响应)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
复制代码

    五、RPC

    RPC--Remote Procedure Call,远程过程调用。RabbitMQ是如何进行远程调用的呢?示意图如下:

    第一步:主要是进行远程调用的客户端需要指定接收远程回调的队列,并声明消费者监听此队列。

    第二步:远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听结果的队列中去。


相关教程
关于我们--广告服务--免责声明--本站帮助-友情链接--版权声明--联系我们       黑ICP备07002182号