1、netty的编码和解码,在数据传输的时候,考虑数据安全,数据完整性都是很有必要的。这里主要是介绍netty3和netty5的编解码方式。其实从StringEncoder和StringDecoder中也可以获取源码的编解码规则。然后改变成自己的编解码规则也是可以的。

  2、netty3和netty5的编解码方式还是存在一定差别的。个人感觉netty5来的更加实用和方便。

  3、netty3的编解码规则

  1)数据编码规则(我这里只是用于显示,数据规则很简单)

  包头+模块+数据(请求编解码)

  包头+模块+状态+数据(响应编解码)

  2)目录

  

  3)请求和相应对象

package com.troy.data.domain;//请求数据public class Request {    //模块类型    private int model;    //数据    private byte[] data;    public int getModel() {        return model;    }    public void setModel(int model) {        this.model = model;    }    public byte[] getData() {        return data;    }    public void setData(byte[] data) {        this.data = data;    }    @Override    public String toString() {        return "Request{" +                "model=" + model +                ", data=" + new String(data) +                '}';    }}
package com.troy.data.domain;import java.util.Arrays;//响应数据public class Response {    //模块类型    private int model;    //状态码    private int status;    //数据    private byte[] data;    public int getModel() {        return model;    }    public void setModel(int model) {        this.model = model;    }    public int getStatus() {        return status;    }    public void setStatus(int status) {        this.status = status;    }    public byte[] getData() {        return data;    }    public void setData(byte[] data) {        this.data = data;    }    @Override    public String toString() {        return "Response{" +                "model=" + model +                ", status=" + status +                ", data=" + new String(data) +                '}';    }}

  4)常量(这里的包头,因为是固定的就写了一个常量)

package com.troy.data.constant;//常量public class ConsantUtil {    //固定常量用于数据拼接,确认    public static final int PACKAGE_HEADER = -32523523;}

  5)请求编解码

package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;public class RequestEncode extends OneToOneEncoder {    protected   encode(ChannelHandlerContext channelHandlerContext, Channel channel,   o) throws Exception {        Request request = (Request) o;        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();        channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER);        channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER);        channelBuffer.writeBytes(request.getData());        return channelBuffer;    }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;//请求数据解码public class RequestDecode extends OneToOneDecoder {    protected   decode(ChannelHandlerContext channelHandlerContext, Channel channel,   o) throws Exception {        ChannelBuffer channelBuffer = (ChannelBuffer) o;        if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) {            Request request = new Request();            request.setModel(channelBuffer.readInt());            byte[] bytes = new byte[channelBuffer.readableBytes()];            channelBuffer.readBytes(bytes);            request.setData(bytes);            return request;        }        return null;    }}

  6)响应编解码

package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;//响应编码器public class ResponseEncode extends OneToOneEncoder {    protected   encode(ChannelHandlerContext channelHandlerContext, Channel channel,   o) throws Exception {        Response response = (Response) o;        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();        channelBuffer.writeInt(ConsantUtil.PACKAGE_HEADER);        channelBuffer.writeInt(response.getModel());        channelBuffer.writeInt(response.getStatus());        channelBuffer.writeBytes(response.getData());        return channelBuffer;    }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;//响应解码public class ResponseDecode extends OneToOneDecoder{    protected   decode(ChannelHandlerContext channelHandlerContext, Channel channel,   o) throws Exception {            ChannelBuffer channelBuffer = (ChannelBuffer) o;        if (ConsantUtil.PACKAGE_HEADER == channelBuffer.readInt()) {            Response response = new Response();            response.setModel(channelBuffer.readInt());            response.setStatus(channelBuffer.readInt());            byte[] bytes = new byte[channelBuffer.readableBytes()];            channelBuffer.readBytes(bytes);            response.setData(bytes);            return response;        }        return null;    }}

  7)设置对应的管道编解码就可以了

  a、客户端

     //设置管道工厂        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline channelPipeline = Channels.pipeline();                channelPipeline.addLast("decode",new RequestEncode());                channelPipeline.addLast("encode",new ResponseDecode());                channelPipeline.addLast("client",new ClientHandler());                return channelPipeline;            }        });
  @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        Response response = (Response) e.getMessage();        System.out.println(response.toString());        super.messageReceived(ctx, e);    }

  b、服务端

        //设置管道流        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline channelPipeline = Channels.pipeline();                //添加处理方式                channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),60,60,60));                channelPipeline.addLast("decode",new RequestDecode());                channelPipeline.addLast("encode",new ResponseEncode());                channelPipeline.addLast("server",new ServerHandler());                return channelPipeline;            }        });        
  @Override    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {        Request request = (Request) e.getMessage();        System.out.println("client:"+request.toString());        Response response = new Response();        response.setModel(1);        response.setStatus(1);        response.setData("hello client".getBytes());        ctx.getChannel().write(response);        super.messageReceived(ctx, e);    }

  4、netty5的编解码规则

  1)数据结构、目录结构、对象、常量都是一样。

  2)编解码的编写方式有些不一样

  a、请求编解码

package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageEncoder;import java.util.List;@ChannelHandler.Sharablepublic class RequestEncode extends MessageToMessageEncoder<Request> {    protected void encode(ChannelHandlerContext channelHandlerContext, Request request, List< > list) throws Exception {        ByteBuf byteBuf = Unpooled.buffer();        byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER);        byteBuf.writeInt(request.getModel());        byteBuf.writeBytes(request.getData());        list.add(byteBuf);    }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Request;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import java.util.List;//请求数据解码@ChannelHandler.Sharablepublic class RequestDecode extends MessageToMessageDecoder<ByteBuf>{    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List< > list) throws Exception {        if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) {            //当数据超过指定值的时候跳过这部分数据            if (byteBuf.readableBytes() > 2048) {                byteBuf.skipBytes(byteBuf.readableBytes());            }            //一个字节一个字节的读取,知道读取到包头            while(true) {                byteBuf.markReaderIndex();                if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) {                    break;                }                byteBuf.resetReaderIndex();                byteBuf.readByte();            }            Request request = new Request();            request.setModel(byteBuf.readInt());            byte[] bytes = new byte[byteBuf.readableBytes()];            byteBuf.readBytes(bytes);            request.setData(bytes);            list.add(request);        }    }}

  b、响应编解码

package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageEncoder;import java.util.List;//响应编码器@ChannelHandler.Sharablepublic class ResponseEncode extends MessageToMessageEncoder<Response> {    protected void encode(ChannelHandlerContext channelHandlerContext, Response response, List< > list) throws Exception {        ByteBuf byteBuf = Unpooled.buffer();        byteBuf.writeInt(ConsantUtil.PACKAGE_HEADER);        byteBuf.writeInt(response.getModel());        byteBuf.writeInt(response.getStatus());        byteBuf.writeBytes(response.getData());        list.add(byteBuf);    }}
package com.troy.data.codec;import com.troy.data.constant.ConsantUtil;import com.troy.data.domain.Response;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import java.util.List;//响应解码@ChannelHandler.Sharablepublic class ResponseDecode extends MessageToMessageDecoder<ByteBuf>{    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List< > list) throws Exception {        if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) {            //当数据超过指定值的时候跳过这部分数据            if (byteBuf.readableBytes() > 2048) {                byteBuf.skipBytes(byteBuf.readableBytes());            }            //一个字节一个字节的读取,知道读取到包头            while(true) {                byteBuf.markReaderIndex();                if (ConsantUtil.PACKAGE_HEADER == byteBuf.readInt()) {                    break;                }                byteBuf.resetReaderIndex();                byteBuf.readByte();            }            Response response = new Response();            response.setModel(byteBuf.readInt());            response.setStatus(byteBuf.readInt());            byte[] bytes = new byte[byteBuf.readableBytes()];            byteBuf.readBytes(bytes);            response.setData(bytes);            list.add(response);        }    }}

  3)处理上面基本上都是一样的。

  5、netty的编解码,主要目的就是处理通讯问题,对数据进行自定义处理!

   6、源码下载:https://pan.baidu.com/s/1nvUnmEt

收藏 打印