拦截器
public class TimeInterceptor implements ProducerInterceptor<String, String>{
//设置信息
public void configure(Map<String, ?> configs) {
}
//业务逻辑
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record){
return new ProdecerRecord<String, String>(
record.topic(),
record.partition(),
record.key(),
System.currentTimeMillis() + \"-\" +record.value());
}
//应答,发送失败调用
public void onAcknowledgement(Record data data, Exception exception){
}
//释放资源
public void close(){
}
}
{
//配置生产者属性(指定多个参数)
Properties prop = new Properties();
//省略不写
-----
//消息在发送前必须序列化
prop.put(\"key.serializer\",\"org.apache.kafka.common.serialization.StringSerializer\");
prop.put(\"value.serializer\",\"org.apache.kafka.common.serialization.StringSerializer\");
//拦截器
ArrayList<String> inList = new ArrayList<String>();
inList.add(\"com.terry.kafka.TimeInterceptor\");
prop.put(\"ProducerConfig.In=NTERCEPTOR_CLASS_CONFIG\",inList);
//2、实例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3、发送消息
for(int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>(\"test\", i),new CallBack(){
public void onCompletion(Record data data, Exception exception) {
//如果 data不为null,拿到当前的数据偏移量与分区
if( data != null) {
Sout( data.topic() + \"------\" + data.offset() + \"------\" + data.partition());
}
}
});
}
//4、关闭资源
producer.close();
}
继续阅读与本文标签相同的文章
上一篇 :
商用大幕开启,5G下一个爆发点在哪?
下一篇 :
阿里巴巴AI再破世界纪录 阅读理解精准率首超人类
-
用SolidWorks建模一个首尾相连的Z形圆环
2026-05-18栏目: 教程
-
海南台风灾害影响评估三维模拟系统投入业务试运行
2026-05-18栏目: 教程
-
第六届世界互联网大会:实现5G网络全覆盖
2026-05-18栏目: 教程
-
网站不稳定和服务器没有关系么?
2026-05-18栏目: 教程
-
首座装配式3D打印“赵州桥”建成
2026-05-18栏目: 教程
