`
bastengao
  • 浏览: 149371 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

netty frame 封包解决方法

    博客分类:
  • java
阅读更多
最近有做毕业设计,需要大量用到网络相关的东西,之前也用socket 写过一些东西,但是感觉直接用socket太费事了。所以就利用现成的框架工具,来做了。找一些东西,最后选择用netty。 netty 是一个异步网络编程框架。他在发送数据和接收数据都是异步的。他提供了许多扩展,利用他可以省好多事。
利用socket或者基于流的传输协议会出现一些问题。netty的官方教程中也提到了。原文是这样的:
引用
In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote. For example, let us assume that the TCP/IP stack of an operating system has received three packets:

+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

Because of this general property of a stream-based protocol, there's high chance of reading them in the following fragmented form in your application:

+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+

Therefore, a receiving part, regardless it is server-side or client-side, should defrag the received data into one or more meaningful frames that could be easily understood by the application logic. In case of the example above, the received data should be framed like the following:

+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+


小弟不才,原文大概的意思是这样的:
引用
在例如 TCP/IP 这样基于流的传输过程中,接收到的数据是存储在socket接收缓冲区中。不幸的是,基于流的缓冲区里的顺序不是按照包的顺序,而是按照字节的顺序。也就是说,即使你通过两个独立的包发送了两个消息,但操作系统不会把他们当做两个独立的消息,而是字节串。因此,你读取到的是与远端写的一致性是无法保证的。例如,让我们假设有这样的操作系统已经接收到的TCP/IP数据包的栈:

+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

因为基于流协议的通常特性,很可能应用程序以下面的顺序读取数据:

+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+

因此,在接收数据的时候,无论是服务端还是客户端,都应该将接收到的数据组织成有意义的帧(frame),从而更简单的处理应用逻辑。之前的例子,接收到的数据应该被封成这样的帧:

+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+



我在开发过程中也确实遇到这样的问题,netty的官方教程也在接下来讲了解决方法。但可能为让读者更深刻的理解netty,用的是比较底层的api。其实netty应对 拆包与组包,已经有解决方法。netty 有一个包 org.jboss.netty.handler.codec.frame ,这个包里面有三种解决方法,都是对 netty 的 codec 进行的扩展。其中 1.DelimiterBasedFrameDecoder 是利用分隔符来进行包的界定;2.FixedLengthFrameDecoder 是利用固定的长度来进行包的界定;3.LengthFieldBasedFrameDecoder 和 LengthFieldPrepender 是利用在发送数据的时候在里面加上头字段,头字段里面包含了包的长度。
这三种方面我用到了其中两种(DelimiterBasedFrameDecoder 和 LengthFieldBasedFrameDecoder)。

举一个 LengthFieldBasedFrameDecoder 的例子
服务器端
ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = org.jboss.netty.channel.Channels.pipeline();
                pipeline.addLast(Constants.LOGGING_HANDLER, new LoggingHandler());
                pipeline.addLast(Constants.UP_FRAME_HANDLER, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
                pipeline.addLast(Constants.DOWN_FRAME_HANDLER, new LengthFieldPrepender(2, false));
                //add customer handler
                pipeline.addLast("myHandler", new ServerMessageHandler());
                return pipeline;
            }
        });
        serverBootstrap.bind(new InetSocketAddress(9999));


客户端
 ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = org.jboss.netty.channel.Channels.pipeline();
                pipeline.addLast(Constants.LOGGING_HANDLER, new LoggingHandler());
                pipeline.addLast(Constants.UP_FRAME_HANDLER, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, 0, 2));
                pipeline.addLast(Constants.DOWN_FRAME_HANDLER, new LengthFieldPrepender(2, false));
                //add customer handler
                clientMessageHandler = new ClientMessageHandler();
                pipeline.addLast("myHandler", clientMessageHandler);
                return pipeline;
            }
        });

        clientBootstrap.connect(new InetSocketAddress(9999));
2
0
分享到:
评论
1 楼 beykery 2013-12-27  
嗯分析的不错。我们也是采用类似LengthFieldBasedFrameDecoder 方式分包的,不过我们重写了,因为觉得源代码还是处理的太复杂,这个逻辑其实是这样:
   public ByteBuf translateFrame(ByteBuf readBuffer) throws LimitExedeedException, InvalidDataException
    {
        while (readBuffer.isReadable())
        {
            switch (status)
            {
                case STATUS_H:
                    h = readBuffer.readByte();
                    status = STATUS_L;
                    break;
                case STATUS_L:
                    l = readBuffer.readByte();
                    final int blen = (0x0000ff00 & (h <<) | (0x000000ff & l);
                    if (context != null)
                    {
                        if (blen <= 0 || blen > maxFrameSize)
                        {
                            throw new LimitExedeedException("帧长度非法:" + h + "/" + l + ":" + blen);
                        }
                    }
                    incompleteframe = PooledByteBufAllocator.DEFAULT.buffer(blen + 8 + 2);
                    incompleteframe.writeShort(blen);
                    status = STATUS_C;
                    break;
                case STATUS_C:
                    int len = incompleteframe.writableBytes() - 8;
                    len = len < readBuffer.readableBytes() ? len : readBuffer.readableBytes();
                    //incompleteframe.writeBytes(readBuffer, len);
                    if (readBuffer.hasMemoryAddress())
                    {
                        PlatformDependent.copyMemory(readBuffer.memoryAddress() + readBuffer.readerIndex(), incompleteframe.memoryAddress() + incompleteframe.writerIndex(), len);
                    } else if (readBuffer.hasArray())
                    {
                        PlatformDependent.copyMemory(readBuffer.array(), readBuffer.arrayOffset() + readBuffer.readerIndex(), incompleteframe.memoryAddress() + incompleteframe.writerIndex(), len);
                    }
                    incompleteframe.writerIndex(incompleteframe.writerIndex() + len);
                    readBuffer.readerIndex(readBuffer.readerIndex() + len);
                    if ((incompleteframe.writableBytes() - <= 0)
                    {
                        status = STATUS_H;
                        return incompleteframe;
                    }
                    break;
            }
        }
        return null;
    }

相关推荐

Global site tag (gtag.js) - Google Analytics