使用 .NET 客户端连接到 Kafka
这里推荐使用一个开源的 .NET 客户端 kafka-net:https://github.com/Jroland/kafka-net
Producer
// Minimal producer example: connect to the brokers, publish one message to
// the "TestHarness" topic, then release the client.
var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
var router = new BrokerRouter(options);

// Wrapping the producer in a using block disposes it even when the send throws
// (the original only disposed it after a successful send).
using (var client = new Producer(router))
{
    // Block until the asynchronous send has completed.
    client.SendMessageAsync("TestHarness", new[] { new Message("hello world") }).Wait();
}
Consumer
// Minimal consumer example: connect to the brokers and print every message
// received on the "TestHarness" topic.
var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
var router = new BrokerRouter(options);

// Dispose the consumer if the loop is ever left (for example by an exception).
using (var consumer = new Consumer(new ConsumerOptions("TestHarness", router)))
{
    // Consume returns a blocking IEnumerable (i.e. a never-ending stream).
    foreach (var message in consumer.Consume())
    {
        // Partition id and offset live on the message's Meta member; the payload is
        // decoded with ToUtf8String(), matching the full consumer sample in this article.
        Console.WriteLine("Response: P{0},O{1} : {2}",
            message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
    }
}
完整测试代码
Producer生产者:
// Console test harness for the producer side: every three seconds it publishes
// one timestamped "Hello World" message to the configured Kafka topic.
class Program
{
    // Broker address used when the "KafkaBroker" app setting is absent.
    private const string DefaultKafkaBroker = "http://localhost:9092";

    // How long to wait for a single send before reporting a timeout.
    private const int SendTimeoutMilliseconds = 1500;

    // Pause between two consecutive sends.
    private const int SendIntervalMilliseconds = 3000;

    // Entry point: produces messages forever; stop the process to end the test.
    static void Main(string[] args)
    {
        while (true)
        {
            Produce(GetKafkaBroker(), GetTopicName());
            System.Threading.Thread.Sleep(SendIntervalMilliseconds);
        }
    }

    // Sends one message to the given topic through the given broker.
    //   broker: broker URI, e.g. "http://localhost:9092".
    //   topic:  name of the topic to publish to.
    // The message key is the current second of the local clock.
    private static void Produce(string broker, string topic)
    {
        var options = new KafkaOptions(new Uri(broker));
        var router = new BrokerRouter(options);

        // The using block disposes the producer even if the send throws.
        using (var client = new Producer(router))
        {
            var currentDatetime = DateTime.Now;
            var key = currentDatetime.Second.ToString();
            var events = new[] { new Message("Hello World " + currentDatetime, key) };

            // Wait(int) returns false when the timeout elapses first, so only
            // report success when the send actually finished in time.
            var completed = client.SendMessageAsync(topic, events).Wait(SendTimeoutMilliseconds);
            if (completed)
            {
                Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());
            }
            else
            {
                Console.WriteLine("Send timed out after {0} ms: Key: {1}", SendTimeoutMilliseconds, key);
            }
        }
    }

    // Returns the broker URI from the "KafkaBroker" app setting, or the
    // localhost default when that setting is not present.
    private static string GetKafkaBroker()
    {
        const string kafkaBrokerKeyName = "KafkaBroker";

        if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName))
        {
            return DefaultKafkaBroker;
        }

        return ConfigurationManager.AppSettings[kafkaBrokerKeyName];
    }

    // Returns the topic name from the "Topic" app setting.
    // Throws InvalidOperationException when the setting is missing, because
    // there is no sensible default topic.
    private static string GetTopicName()
    {
        const string topicNameKeyName = "Topic";

        if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
        {
            throw new InvalidOperationException(
                "Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
        }

        return ConfigurationManager.AppSettings[topicNameKeyName];
    }
}
Consumer消费者:

// Console test harness for the consumer side: connects to the configured
// broker and prints every message received on the configured topic.
class Program
{
    // Broker address used when the "KafkaBroker" app setting is absent.
    private const string DefaultKafkaBroker = "http://localhost:9092";

    // Entry point: consumes until the process is stopped.
    static void Main(string[] args)
    {
        Consume(GetKafkaBroker(), GetTopicName());
    }

    // Prints partition, offset and UTF-8 payload of each message on the topic.
    //   broker: broker URI, e.g. "http://localhost:9092".
    //   topic:  name of the topic to read from.
    private static void Consume(string broker, string topic)
    {
        var options = new KafkaOptions(new Uri(broker));
        var router = new BrokerRouter(options);

        // Dispose the consumer if the loop is ever left (for example by an exception).
        using (var consumer = new Consumer(new ConsumerOptions(topic, router)))
        {
            // Consume returns a blocking IEnumerable (i.e. a never-ending stream).
            foreach (var message in consumer.Consume())
            {
                // Partition id and offset are read from the message's Meta member.
                Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
                    message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
            }
        }
    }

    // Returns the broker URI from the "KafkaBroker" app setting, or the
    // localhost default when that setting is not present.
    private static string GetKafkaBroker()
    {
        const string kafkaBrokerKeyName = "KafkaBroker";

        if (!ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName))
        {
            return DefaultKafkaBroker;
        }

        return ConfigurationManager.AppSettings[kafkaBrokerKeyName];
    }

    // Returns the topic name from the "Topic" app setting.
    // Throws InvalidOperationException when the setting is missing, because
    // there is no sensible default topic.
    private static string GetTopicName()
    {
        const string topicNameKeyName = "Topic";

        if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
        {
            throw new InvalidOperationException(
                "Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
        }

        return ConfigurationManager.AppSettings[topicNameKeyName];
    }
}
消息队列一
消息队列二
消息队列三
消息队列四
消息队列五
消息队列六
消息队列七
继续阅读与本文标签相同的文章
-
centos 7安装es 及异常处理
2026-06-02栏目: 教程
-
C#如何使用ES
2026-06-02栏目: 教程
-
异步tcp通信——APM.ConsoleDemo
2026-06-02栏目: 教程
-
异步tcp通信——APM.Server 消息推送服务的实现
2026-06-02栏目: 教程
-
异步tcp通信——APM.Core 解包
2026-06-02栏目: 教程

