需要添加的依赖

<dependency>
      <groupId>com.alibaba.jstorm</groupId>
      <artifactId>jstorm-core</artifactId>
      <version>2.1.1</version>
      <!--<scope>provided</scope>-->
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-nop</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-jdk14</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.8.2</artifactId>
      <version>0.8.1</version>
      <exclusions>
        <exclusion>
          <artifactId>jmxtools</artifactId>
          <groupId>com.sun.jdmk</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jmxri</artifactId>
          <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jms</artifactId>
          <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

<build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <de orRefs>
            <de orRef>jar-with-dependencies</de orRef>
          </de orRefs>
          <archive>
            <manifest>
              <mainClass>cn.itcast.bigdata.hadoop.mapreduce.wordcount.WordCount</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

生产者

package com.thp.bigdata.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

/**
 * 这是一个简单的Kafka producer代码
 * 包含两个功能:
 * 1、数据发送
 * 2、数据按照自定义的partition策略进行发送
 *
 *
 * KafkaSpout的类
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、指定当前kafka producer生产的数据的目的地
         *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
         *  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
         */
        String TOPIC = \"orderMq\";
        /**
         * 2、读取配置文件
         */
        Properties props = new Properties();
        /*
         * key.serializer.class默认为serializer.class
		 */
        props.put(\"serializer.class\", \"kafka.serializer.StringEncoder\");
        /*
		 * kafka broker对应的主机,格式为host1:port1,host2:port2
		 */
        props.put(\" data.broker.list\", \"kafka01:9092,kafka02:9092,kafka03:9092\");
        /*
         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
		 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
		 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
		 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
		 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
		 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
		 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
		 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
		 */
        props.put(\"request.required.acks\", \"1\");
        /*
		 * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
		 * 默认值:kafka.producer.DefaultPartitioner
		 * 用来把消息分到各个partition中,默认行为是对key进行hash。
		 */
        props.put(\"partitioner.class\", \"com.thp.bigdata.kafka.MyLogPartitioner\");
//        props.put(\"partitioner.class\", \"kafka.producer.DefaultPartitioner\");
        /**
         * 3、通过配置文件,创建生产者
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        /**
         * 4、通过for循环生产数据
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
//            String messageStr = new String(messageNo + \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发\" +
//                    \"用来配合自定义的MyLogPartitioner进行数据分发\");

            /**
             * 5、调用producer的send方法发送数据
             * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
             */
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + \"\", \"appid\" + UUID.randomUUID() + \"itcast\"));
        }
    }
}

自定义的Partitioner

package com.thp.bigdata.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;


public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    public int partition(  obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
//        return 1;
    }

}

消费者

package com.thp.bigdata.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAnd data;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerSimple implements Runnable {
    public String  ;
    public KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerSimple(String  , KafkaStream<byte[], byte[]> stream) {
        this.  =  ;
        this.stream = stream;
    }
    @Override
    public void run() {
        System.out.println(\"开始运行 \" +  );
        // ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // stream.iterator();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
         * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
         * */
        while (it.hasNext()) {
            MessageAnd data<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            // 数据本身的内容
            String msg = new String(data.message());
            System.out.println(String.format(
                    \"Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]\",
                     , topic, partition, offset, msg));
        }
        System.out.println(String.format(\"Consumer: [%s] exiting ...\",  ));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put(\"group.id\", \"dashujujiagoushi\");
        props.put(\"zookeeper.connect\", \"zk01:2181,zk02:2181,zk03:2181\");
        props.put(\"auto.offset.reset\", \"largest\");
        props.put(\"auto.commit.interval.ms\", \"1000\");
        props.put(\"partition.assignment.strategy\", \"roundrobin\");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic = \"orderMq\";
        // String topic2 = \"paymentMq\";
        //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //定义一个map
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 4);
        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //取出 `kafkaTest` 对应的 streams
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        //创建一个容量为4的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);
        //创建20个consumer threads
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple(\"消费者\" + (i + 1), streams.get(i)));
    }
}

\"在这里插入图片描述\"

\"在这里插入图片描述\"

收藏 打印