1、netty的编码和解码,在数据传输的时候,考虑数据安全,数据完整性都是很有必要的。这里主要是介绍netty3和netty5的编解码方式。其实从StringEncoder和StringDecoder中也可以获取源码的编解码规则。然后改变成自己的编解码规则也是可以的。
2、netty3和netty5的编解码方式还是存在一定差别的。个人感觉netty5来的更加实用和方便。
3、netty3的编解码规则
1)数据编码规则(我这里只是用于显示,数据规则很简单)
包头+模块+数据(请求编解码)
包头+模块+状态+数据(响应编解码)
2)目录

3)请求和相应对象
package com.troy.data.domain;//请求数据public class Request { //模块类型 private int model; //数据 private byte[] data; public int getModel() { return model; } public void setModel(int model) { this.model = model; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } @Override public String toString() { return "Request{" + "model=" + model + ", data=" + new String(data) + '}'; }}
package com.troy.data.domain;import java.util.Arrays;//响应数据public class Response { //模块类型 private int model; //状态码 private int status; //数据 private byte[] data; public int getModel() { return model; } public void setModel(int model) { this.model = model; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } @Override public String toString() { return "Response{" + "model=" + model + ", status=" + status + ", data=" + new String(data) + '}'; }}
4)常量(这里的包头,因为是固定的就写了一个常量)
package com.troy.data.constant;//常量public class ConsantUtil { //固定常量用于数据拼接,确认 public static final int PACKAGE_HEADER = -32523523;}
5)请求编解码
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;public class RequestEncode extends OneToOneEncoder { protected encode(ChannelHandlerContext channelHandlerContext, Channel channel, o) throws Exception { Request request = (Request) o; ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeBytes(request.getData()); return channelBuffer; }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;//请求数据解码public class RequestDecode extends OneToOneDecoder { protected decode(ChannelHandlerContext channelHandlerContext, Channel channel, o) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) o; if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) { Request request = new Request(); request.setModel(channelBuffer.readInt()); byte[] bytes = new byte[channelBuffer.readableBytes()]; channelBuffer.readBytes(bytes); request.setData(bytes); return request; } return null; }}
6)响应编解码
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;//响应编码器public class ResponseEncode extends OneToOneEncoder { protected encode(ChannelHandlerContext channelHandlerContext, Channel channel, o) throws Exception { Response response = (Response) o; ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER); channelBuffer.writeInt(response.getModel()); channelBuffer.writeInt(response.getStatus()); channelBuffer.writeBytes(response.getData()); return channelBuffer; }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;//响应解码public class ResponseDecode extends OneToOneDecoder{ protected decode(ChannelHandlerContext channelHandlerContext, Channel channel, o) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) o; if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) { Response response = new Response(); response.setModel(channelBuffer.readInt()); response.setStatus(channelBuffer.readInt()); byte[] bytes = new byte[channelBuffer.readableBytes()]; channelBuffer.readBytes(bytes); response.setData(bytes); return response; } return null; }}
7)设置对应的管道编解码就可以了
a、客户端
//设置管道工厂 clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); channelPipeline.addLast("decode",new RequestEncode()); channelPipeline.addLast("encode",new ResponseDecode()); channelPipeline.addLast("client",new ClientHandler()); return channelPipeline; } });
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Response response = (Response) e.getMessage(); System.out.println(response.toString()); super.messageReceived(ctx, e); }
b、服务端
//设置管道流 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //添加处理方式 channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),60,60,60)); channelPipeline.addLast("decode",new RequestDecode()); channelPipeline.addLast("encode",new ResponseEncode()); channelPipeline.addLast("server",new ServerHandler()); return channelPipeline; } });
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Request request = (Request) e.getMessage(); System.out.println("client:"+request.toString()); Response response = new Response(); response.setModel(1); response.setStatus(1); response.setData("hello client".getBytes()); ctx.getChannel().write(response); super.messageReceived(ctx, e); }
4、netty5的编解码规则
1)数据结构、目录结构、对象、常量都是一样。
2)编解码的编写方式有些不一样
a、请求编解码
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageEncoder;import java.util.List;@ChannelHandler.Sharablepublic class RequestEncode extends MessageToMessageEncoder<Request> { protected void encode(ChannelHandlerContext channelHandlerContext, Request request, List< > list) throws Exception { ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER); byteBuf.writeInt(request.getModel()); byteBuf.writeBytes(request.getData()); list.add(byteBuf); }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import java.util.List;//请求数据解码@ChannelHandler.Sharablepublic class RequestDecode extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List< > list) throws Exception { if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { //当数据超过指定值的时候跳过这部分数据 if (byteBuf.readableBytes() > 2048) { byteBuf.skipBytes(byteBuf.readableBytes()); } //一个字节一个字节的读取,知道读取到包头 while(true) { byteBuf.markReaderIndex(); if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { break; } byteBuf.resetReaderIndex(); byteBuf.readByte(); } Request request = new Request(); request.setModel(byteBuf.readInt()); byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); request.setData(bytes); list.add(request); } }}
b、响应编解码
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageEncoder;import java.util.List;//响应编码器@ChannelHandler.Sharablepublic class ResponseEncode extends MessageToMessageEncoder<Response> { protected void encode(ChannelHandlerContext channelHandlerContext, Response response, List< > list) throws Exception { ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER); byteBuf.writeInt(response.getModel()); byteBuf.writeInt(response.getStatus()); byteBuf.writeBytes(response.getData()); list.add(byteBuf); }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import java.util.List;//响应解码@ChannelHandler.Sharablepublic class ResponseDecode extends MessageToMessageDecoder<ByteBuf>{ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List< > list) throws Exception { if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { //当数据超过指定值的时候跳过这部分数据 if (byteBuf.readableBytes() > 2048) { byteBuf.skipBytes(byteBuf.readableBytes()); } //一个字节一个字节的读取,知道读取到包头 while(true) { byteBuf.markReaderIndex(); if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) { break; } byteBuf.resetReaderIndex(); byteBuf.readByte(); } Response response = new Response(); response.setModel(byteBuf.readInt()); response.setStatus(byteBuf.readInt()); byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); response.setData(bytes); list.add(response); } }}
3)处理上面基本上都是一样的。
5、netty的编解码,主要目的就是处理通讯问题,对数据进行自定义处理!
继续阅读与本文标签相同的文章
上一篇 :
netty之心跳机制
下一篇 :
netty之管道处理流程
-
Java之解压流(ZipInputStream)
2026-05-26栏目: 教程
-
jetty之嵌入式开发
2026-05-26栏目: 教程
-
java之压缩流(ZipOutputStream)
2026-05-26栏目: 教程
-
java之JMS
2026-05-26栏目: 教程
-
activemq的启动方式
2026-05-26栏目: 教程
