安装
CentOS安装 kafka
Kafka : http://kafka.apache.org/downloads
ZooLeeper : https://zookeeper.apache.org/releases.html
下载并解压
# 下载,并解压 $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz $ tar -zxvf kafka_2.12-2.1.1.tgz $ mv kafka_2.12-2.1.1.tgz /data/kafka # 下载 zookeeper,解压 $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz $ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz $ mv apache-zookeeper-3.5.8-bin /data/zookeeper
启动 ZooKeeper
# 复制配置模版 $ cd /data/kafka/conf $ cp zoo_sample.cfg zoo.cfg # 看看配置需不需要改 $ vim zoo.cfg # 命令 $ ./bin/zkServer.sh start # 启动 $ ./bin/zkServer.sh status # 状态 $ ./bin/zkServer.sh stop # 停止 $ ./bin/zkServer.sh restart # 重启 # 使用客户端测试 $ ./bin/zkCli.sh -server localhost:2181 $ quit
启动 Kafka
# 备份配置 $ cd /data/kafka $ cp config/server.properties config/server.properties_copy # 修改配置 $ vim /data/kafka/config/server.properties # 集群配置下,每个 broker 的 id 是必须不同的 # broker.id=0 # 监听地址设置(内网) # listeners=PLAINTEXT://ip:9092 # 对外提供服务的IP、端口 # advertised.listeners=PLAINTEXT://106.75.84.97:9092 # 修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定,UCloud.ukafka = 3 # num.partitions=3 # zookeeper 配置 # zookeeper.connect=localhost:2181 # 通过配置启动 kafka $ ./bin/kafka-server-start.sh config/server.properties& # 状态查看 $ ps -ef|grep kafka $ jps
docker下安装Kafka
docker pull wurstmeister/zookeeper docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
介绍
kafka partition 和 consumer 数目关系
在 .NET Core 项目中安装组件
Install-Package Confluent.Kafka
开源地址: https://github.com/confluentinc/confluent-kafka-dotnet
添加 IKafkaService 服务接口
public interface IKafkaService { /// <summary> /// 发送消息至指定主题 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topicName"></param> /// <param name="message"></param> /// <returns></returns> Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class; /// <summary> /// 从指定主题订阅消息 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topics"></param> /// <param name="messageFunc"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class; }
实现 IKafkaService
public class KafkaService : IKafkaService { public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class { var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" }; using var producer = new ProducerBuilder<string, string>(config).Build(); await producer.ProduceAsync(topicName, new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = message.SerializeToJson() }); } public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class { var config = new ConsumerConfig { BootstrapServers = "127.0.0.1:9092", GroupId = "crow-consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; //const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中.."); }) .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}"); }) .Build(); consumer.Subscribe(topics); try { while (true) { try { var consumeResult = consumer.Consume(cancellationToken); Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } TMessage messageResult = null; try { messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value); } catch (Exception ex) { var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; Console.WriteLine(errorMessage); messageResult = null; } if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) { messageFunc(messageResult); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine(e.Message); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } await Task.CompletedTask; } }
注入 IKafkaService ,在需要使用的地方直接调用即可。
public class MessageService : IMessageService, ITransientDependency { private readonly IKafkaService _kafkaService; public MessageService(IKafkaService kafkaService) { _kafkaService = kafkaService; } public async Task RequestTraceAdded(XxxEventData eventData) { await _kafkaService.PublishAsync(eventData.TopicName, eventData); } }
以上相当于一个生产者,当我们消息队列发出后,还需一个消费者进行消费,所以可以使用一个控制台项目接收消息来处理业务。
var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) => { // Your logic Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理"); }, cts.Token);
在 IKafkaService 中已经写了订阅消息的接口,这里也是注入后直接使用即可。
生产者消费者示例
生产者
static async Task Main(string[] args) { if (args.Length != 2) { Console.WriteLine("Usage: .. brokerList topicName"); // 127.0.0.1:9092 helloTopic return; } var brokerList = args.First(); var topicName = args.Last(); var config = new ProducerConfig { BootstrapServers = brokerList }; using var producer = new ProducerBuilder<string, string>(config).Build(); Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); Console.WriteLine("-----------------------------------------------------------------------"); Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:"); Console.WriteLine("> key value<Enter>"); Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:"); Console.WriteLine("> value<enter>"); Console.WriteLine("Ctrl-C to quit.\n"); var cancelled = false; Console.CancelKeyPress += (_, e) => { e.Cancel = true; cancelled = true; }; while (!cancelled) { Console.Write("> "); var text = string.Empty; try { text = Console.ReadLine(); } catch (IOException) { break; } if (string.IsNullOrWhiteSpace(text)) { break; } var key = string.Empty; var val = text; var index = text.IndexOf(" "); if (index != -1) { key = text.Substring(0, index); val = text.Substring(index + 1); } try { var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val }); Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}"); } catch (ProduceException<string, string> e) { Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]"); } } }
消费者
static void Main(string[] args) { if (args.Length != 2) { Console.WriteLine("Usage: .. brokerList topicName"); // 127.0.0.1:9092 helloTopic return; } var brokerList = args.First(); var topicName = args.Last(); Console.WriteLine($"Started consumer, Ctrl-C to stop consuming"); var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; var config = new ConsumerConfig { BootstrapServers = brokerList, GroupId = "consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring.."); //Console.WriteLine($"Statistics: {json}"); }) .SetPartitionsAssignedHandler((c, partitions) => { Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]"); }) .SetPartitionsRevokedHandler((c, partitions) => { Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]"); }) .Build(); consumer.Subscribe(topicName); try { while (true) { try { var consumeResult = consumer.Consume(cts.Token); if (consumeResult.IsPartitionEOF) { Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}"); if (consumeResult.Offset % commitPeriod == 0) { try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine($"Commit error: {e.Error.Reason}"); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } }
到此这篇关于.NET Core下使用Kafka的方法步骤的文章就介绍到这了,更多相关.NET Core使用Kafka内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!
本文向大家介绍在Spring Boot应用程序中使用Apache Kafka的方法步骤详解,包括了在Spring Boot应用程序中使用Apache Kafka的方法步骤详解的使用技巧和注意事项,需要的朋友参考一下 第1步:生成我们的项目: Spring Initializr 来生成我们的项目。我们的项目将提供Spring MVC / Web支持和Apache Kafka支持。 第2步:发布/读取
本文向大家介绍CentOS使用EPEL源的方法步骤,包括了CentOS使用EPEL源的方法步骤的使用技巧和注意事项,需要的朋友参考一下 个人非常喜欢用Ubuntu,因为Debian强大的在线在线安装包。但某些时候我不得不用CentOS,这就得用著名的EPEL源了。 EPEL (Extra Packages for Enterprise Linux)是基于Fedora的一个项目,为“红帽系”的操作系
我构建了一个ASP。NET核心应用程序,我创建了一个用于单元测试的.NET核心类库。 我想在我的库中使用(获取文件的物理路径),因此我在启动时添加了这一行。我的ASP。NET核心应用程序: 在库中,我添加了对我的ASP.NET应用程序的引用,在我的类中,我写了这个: 但是当我运行它时,它会给我这个错误: 以下构造函数参数没有匹配的设备日期:IHostingEnvironment env 有什么问题
本文向大家介绍QT5编译使用QFtp的方法步骤,包括了QT5编译使用QFtp的方法步骤的使用技巧和注意事项,需要的朋友参考一下 背景 使用 QNetworkAccessManager 可以实现 Ftp 的上传/下载功能,但它没有提供例如list、cd、remove、mkdir、rmdir、rename 等功能。这种情况下,我们可以使用QFtp,需要下载源码、编译并处理一些坑。 下载 从 GitHu
本文向大家介绍.net下Quartz.Net的使用方法,包括了.net下Quartz.Net的使用方法的使用技巧和注意事项,需要的朋友参考一下 Quartz.net是作业调度框架,具体内容如下 1. 项目中添加quartz.net的引用(这里使用nuget管理) 新建一个类TimingJob,该类主要用于实现任务逻辑 在Program.cs中:(这里是控制台应用程序) 程序运行时,经过5秒后,每间
本文向大家介绍springboot下使用mybatis的方法,包括了springboot下使用mybatis的方法的使用技巧和注意事项,需要的朋友参考一下 使用mybatis-spring-boot-starter即可。 简单来说就是mybatis看见spring boot这么火,于是搞出来mybatis-spring-boot-starter这个解决方案来与springboot更好的集成 详见