下面一个客户端代码例子访问kafka服务器,来发送和接受消息。
使用方式
1、命令行参数
$ ./kafkaclient -h Usage of ./client: -ca string CA Certificate (default \"ca.pem\") -cert string Client Certificate (default \"cert.pem\") -command string consumer|producer (default \"consumer\") -host string Common separated kafka hosts (default \"localhost:9093\") -key string Client Key (default \"key.pem\") -partition int Kafka topic partition -tls TLS enable -topic string Kafka topic (default \"test--topic\")
2、作为producer启动
$ ./kafkaclient -command producer \\ -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command producer \\ -tls -cert client.pem -key client.key -ca ca.pem \\ -host kafka1:9093,kafka2:9093
producer发送消息给kafka:
> aaa 2018/12/15 07:11:21 Produced message: [aaa] > bbb 2018/12/15 07:11:30 Produced message: [bbb] > quit
3、作为consumer启动
$ ./kafkaclient -command consumer \\ -host kafka1:9092,kafka2:9092 ## TLS-enabled $ ./kafkaclient -command consumer \\ -tls -cert client.pem -key client.key -ca ca.pem \\ -host kafka1:9093,kafka2:9093
consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]
完整源代码如下
这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main
import (
\"flag\"
\"fmt\"
\"log\"
\"os\"
\"io/ioutil\"
\"bufio\"
\"strings\"
\"crypto/tls\"
\"crypto/x509\"
\"github.com/Shopify/sarama\"
)
var (
command string
tlsEnable bool
hosts string
topic string
partition int
clientcert string
clientkey string
cacert string
)
func main() {
flag.StringVar(&command, \"command\", \"consumer\", \"consumer|producer\")
flag.BoolVar(&tlsEnable, \"tls\", false, \"TLS enable\")
flag.StringVar(&hosts, \"host\", \"localhost:9093\", \"Common separated kafka hosts\")
flag.StringVar(&topic, \"topic\", \"test--topic\", \"Kafka topic\")
flag.IntVar(&partition, \"partition\", 0, \"Kafka topic partition\")
flag.StringVar(&clientcert, \"cert\", \"cert.pem\", \"Client Certificate\")
flag.StringVar(&clientkey, \"key\", \"key.pem\", \"Client Key\")
flag.StringVar(&cacert, \"ca\", \"ca.pem\", \"CA Certificate\")
flag.Parse()
config := sarama.NewConfig()
if tlsEnable {
//sarama.Logger = log.New(os.Stdout, \"[sarama] \", log.LstdFlags)
tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
if err != nil {
log.Fatal(err)
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}
client, err := sarama.NewClient(strings.Split(hosts, \",\"), config)
if err != nil {
log.Fatalf(\"unable to create kafka client: %q\", err)
}
if command == \"consumer\" {
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
loopConsumer(consumer, topic, partition)
} else {
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
loopProducer(producer, topic, partition)
}
}
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
// load client cert
clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
if err != nil {
return nil, err
}
// load ca cert pool
cacert, err := ioutil.ReadFile(cacertfile)
if err != nil {
return nil, err
}
cacertpool := x509.NewCertPool()
cacertpool.AppendCertsFromPEM(cacert)
// generate tlcconfig
tlsConfig := tls.Config{}
tlsConfig.RootCAs = cacertpool
tlsConfig.Certificates = []tls.Certificate{clientcert}
tlsConfig.BuildNameToCertificate()
// tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
return &tlsConfig, err
}
func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
scanner := bufio.NewScanner(os.Stdin)
fmt.Print(\"> \")
for scanner.Scan() {
text := scanner.Text()
if text == \"\" {
} else if text == \"exit\" || text == \"quit\" {
break
} else {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf(\"Produced message: [%s]\\n\",text)
}
fmt.Print(\"> \")
}
}
func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
log.Println(err)
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
log.Printf(\"Consumed message: [%s], offset: [%d]\\n\", msg.Value, msg.Offset)
}
}
编译:
$ go build kafkaclient.go
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
继续阅读与本文标签相同的文章
上一篇 :
5G眼镜首次为进博会“保驾护航”
下一篇 :
衣联网许润丰:从45天20套,到4个月10万套!
-
一眼望去 都是中国好CP的形状
2026-05-19栏目: 教程
-
前端开发深水区讨论
2026-05-19栏目: 教程
-
精读《使用 css 变量生成颜色主题》
2026-05-19栏目: 教程
-
震撼!全球首台“智慧旅游黑科技车”现身井陉……
2026-05-19栏目: 教程
-
科技产品没有时尚力?搜狗AI录音笔,要科技更要时尚
2026-05-19栏目: 教程
