使用 .NET 客户端连接到 Kafka

这里推荐使用一个开源的 .NET 客户端 kafka-net:https://github.com/Jroland/kafka-net

Producer
// Minimal producer: sends a single message to the "TestHarness" topic and
// blocks until the send task completes.
var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
var router = new BrokerRouter(options);

// Scope the producer in a using statement so it is disposed even when the
// send throws (a trailing "using (client) { }" is skipped on exceptions).
using (var client = new Producer(router))
{
    client.SendMessageAsync("TestHarness", new[] { new Message("hello world") }).Wait();
}

 

Consumer
 // Minimal consumer: prints every message received on the "TestHarness" topic.
 var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
 var router = new BrokerRouter(options);
 var consumer = new Consumer(new ConsumerOptions("TestHarness", router));

 // Consume returns a blocking IEnumerable (i.e. a never-ending stream).
 foreach (var message in consumer.Consume())
 {
     // Partition id and offset are read from the message metadata; the
     // payload is decoded to text before printing.
     Console.WriteLine("Response: P{0},O{1} : {2}",
         message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
 }

 

完整测试代码

Producer生产者:

 /// <summary>
 /// Test producer: every three seconds publishes a timestamped
 /// "Hello World" message to the topic named in the application settings.
 /// </summary>
 class Program
 {
     /// <summary>
     /// Entry point. Loops forever, producing one message per iteration and
     /// sleeping three seconds between iterations.
     /// </summary>
     static void Main(string[] args)
     {
         do
         {
             Produce(GetKafkaBroker(), getTopicName());
             System.Threading.Thread.Sleep(3000);
         } while (true);
     }

     /// <summary>
     /// Sends a single message to <paramref name="topic"/> through the broker
     /// at <paramref name="broker"/> and reports the outcome on the console.
     /// </summary>
     /// <param name="broker">Broker address, e.g. "http://localhost:9092".</param>
     /// <param name="topic">Name of the topic to publish to.</param>
     private static void Produce(string broker, string topic)
     {
         var options = new KafkaOptions(new Uri(broker));
         var router = new BrokerRouter(options);

         // Scope the producer so it is disposed even if sending throws.
         using (var client = new Producer(router))
         {
             var currentDatetime = DateTime.Now;
             // The message key is the seconds component of the current time.
             var key = currentDatetime.Second.ToString();
             var events = new[] { new Message("Hello World " + currentDatetime, key) };

             // Wait(int) returns false on timeout; only report success when the
             // send task actually completed within the 1500 ms window.
             if (client.SendMessageAsync(topic, events).Wait(1500))
             {
                 Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());
             }
             else
             {
                 Console.Error.WriteLine("Send not confirmed within 1500 ms. Key: {0}", key);
             }
         }
     }

     /// <summary>
     /// Reads the broker address from the "KafkaBroker" app setting.
     /// </summary>
     /// <returns>
     /// The configured value, or "http://localhost:9092" when the key is absent.
     /// </returns>
     private static string GetKafkaBroker()
     {
         const string kafkaBrokerKeyName = "KafkaBroker";

         return ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName)
             ? ConfigurationManager.AppSettings[kafkaBrokerKeyName]
             : "http://localhost:9092";
     }

     /// <summary>
     /// Reads the topic name from the "Topic" app setting.
     /// </summary>
     /// <returns>The configured topic name.</returns>
     /// <exception cref="ConfigurationErrorsException">
     /// Thrown when the "Topic" key is missing from the app settings.
     /// </exception>
     private static string getTopicName()
     {
         const string topicNameKeyName = "Topic";

         // Guard clause: there is no sensible default topic, so fail loudly.
         if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
         {
             throw new ConfigurationErrorsException(
                 "Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
         }

         return ConfigurationManager.AppSettings[topicNameKeyName];
     }
 }

 

Consumer消费者:

 /// <summary>
 /// Test consumer: prints every message received on the topic named in the
 /// application settings.
 /// </summary>
 class Program
 {
     /// <summary>
     /// Entry point. Starts consuming from the configured broker and topic.
     /// </summary>
     static void Main(string[] args)
     {
         Consume(getKafkaBroker(), getTopicName());
     }

     /// <summary>
     /// Consumes <paramref name="topic"/> through the broker at
     /// <paramref name="broker"/> and writes each message to the console.
     /// </summary>
     /// <param name="broker">Broker address, e.g. "http://localhost:9092".</param>
     /// <param name="topic">Name of the topic to read from.</param>
     private static void Consume(string broker, string topic)
     {
         var options = new KafkaOptions(new Uri(broker));
         var router = new BrokerRouter(options);

         // Scope the consumer so it is disposed if the stream ends with an exception.
         using (var consumer = new Consumer(new ConsumerOptions(topic, router)))
         {
             // Consume returns a blocking IEnumerable (i.e. a never-ending stream).
             foreach (var message in consumer.Consume())
             {
                 Console.WriteLine("Response: Partition {0},Offset {1} : {2}",
                     message.Meta.PartitionId, message.Meta.Offset, message.Value.ToUtf8String());
             }
         }
     }

     /// <summary>
     /// Reads the broker address from the "KafkaBroker" app setting.
     /// </summary>
     /// <returns>
     /// The configured value, or "http://localhost:9092" when the key is absent.
     /// </returns>
     private static string getKafkaBroker()
     {
         const string kafkaBrokerKeyName = "KafkaBroker";

         return ConfigurationManager.AppSettings.AllKeys.Contains(kafkaBrokerKeyName)
             ? ConfigurationManager.AppSettings[kafkaBrokerKeyName]
             : "http://localhost:9092";
     }

     /// <summary>
     /// Reads the topic name from the "Topic" app setting.
     /// </summary>
     /// <returns>The configured topic name.</returns>
     /// <exception cref="ConfigurationErrorsException">
     /// Thrown when the "Topic" key is missing from the app settings.
     /// </exception>
     private static string getTopicName()
     {
         const string topicNameKeyName = "Topic";

         // Guard clause: there is no sensible default topic, so fail loudly.
         if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKeyName))
         {
             throw new ConfigurationErrorsException(
                 "Key \"" + topicNameKeyName + "\" not found in Config file -> configuration/AppSettings");
         }

         return ConfigurationManager.AppSettings[topicNameKeyName];
     }
 }
View Code

 

 

 

消息队列一

 

消息队列二

 

消息队列三

 

消息队列四

 

消息队列五

 

消息队列六

 

消息队列七

 

收藏 打印