Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext

語言: CN / TW / HK

概述

之前學習的Reactor模型只是懂得了伺服器端如何把一個連線放到workergroup中處理,那麼真正處理某一個具體的請求的過程是什麼樣的呢

基本概念

  1. 我們知道資料的讀取都是通過channel來完成的,Netty在channel的IO事件中定義了許多的生命週期函式,Netty將這些生命週期函式封裝在Handler中,通過責任鏈的模式封裝在了一個 pipeline中。當 channel中觸發了對應的IO事件,就會呼叫pipeline中的頭或者尾的一個handler,至於是否傳遞到下一級,由對應的handler判斷.
  2. Netty把IO事件分成兩類,一個是Netty從網路中讀取資料,成為InBound。一個是Netty把資料寫到網路中,成為OutBound。Handler也被分成ChannelInBoundHandler和ChannelOutBoundHandler,他們各自有著不同的生命週期函式
  3. pipeline是要給雙向連結串列,有著頭尾兩個指標,inbound事件是從head節點往後傳播,outbound事件是從tail節點往前傳播
  4. 為了能讓handler更加的靈活,Netty給handler外面包了一層的 HandlerContext. context允許handler在生命週期方法中直接改變資料的流向,比如讀完資料就可以讓 context在把資料寫出去,那麼資料的流向就從 inbound變成了outbound

InboundHandler

public interface ChannelInboundHandler extends ChannelHandler {

    # channel 註冊到eventLoop
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    # channel從eventloop中取消註冊
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    # channel已經建立了連線
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    # channel結束了連線
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    # channel從網路中讀到了資料
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    # channel讀取完資料
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
    # 讀取資料中發生了異常呼叫該方法
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
複製程式碼

我們來看看-生命週期的呼叫原始碼。

  1. channelRegistered: 在AbstractChannel方法中register方法中
private void register0(ChannelPromise promise) {
    try {
        # 這裡就是讓 channel註冊到eventloop並且註冊到selector中的方法
        doRegister();
        # 註冊結束之後就會呼叫 pipeline的fireChannelRegistered方法
        pipeline.fireChannelRegistered();
      
        if (isActive()) {
            if (firstRegistration) {
                # 如果是啟用狀態還會呼叫 fireChannelActive 方法
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {

            }
        }
    } catch (Throwable t) {
      
    }
}
複製程式碼

那麼我們來看看pipeline的原始碼

@Override
public final ChannelPipeline fireChannelRegistered() {
    # 傳了一個pipeline 的頭結點進去,說明是從頭開始傳播的
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
複製程式碼

pipeline呼叫了 context的靜態方法,下面的都是context內部的呼叫關係

# AbstractChannelHandlerContext 的 invokeChannelRegistered 方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        # 又呼叫頭結點的 invokeChannelRegistered 方法
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            # 拿到head節點的 handler呼叫handler的鉤子函式
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
複製程式碼

最後呼叫到了就是我們自定義的handler的鉤子函式,特別注意如果我們在這個函式裡面沒有呼叫super.channelRegistered(ctx);那麼轉播就會終止

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channelRegistered----Inbound1");
    super.channelRegistered(ctx);
}
複製程式碼

如果我們繼續轉播的話, context就會找到下一個inbound的context然後再執行方法。

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
複製程式碼

outboundHandler

public interface ChannelOutboundHandler extends ChannelHandler {
    # 當繫結埠成功後觸發
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    # 客戶端連線到伺服器觸發
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    # 斷開連線觸發
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
   
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}
複製程式碼

outbound和inbound就是觸發的時機不同,基本上用不太到,以後有用到再來補。

ChannelInitializer

在 BootstrapServer中,我們往往會建立一個 ChannelInitializer的 ChildHandler。然後在 initChannel方法中對新來的 channel的pipeline中新增handler。,我們來看看這個流程

  1. 在serverBootStrap中存放了ChannelInitializer
public static void main(String[] args) throws Exception {

    new ServerBootstrap()
            .group(new NioEventLoopGroup(), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            # 儲存childHandler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new Inbound1());
                    ch.pipeline().addLast(new OutBound1());
                    ch.pipeline().addLast(new Inbound2());
                    ch.pipeline().addLast(new OutBound2());
                    ch.pipeline().addLast(new Inbound3());
                    ch.pipeline().addLast(new OutBound3());
                }
            }).bind(9090).sync().channel().closeFuture().sync();
}
複製程式碼
  1. 當客戶端連線服務端的時候會觸發 ServerBootStrap的 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    # 這就是客戶端channel
    final Channel child = (Channel) msg;
    # 給客戶端Channel添加了剛才寫的ChannelInitializer
    child.pipeline().addLast(childHandler);
}
複製程式碼
  1. 然後就是reactor的一系列東西,這個channel被bossgroup添加了一個handler後丟到了workergroup,然後準備註冊到workergroup的channel的時候
private void register0(ChannelPromise promise) {
    try {
        # 這裡就是讓 channel註冊到eventloop並且註冊到selector中的方法
        doRegister();
        # 呼叫pipeline的新增handler的方法
        pipeline.invokeHandlerAddedIfNeeded();

        # 註冊結束之後就會呼叫 pipeline的fireChannelRegistered方法
        pipeline.fireChannelRegistered();
      
        if (isActive()) {
            if (firstRegistration) {
                # 如果是啟用狀態還會呼叫 fireChannelActive 方法
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {

            }
        }
    } catch (Throwable t) {
      
    }
}
複製程式碼
  1. 然後最後掉到pipeline的 callHandlerAdded0方法,裡面拿到handler執行handlerAdded方法

5. pipeline中的方法

總結

這幾個流程就大體的梳理完了,總結一下

  1. 在建立ServerBootStrap的時候傳了一個initHandler
  2. 在server端收到客戶端連線的時候就把這個initHandler新增到這個客戶端channel的pipeline中
  3. 然後客戶端準備註冊到eventLoop的時候會去呼叫一下pipeline中的inithandler,然後這時候客戶端的pipeline就新增到了使用者自定義的handler。
  4. 然後把這個初始化的handler給remove掉

最後

如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑

如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源專案點點star:http://github.crmeb.net/u/defu不勝感激 !

PHP學習手冊:http://doc.crmeb.com
技術交流論壇:http://q.crmeb.com