下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式

1、命令行参数

$ ./kafkaclient -h
Usage of ./client:
 -ca string
  CA Certificate (default \"ca.pem\")
 -cert string
  Client Certificate (default \"cert.pem\")
 -command string
  consumer|producer (default \"consumer\")
 -host string
  Common separated kafka hosts (default \"localhost:9093\")
 -key string
  Client Key (default \"key.pem\")
 -partition int
  Kafka topic partition
 -tls
  TLS enable
 -topic string
  Kafka topic (default \"test--topic\")

2、作为producer启动

$ ./kafkaclient -command producer \\
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \\
 -tls -cert client.pem -key client.key -ca ca.pem \\
 -host kafka1:9093,kafka2:9093

producer发送消息给kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit

3、作为consumer启动

$ ./kafkaclient -command consumer \\
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \\
 -tls -cert client.pem -key client.key -ca ca.pem \\
 -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。

$ cat kafkaclient.go
package main

import (
 \"flag\"
 \"fmt\"
 \"log\"
 \"os\"
 \"io/ioutil\"
 \"bufio\"
 \"strings\"

 \"crypto/tls\"
 \"crypto/x509\"

 \"github.com/Shopify/sarama\"
)

var (
 command  string
 tlsEnable bool
 hosts  string
 topic  string
 partition int
 clientcert string
 clientkey string
 cacert  string
)

func main() {
 flag.StringVar(&command, \"command\",  \"consumer\",   \"consumer|producer\")
 flag.BoolVar(&tlsEnable, \"tls\",   false,    \"TLS enable\")
 flag.StringVar(&hosts,  \"host\",   \"localhost:9093\", \"Common separated kafka hosts\")
 flag.StringVar(&topic,  \"topic\",  \"test--topic\",  \"Kafka topic\")
 flag.IntVar(&partition,  \"partition\", 0,     \"Kafka topic partition\")
 flag.StringVar(&clientcert, \"cert\",   \"cert.pem\",   \"Client Certificate\")
 flag.StringVar(&clientkey, \"key\",   \"key.pem\",   \"Client Key\")
 flag.StringVar(&cacert,  \"ca\",   \"ca.pem\",   \"CA Certificate\")
 flag.Parse()

 config := sarama.NewConfig()
 if tlsEnable {
  //sarama.Logger = log.New(os.Stdout, \"[sarama] \", log.LstdFlags)
  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  if err != nil {
   log.Fatal(err)
  }

  config.Net.TLS.Enable = true
  config.Net.TLS.Config = tlsConfig
 }
 client, err := sarama.NewClient(strings.Split(hosts, \",\"), config)
 if err != nil {
  log.Fatalf(\"unable to create kafka client: %q\", err)
 }

 if command == \"consumer\" {
  consumer, err := sarama.NewConsumerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer consumer.Close()
  loopConsumer(consumer, topic, partition)
 } else {
  producer, err := sarama.NewAsyncProducerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer producer.Close()
  loopProducer(producer, topic, partition)
 }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
 // load client cert
 clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
 if err != nil {
  return nil, err
 }

 // load ca cert pool
 cacert, err := ioutil.ReadFile(cacertfile)
 if err != nil {
  return nil, err
 }
 cacertpool := x509.NewCertPool()
 cacertpool.AppendCertsFromPEM(cacert)

 // generate tlcconfig
 tlsConfig := tls.Config{}
 tlsConfig.RootCAs = cacertpool
 tlsConfig.Certificates = []tls.Certificate{clientcert}
 tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
 return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Print(\"> \")
 for scanner.Scan() {
  text := scanner.Text()
  if text == \"\" {
  } else if text == \"exit\" || text == \"quit\" {
   break
  } else {
   producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
   log.Printf(\"Produced message: [%s]\\n\",text)
  }
  fmt.Print(\"> \")
 }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
 partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
 if err != nil {
  log.Println(err)
  return
 }
 defer partitionConsumer.Close()

 for {
  msg := <-partitionConsumer.Messages()
  log.Printf(\"Consumed message: [%s], offset: [%d]\\n\", msg.Value, msg.Offset)
 }
}

编译:

$ go build kafkaclient.go

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

收藏 打印