VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > c#教程 >
  • C#教程之Winows上简单配置使用kafka(.net使用)

本站最新发布   C#从入门到精通
试听地址  
https://www.xin3721.com/eschool/CSharpxin3721/

一、kafka环境配置

1.jdk安装

安装文件:http://www.oracle.com/technetwork/java/javase/downloads/index.html 下载JDK
安装完成后需要添加以下的环境变量(右键点击“我的电脑” -> "高级系统设置" -> "环境变量" ):

JAVA_HOME: C:\Program Files\Java\jdk-13.0.1(jdk的安装路径)

Path: 现有值后追加 "%JAVA_HOME%\bin"

 

 

 

 

 

 2.zookeeper安装

Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper

下载安装文件: http://zookeeper.apache.org/releases.html

解压文件 apache-zookeeper-3.5.6-bin.tar

打开zookeeper-3.5.6\conf,把zoo_sample.cfg重命名成zoo.cfg
从文本编辑器里打开zoo.cfg, 把dataDir的值改成“./apache-zookeeper-3.5.6/data”
添加如下系统变量:

ZOOKEEPER_HOME: C:\Users\Yc\work\apache-zookeeper-3.5.6 (zookeeper目录)

Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%\bin;"

 

 

 

 运行Zookeeper: 打开cmd然后执行 zkserver

 

 3.安装并运行kafka

 下载安装文件: http://kafka.apache.org/downloads.html
 解压文件
 打开kafka_2.12-2.3.0\config
 从文本编辑器里打开 server.properties
 修改:log.dirs=./logs
     listeners=PLAINTEXT://localhost:9092
 打开cmd
 执行命令:C:\Users\Yc>cd C:\Users\Yc\work\kafka_2.12-2.3.0(进入此目录中)
  再执行:.\bin\windows\kafka-server-start.bat .\config\server.properties

 

 4.创建Topics

cmd执行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

执行成功后出现 :Created topic test

 

 

 5.生产者使用测试

打开cmd窗口执行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows

kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

 

6.消费者使用测试

 打开cmd窗口执行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

 

 7..net程序消费者简单使用

引入Confluent.kafka包

 

 

 

复制代码
 public static void Main(string[] args)
        {
            //地址及端口号
            var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                //for (int i = 0; i < 100; ++i)
                //{
                
                    p.Produce("test", new Message<Null, string> { Value = "messagehowsf"}, handler);//kafka协议数据发送

                //}

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
复制代码

相关教程