新浦京81707con > 功能介绍 > 数据处理流程

原标题:数据处理流程

浏览次数:118 时间:2019-11-02

Netty:数据管理流程,netty数据管理流程

Netty作为异步的、事件驱动二个互联网通讯框架,使用它能够扶助我们急忙支付高质量高可信性的互联网服务。

         为了越来越好的选拔Netty来缓和开拓中的难点,学习Netty是很有供给的。

         Netty未来主流有八个本子:Netty3、Netty4、Netty5。这四个本子中,变化最大的要数线程模型了,各版本的线程模型均不一致等。可是有好几是浮动一点都不大的,那正是Channel模型,由此数据管理流程也不会有太大的浮动。所以本篇就来讲一下Netty的数据管理流程,各版本的线程模型会后续说明。

 

Thinking in Netty

By 谢乐

    • Thinking in Netty
      • 写在眼下
      • Netty连忙开头
      • Netty的架构
      • Netty的线程模型
        • 1 服务端线程模型
        • 1 顾客端线程模型
      • Netty的API设计
      • Netty的通讯进程
      • Netty中的设计情势
      • 最后

Channel 模型

         关于Netty Channel的模子,做了八个轻松易行图:

 图片 1

 

一个Channel中归纳贰个Socket、八个ChannelPipeline。八个ChannelPipeline中有三个ChannelSink和八个ChannelHandler。ChannelHandler分为三种:UpstremHandler、DownstreamHandler。

 

甭管是读数据如故写多少都要透过Channel中的ChannelPipeline。读数据的进度是从Socket到ChannelPipeline,由ChannelPipeline交给里面包车型客车UpstreamHandler(大概叫做InBoundHandler)从下到上依次管理。写多少时,由要经过ChannelPipeline里面在DownStreamHandler(只怕是OutBoundHandler)由上到下风流倜傥一拍卖。

 

 

1. 写在前头

Netty的词根为net, 那么大家就已经估量到它与网络关于。官方对Netty的表达为:

Netty是意气风发种异步的依赖事件驱动的Java网络选用框架,可用以创设高质量的合计服务器与顾客端。

像笔者司以至任何网络厂家,比如Alibaba(Alibaba),照片墙等。有过多里边的中间件,分布式框架。而这一个制品或采取的底层超级多都用到了Netty或依照Netty的再付出产品。姑且不用再过多描述Netty是什么样啊,单是由于好奇的由来,作者便想生机勃勃探毕竟,本文不会波及到广大Netty的运用,譬如编解码,公约栈开采,本文是对Netty的平底原理,设计观念的追究。如若要求的话,请参见《Netty in Action>》, 《Netty权威指南》等图书。

Channel的创建

Channel首要分为两种:ServerSocketChannel、SocketSocketChannel。这里不对是或不是是NIO作区分。

ServerSocketChannel的创制在bind时自动实现,SocketChannel的始建由Netty扶持实现。无论是客商端在成立连接时, 依旧服务端选拔到客商端连接时,SocketChannel的创造都以由Netty补助成功

         在开立Channel时,就能够自动调用相应的ChannelPipeline创造器来成立了。在创制ChannelPipeline时,能够由客商配置相关的ChannelHandler。ServerSocketChannel可以由客户定制三个ChannelHandler,SocketChannel则足以由顾客定制多少个ChannelHandler。

 

在各版本中用来定制的措施恐怕是例外的。

 

 

 

2. Netty火速开端

从三个经文的Echo服务器德姆o开头.

package cn.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @author Xie le
 * @date 2016/4/13
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {

        int port = 8080;
        new EchoServer().bind(port);
    }

    public void bind(int port) throws Exception {

        //配置线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new LineBasedFrameDecoder(128)) // DelimiterBasedFrameDecoder
                                    .addLast(new StringDecoder())
                                    .addLast(new EchoServerHandler());

                        }
                    });

            //绑定端口,同步等待
            ChannelFuture future = bootstrap.bind(port).sync();

            future.channel().closeFuture().sync();

        } finally {

            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    private class EchoServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Receive Client : ["   msg   "]");
            ctx.writeAndFlush(msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

如上代码演示了Netty作为服务端的示范。运转EchoServer,然后经过Telnet 输入指令:

telnet 127.0.0.1 8080

然后观看Echo行为,即使代码相当的短,但是麻雀虽小。当然这里只是简短后生可畏窥而已,愈来愈多内容还在前边。

ChannelSink

其风流倜傥组件首要见于V3中

ChannelSink的有2个作用:

1、当现身非凡时,通过exceptionCaught向沿着Pipeline上传递ExceptionEvent,(channel.getPipeline().sendUpsteam(new DefaultExceptionEv ent(channel,ex)));

2、当三个Message只怕三个Events沿着Pipeline从上到下施行完全数的ChannelHandler管理后,进入ChannelSink的eventSunk方法。

 

 

上面就说一下Server端、Client端的Sink的功能:

ServerSocketPipelineSink eventSunk:

  • 当创建ServerSocket时,提交Boss任务。
  • 当收到到Socket时,创制SocketChannel,并将多个Worker职分交给Worker Executor。

 

ClientSocketPipelineSink eventSunk:

1)当在Pipeline上飘泊的是ChannelEvent时:

·即使是创设连接的event,将二个worker职责交给worker executor

·假诺是兴趣event,则注册兴轶事件。

2)当在Pipeline上漂泊的是message时:

  就要写的数量放到写队列中,然后调用NioWorker的writeFromUserCode方法。

 

 

3. Netty的架构

图片 2

这是 官互连网的Netty架构图。能够见见,Netty包蕴多少个部分:第一是传输服务,第二是切磋协理,第三是底层主题部分:具备灵活的事件模型,提供通用通讯API,还兼具零拷贝的字节缓冲手艺。

传输服务则含有有Socket和UDP通讯服务,HTTP信道,设想机管道服务。左券援救则含有HTTP和WebSocket共同商议,还会有压缩,大文件传输,GoogleProtobuf合同等。

ChannelHandler

Netty是由事件驱动的框架,任何操作操作都以由事件来驱动的。

ChannelHandler正是多少个伊芙nt Hander(事件管理器),它的法力是:

·管理IO事件(读写),比如读取数据,并解码。

         ·处理兴轶事件。

         ·交给下两个ChannelHandler管理。

 

 

4. Netty的线程模型

Netty的线程模型综合了Reactor线程模型,提起Reactor模型能够参见笔者的上后生可畏篇文章「Java IO模型&NIO」。
Netty的线程模型与文中的三种Reactor线程模型雷同,上面章节大家因而Netty服务端和客户端的线程管理流程图来介绍Netty的线程模型。供给证实的是,使用的本子是Netty 5,部分代码与Netty 4 有出入。

写多稀少两种办法能够接触

 图片 3

图片 4

 

 

 

上述呢,轻便了说了须臾间Channel中各组件的法力以致管理流程,知道那么些东西,已经得以扶植大家领略并使用Netty了。

 

Netty作为异步的、事件驱动一个网络通讯框架,使用它能够扶助大家超快支付高质量高可信性的互连网...

4.1 服务端线程模型

Netty中通用的做法是将服务端监听线程和IO线程分离,相似于Reactor四线程模型,它的办事原理如图:
图片 5
注:上图来源李碧华峰的文章,本节的求学思路源于该文,多谢他的享用。

客商日常经过Web服务器或Main程序来运行Netty服务(肖似于上述示范中的EchoServer代码)。Netty钦定多个线程池组EventLoopGroup 作为主从线程池, 多个EventLoopGroup 即为伊夫ntLoop线程组,负担处理EventLoop的创建和回笼。伊芙ntLoopGroup管理的线程数可以经过构造函数设置,若无安装,私下认可取-Dio.netty.eventLoopThreads,尽管该系统参数也未曾点名,则为可用的CPU内核数 × 2。
bossGroup线程组实际正是Acceptor线程池,担负管理顾客端的TCP连接央求,如若系统唯有一个劳务端端口必要监听,则提出bossGroup线程组线程数设置为1。
workerGroup是当真肩负I/O读写操作的线程组,通过ServerBootstrap的group方法举行安装,用于后续的Channel绑定。
但Netty5引进了PipeLine方式,在原始的模子上作了一些转移。下图是自身的精通:
图片 6
Netty从主NIO伊芙ntLoop线程池分配好二个线程来作为Acceptor线程,用以吸收接纳一连并监听网络事件。但ServerBootstrap在开首化时,会为开创好的Channel上予以一个管道ChannelPipeline,并把worker线程池绑定到该Pipeline上。Acceptor线程感知到一而再事件后,会先成立SocketChannel,然后经过Pipeline在workerGroup中精选三个伊芙ntLoop线程作为IO管理线程,负担网络新闻的读写,并把新建的SocketChannel登记到Selector上,以监听其余事件。
提起此地,供给先验证一下NIO的Server编制程序步骤,我们根据源码正是最棒的评释条件,直接show代码。

    /**
     * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
     * @param port
     * @throws IOException
     */
    public NIOServer initServer(int port) throws IOException{

        //获得一个ServerSocket通道
        final ServerSocketChannel serverChannel = ServerSocketChannel.open();

        //设置通道为非阻塞
        serverChannel.configureBlocking(false);

        //将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));

        //获得一个通道管理器
        this.selector =  Selector.open();

        /**
         * 将通道管理器和该通道绑定,并为该通道注册服务端接收客户端连接事件(SelectionKey.OP_ACCEPT)
         * 注册该事件后,当该事件到达时,selector.select()会返回,如果没有到达,selector.select()
         * 会一直阻塞
         *
         */
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        serverChannel.validOps();

        return  this;
    }

NIO的Server编制程序平日会先初叶化四个ServerSocketChannel,表示服务端的SocketChannel,这些相似于BIO中的ServerSocket,其承当在服务端监听特定端口并作央求选择。然后再张开四个Selector,表示多路复用采用器,然后把ServerSocketChannel注册到Selector上,用以在ServerSocketChannel上接纳各类网络事件。
Netty作为NIO框架,虽付与了成千上万世故和便捷性,但其底层完结依旧注重于NIO,大家从Netty的代码去深入分析。
第一、创建ServerSocketChannel与注册
ServerBootstrap启动时,在channel()艺术中钦赐Channel类型为NioServerSocketChannel.class
然后在bind方法中会实例化该Channel,代码援引栈如下:

    bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
    ...

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
   AbstractBootstrap#initAndRegister
   final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        }
        ChannelFuture regFuture = group().register(channel);
        ....
    }

其次、ServerBootstrap通过group()和register方法把Channel注册给二个bossGroup中的线程,并以该线程作为Acceptor线程,然后监听服务端。ServerBootstrap选择多个Acceptor线程的首要逻辑如下:

MultithreadEventLoopGroup.java
public ChannelFuture register(Channel channel) {
        return next().register(channel);
}
MultithreadEventExecutorGroup.java
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            return children[childIndex.getAndIncrement() & children.length - 1];
        }
}

第三、然后在io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)方法中把Channel注册到Group中的Acceptor线程,由该Acceptor线程来把Channel注册到Selector上,并作监听。代码调用栈的注重逻辑如下:

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
channel.unsafe().register(this, promise);
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
                return;
          }
....

第四、最终的登记实际上也是调用的java.nio.channels.spi.AbstractSelectableChannel#register主意,意为由NIO本人的API来完毕注册。

第五、Selector开端在Acceptor线程中在ServerChannel上监听网络事件。由于事先我们钦点的线程为Nio伊夫ntLoop,由此那些职务很有十分大大概在这里类中,大家中间看代码:

io.netty.channel.nio.NioEventLoop#run
 try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
           ....
  }
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
           ....
        try {
            int readyOps = k.readyOps();
            //读事件,接收连接
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            .....
}

第六、管理读或抽出一连事件,调用了unsafe.read()艺术。由于服务端大家钦命准期NioServerSocketChannel, 而NioServerSocketChannel继承了AbstractNioMessageChannel, 所以unsafe的read方法在AbstractNioMessageChannel那么些类中。

private final class NioMessageUnsafe extends AbstractNioUnsafe {
     public void read() {
         ....
         for (;;) {
             int localRead = doReadMessages(readBuf);
         }
     }
}
再看看doReadMessages方法就知道,NioServerSocketChannel创建了SocketChannel,以建立与客户端的通道连接。
protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
 }

第七、把新建的SocketChannel登记到workerGroup线程池中,以初始音信交互。
接触的输入在AbstractNioMessageChannel类的read方法中:

for (int i = 0; i < size; i   ) {
                    pipeline.fireChannelRead(readBuf.get(i));
 }

亟需证实的是,在ServerBootstrap的init办法中把workerGroup线程池绑定到了Pipeline中.

ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

ServerBootstrapAcceptorchannelRead艺术则把新建的SocketChannel赋予到了childGroup中,
内部childGroup正是事先的workerGroup, 从当中接受四个I/O线程肩负互联网音讯的读写。

...
childGroup.register(child)
...

register会把新建的SocketChannel注册到Selector上,然后早先专门的学业的通讯。假诺有读事件光降,则又会接触processSelectedKey艺术,进而步向数据管理与互为进度。

本文由新浦京81707con发布于功能介绍,转载请注明出处:数据处理流程

关键词: 新浦京81707con 设计模式 Netty 线程模型 通信

上一篇:Haproxy构建高可用负载均衡集群,Keepalived实现高可

下一篇:没有了