拦截器
	public class TimeInterceptor implements ProducerInterceptor<String, String>{
		//设置信息
		public void configure(Map<String, ?> configs) {

		}

		//业务逻辑
		public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record){
			return new ProdecerRecord<String, String>(
				record.topic(),
				record.partition(),
				record.key(),
				System.currentTimeMillis() + \"-\" +record.value());
		}

		//应答,发送失败调用
		public void onAcknowledgement(Record data  data, Exception exception){

		}

		//释放资源
		public void close(){

		}
	}
	{
		// Example producer run: configure a producer with the TimeInterceptor,
		// send 99 messages to the "test" topic, and print where each one landed.

		// 1. Configure the producer (several properties can be set here).
		Properties prop = new Properties();

		// Other settings are omitted in this example (presumably the broker
		// address such as bootstrap.servers, acks, retries, ...). They must be
		// supplied before this code can actually connect — TODO confirm.

		// Keys and values must be serialized before they are sent.
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		// Register the interceptor chain. "interceptor.classes" is the property
		// name behind ProducerConfig.INTERCEPTOR_CLASSES_CONFIG; the literal is
		// used so no additional import is needed.
		ArrayList<String> inList = new ArrayList<String>();
		inList.add("com.terry.kafka.TimeInterceptor");
		prop.put("interceptor.classes", inList);

		// 2. Instantiate the producer.
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

		try {
			// 3. Send the messages.
			for (int i = 0; i < 99; i++) {
				// The record value type is String, so the counter is converted explicitly.
				producer.send(new ProducerRecord<String, String>("test", String.valueOf(i)), new Callback() {
					@Override
					public void onCompletion(RecordMetadata metadata, Exception exception) {
						// Report failures instead of silently dropping them.
						if (exception != null) {
							System.err.println("send failed: " + exception);
							return;
						}
						// On success, print the topic, offset and partition of the record.
						if (metadata != null) {
							System.out.println(metadata.topic() + "------" + metadata.offset() + "------" + metadata.partition());
						}
					}
				});
			}
		} finally {
			// 4. Release resources even if sending throws; close() also waits for
			// previously sent records to complete.
			producer.close();
		}
	}

 

收藏 打印