consumer
	psvm{
		//1.配置生产者属性(指定多个参数)
		Properties prop = new Properties();

		//配置属性
		//服务器地址指定
		prop.put(\"bootstrap.server\",\"bigdata:9092\");
		//配置消费者组
		prop.put(\"group.id\",\"g1\");
		//配置是否自动确认偏移量
		prop.put(\"enable.auto.commit\",\"true\");
		//序列化
		prop.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\");
		prop.put(\"value.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\");

		//2.实例化消费者
		final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

		//4.释放资源 线程安全
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
			public void run(){
				if(consumer != null){
					consumer.close();
				}
			}
		}));

		//订阅消息主题
		consumer.subscribe(Arrays.asList(\"test\");)

		//3.拉消息
		while(true){
			ConsumerRecords<String, String> records = consumer.poll(1000);
			//遍历
			for(ConsumerRecord<String, String> r : records) {
				sout(r.topic()+ \"------\" + r.value());
			}
		}
	}

 

收藏 打印