宜人贷蜂巢API网关技术解密之Netty使用实践

2021-02-08    分类: 网站建设

宜人贷蜂巢团队,由Michael创立于2013年,通过使用互联网科技手段助力金融生态和谐健康发展。自成立起一直致力于多维度数据闭环平台建设。目前团队规模超过百人,涵盖征信、

图1 - API网关项目框架

图中描绘了API网关系统的处理流程,以及与服务注册发现、日志分析、报警系统、各类爬虫的关系。其中API网关系统接收请求,对请求进行编解码、鉴权、限流、加解密,再基于Eureka服务注册发现模块,将请求发送到有效的服务节点上;网关及抓取系统的日志,会被收集到elk平台中,做业务分析及报警处理。

二、BIO vs NIO

API网关承载数倍于爬虫的流量,提升服务器的并发处理能力、缩短系统的响应时间,通信模型的选择是至关重要的,是选择BIO,还是NIO?

1. Streamvs Buffer & 阻塞 vs 非阻塞

BIO是面向流的,io的读写,每次只能处理一个或者多个bytes,如果数据没有读写完成,线程将一直等待于此,而不能暂时跳过io或者等待io读写完成异步通知,线程滞留在io读写上,不能充分利用机器有限的线程资源,造成server的吞吐量较低,见图2。而NIO与此不同,面向Buffer,线程不需要滞留在io读写上,采用操作系统的epoll模式,在io数据准备好了,才由线程来处理,见图3。

NioEvenrLoopGroup的创建,具体执行过程是执行类MultithreadEventExecutorGroup的构造方法:

  1. /**  
  2.  * Create a new instance.  
  3.  *  
  4.  * @param nThreads          the number of threads that will be used by this instance.  
  5.  * @param executor          the Executor to use, or {@code null} if the default should be used.  
  6.  * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.  
  7.  * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call  
  8.  */  
  9. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,  
  10.                                         EventExecutorChooserFactory chooserFactory, Object... args) {  
  11.     if (nThreads <= 0) {  
  12.         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  
  13.     }  
  14.     if (executor == null) {  
  15.         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());  
  16.     }  
  17.     children = new EventExecutor[nThreads];  
  18.     for (int i = 0; i < nThreads; i ++) {  
  19.         boolean success = false;  
  20.         try {  
  21.             children[i] = newChild(executor, args);  
  22.             success = true;  
  23.         } catch (Exception e) {   
  24.             throw new IllegalStateException("failed to create a child event loop", e);  
  25.         } finally {  
  26.             if (!success) {  
  27.                 for (int j = 0; j < i; j ++) {  
  28.                     children[j].shutdownGracefully();  
  29.                 }  
  30.                 for (int j = 0; j < i; j ++) { 
  31.                      EventExecutor e = children[j]; 
  32.                      try { 
  33.                          while (!e.isTerminated()) {  
  34.                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  
  35.                         }  
  36.                     } catch (InterruptedException interrupted) {  
  37.                         // Let the caller handle the interruption.  
  38.                         Thread.currentThread().interrupt();  
  39.                         break;  
  40.                     }  
  41.                 }  
  42.             }  
  43.         }  
  44.     }  
  45.     chooser = chooserFactory.newChooser(children);  
  46.     final FutureListener<Object> terminationListener = new FutureListener<Object>() {  
  47.         @Override  
  48.         public void operationComplete(Future<Object> future) throws Exception {  
  49.             if (terminatedChildren.incrementAndGet() == children.length) {  
  50.                 terminationFuture.setSuccess(null);  
  51.             }  
  52.         }  
  53.     };  
  54.     for (EventExecutor e: children) {  
  55.         e.terminationFuture().addListener(terminationListener);  
  56.     }  
  57.     Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);  
  58.     Collections.addAll(childrenSet, children);  
  59.     readonlyChildren = Collections.unmodifiableSet(childrenSet);  
  60. 其中,创建细节见下:

    • 线程池中的线程数nThreads必须大于0;
    • 如果executor为null,创建默认executor,executor用于创建线程(newChild方法使用executor对象);
    • 依次创建线程池中的每一个线程即NioEventLoop,如果其中有一个创建失败,将关闭之前创建的所有线程;
    • chooser为线程池选择器,用来选择下一个EventExecutor,可以理解为,用来选择一个线程来执行task。

    chooser的创建细节,见下:

    DefaultEventExecutorChooserFactory根据线程数创建具体的EventExecutorChooser,线程数如果等于2^n,可使用按位与替代取模运算,节省cpu的计算资源,见源码:

    1. @SuppressWarnings("unchecked")  
    2. @Override  
    3. public EventExecutorChooser newChooser(EventExecutor[] executors) {  
    4.     if (isPowerOfTwo(executors.length)) {  
    5.         return new PowerOfTowEventExecutorChooser(executors);  
    6.     } else {  
    7.         return new GenericEventExecutorChooser(executors);  
    8.     }  
    9. }   
    10.     private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {  
    11.         private final AtomicInteger idx = new AtomicInteger();  
    12.         private final EventExecutor[] executors;   
    13.  
    14.         PowerOfTowEventExecutorChooser(EventExecutor[] executors) {  
    15.             this.executors = executors;  
    16.         }   
    17.  
    18.         @Override  
    19.         public EventExecutor next() {  
    20.             return executors[idx.getAndIncrement() & executors.length - 1];  
    21.         }  
    22.     }   
    23.  
    24.     private static final class GenericEventExecutorChooser implements EventExecutorChooser {  
    25.         private final AtomicInteger idx = new AtomicInteger();  
    26.         private final EventExecutor[] executors;   
    27.  
    28.         GenericEventExecutorChooser(EventExecutor[] executors) {  
    29.             this.executors = executors;  
    30.         }   
    31.  
    32.         @Override  
    33.         public EventExecutor next() {  
    34.             return executors[Math.abs(idx.getAndIncrement() % executors.length)];  
    35.         }  
    36.     } 

    newChild(executor, args)的创建细节,见下:

    MultithreadEventExecutorGroup的newChild方法是一个抽象方法,故使用NioEventLoopGroup的newChild方法,即调用NioEventLoop的构造函数:

    1. @Override  
    2.     protected EventLoop newChild(Executor executor, Object... args) throws Exception {  
    3.         return new NioEventLoop(this, executor, (SelectorProvider) args[0], 
    4.             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);  
    5.     } 

    在这里先看下NioEventLoop的类层次关系:

    创建任务队列tailTasks(内部为有界的LinkedBlockingQueue):

    创建线程的任务队列taskQueue(内部为有界的LinkedBlockingQueue),以及任务过多防止系统宕机的拒绝策略rejectedHandler。

    其中tailTasks和taskQueue均是任务队列,而优先级不同,taskQueue的优先级高于tailTasks,定时任务的优先级高于taskQueue。

    五、ServerBootstrap初始化及启动

    了解了Netty线程池NioEvenrLoopGroup的创建过程后,下面看下API网关服务ServerBootstrap的是如何使用线程池引入服务中,为高并发访问服务的。

    API网关ServerBootstrap初始化及启动代码,见下:

    1. serverBootstrap = new ServerBootstrap();  
    2. bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());  
    3. workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());   
    4.  
    5. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
    6.         .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())  
    7.         .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())  
    8.         .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())  
    9.         // Memory pooled  
    10.         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
    11.         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
    12.         .childHandler(channelInitializer);    
    13.  
    14. ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();  
    15. log.info("API-gateway started on port: {}", config.getPort());  
    16. future.channel().closeFuture().sync(); 

    API网关系统使用netty自带的线程池,共有三组线程池,分别为bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暂不作介绍)。其中,bossGroup用于接收客户端的TCP连接,workerGroup用于处理I/O、执行系统task和定时任务,executorGroup用于处理网关业务加解密、限流、路由,及将请求转发给后端的抓取服务等业务操作。

    六、Channel与线程池的绑定

    ServerBootstrap初始化后,通过调用bind(port)方法启动Server,bind的调用链如下:

    1. AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister 

    其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化过程指定channel为NioServerSocketChannel.class,至此将NioServerSocketChannel与bossGroup绑定到一起,bossGroup负责客户端连接的建立。那么NioSocketChannel是如何与workerGroup绑定到一起的?

    调用链AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead:

    1. public void channelRead(ChannelHandlerContext ctx, Object msg) {  
    2.     final Channel child = (Channel) msg;  
    3.     child.pipeline().addLast(childHandler);  
    4.     for (Entry<ChannelOption<?>, Object> e: childOptions) {  
    5.         try {  
    6.             if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
    7.                 logger.warn("Unknown channel option: " + e);  
    8.             }  
    9.         } catch (Throwable t) {  
    10.             logger.warn("Failed to set a channel option: " + child, t); 
    11.         }  
    12.     }  
    13.     for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
    14.         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
    15.     } 
    16.  
    17.     try {  
    18.         childGroup.register(child).addListener(new ChannelFutureListener() {  
    19.             @Override  
    20.             public void operationComplete(ChannelFuture future) throws Exception {  
    21.                 if (!future.isSuccess()) { 
    22.                      forceClose(child, future.cause());  
    23.                 }  
    24.             }  
    25.         });  
    26.     } catch (Throwable t) {  
    27.         forceClose(child, t);  
    28.     }  

    其中,childGroup.register(child)就是将NioSocketChannel与workderGroup绑定到一起,那又是什么触发了ServerBootstrapAcceptor的channelRead方法?

    其实当一个 client 连接到 server 时,Java 底层的 NIO ServerSocketChannel 会有一个SelectionKey.OP_ACCEPT 就绪事件,接着就会调用到 NioServerSocketChannel.doReadMessages方法。

    1. @Override  
    2. protected int doReadMessages(List<Object> buf) throws Exception {  
    3.     SocketChannel ch = javaChannel().accept();  
    4.     try {  
    5.         if (ch != null) {  
    6.             buf.add(new NioSocketChannel(this, ch));  
    7.             return 1;  
    8.         }  
    9.     } catch (Throwable t) {          … 
    10.  
    11.     }  
    12.     return 0;  

    javaChannel().accept() 会获取到客户端新连接的SocketChannel,实例化为一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this),由此可知, 我们创建的这个NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 。

    接下来就经由 Netty 的 ChannelPipeline 机制,将读取事件逐级发送到各个 handler 中,于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦。

    至此,分析了Netty线程池的初始化、ServerBootstrap的启动及channel与线程池的绑定过程,能够看出Netty中线程池的优雅设计,使用不同的线程池负责连接的建立、IO读写等,为API网关项目的高并发访问提供了技术基础。

    七、总结

    网站题目:宜人贷蜂巢API网关技术解密之Netty使用实践
    文章分享:/news46/99846.html

    成都网站建设公司_创新互联,为您提供面包屑导航定制开发动态网站网站改版做网站网页设计公司

    广告

    声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

    微信小程序开发