RocketMQ源码系列(3) — 基于Netty的网络通信

语言: CN / TW / HK

theme: cyanosis

专栏:RocketMQ 源码系列

对 Netty 网络通信不熟悉的可以先看下 Netty 系列

RocketMQ 网络通信

远程通信模块

RocketMQ 中 NameServer 与 Broker、Broker 与 Client 间网络通信相关代码集中在 rocketmq-remoting 模块下,模块主要包含基于 Netty 封装的服务器、客户端、网络通信协议等。这篇文章就来分析下 RocketMQ 是如何基于 Netty 来实现网络通信的,以后在开发类似的Client/Server通信时可以依样画葫芦。

因为是基于 Netty 的封装,从模块的类命名很容易理解其中的一些核心组件:

  • NettyRemotingServer:Netty 服务器

  • NettyRemotingClient:Netty 客户端

  • NettyRequestProcessor:Netty 网络请求处理器

  • NettyEncoder/NettyDecoder:编码器/解码器

  • NettyServerConfig/NettyClientConfig:Netty 服务端/客户端配置

  • RemotingCommand:通信协议封装

  • RocketMQSerializable:序列化

image.png

远程服务器

NamesrvController 中初始化创建了远程服务器 RemotingServer,实现类为 NettyRemotingServer,并注册了默认的请求处理器 DefaultRequestProcessor,从这可以看出 RocketMQ 是基于 Netty 进行RPC网络通信的,这两个组件就是 NameServer 处理客户端请求的核心所在。

```java // 创建 Netty 远程通信服务器,就是初始化 ServerBootstrap this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// 业务处理线程池,默认线程数 8 个 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 注册默认处理器和线程池 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); ```

从类结构图来看,Netty 服务端为 NettyRemotingServer,客户端为 NettyRemotingClient,他们都继承自 NettyRemotingAbstract 抽象基类,实现了 RemotingService 接口,NettyRemotingServer 还实现了 RemotingServer 接口。

image.png

RemotingService 主要提供了远程服务的启动、停止、注册RPC钩子函数三个接口。

java public interface RemotingService { // 启动 void start(); // 关闭 void shutdown(); // 注册钩子函数 void registerRPCHook(RPCHook rpcHook); }

RemotingServer 继承自 RemotingService,主要提供了注册请求处理器、接口调用等基础接口。

从接口定义可以看出,NamesrvController 中向 RemotingServer 注册了默认的请求处理器,还支持针对某个请求注册特定的处理器和线程池,这样可以针对不同的业务场景使用不同的处理器和线程池。

从 invokeXxx 方法可以看出,请求的执行支持三种模式:

  • invokeSync:同步调用,同步等待请求的结果,直到超时
  • invokeAsync:异步调用,注册一个回调,请求完成后执行回调
  • invokeOneway:直接发送一个请求而不关心响应,没有回调

```java public interface RemotingServer extends RemotingService { // 针对某个请求,注册处理器以及线程池 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor);

// 针对所有请求注册默认处理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

// 返回本地监听的端口号
int localListenPort();

// 根据请求码获取对应的处理器和线程池
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

// 同步调用
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis);

// 异步调用,并注册一个执行回调
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback);

// 仅发送一个请求,不关心响应
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis);

} ```

网络通信抽象

RocketMQ 抽象了一个 Netty 远程通信抽象基类 NettyRemotingAbstract,它对 Netty 服务器程序进行了封装,负责请求和响应的处理分发、执行,下面就先来看看这个基类的功能。

NettyRemotingAbstract

1、成员属性

NettyRemotingAbstract 有如下属性:

  • semaphoreOneway:Oneway 请求类型的信号量,用来限制 Oneway 请求类型的并发度
  • semaphoreAsync:Async 请求类型的信号量,用来限制 Async 请求类型的并发度
  • responseTable:发起请求之后,将等待请求以及回调等操作封装到 ResponseFuture 中,在响应回来之后可以接着进行后续操作。responseTable 就存储了请求编码与 ResponseFuture 的关系。
  • processorTable:这个表存储了请求编码与之对应的处理器和线程池
  • nettyEventExecutor:Netty 事件执行器
  • defaultRequestProcessor:请求默认处理器,如果 processorTable 没有特定的处理器,就使用这个默认处理器。
  • sslContext:SSL 上下文。
  • rpcHooks:RPC 回调钩子函数。

注意这些成员属性都是 protected 范围的,也就是子类可见,有些组件的设置是在子类进行的,比如注册处理器和线程池,注册RPC钩子函数、加载 SslContext 等。

NettyRemotingAbstract 默认构造方法需传入 Oneway 和 Async 请求类型的许可证数量,用来构造 semaphoreOneway 和 semaphoreAsync 两个信号量。

```java package org.apache.rocketmq.remoting.netty;

public abstract class NettyRemotingAbstract { // 限制正在进行的 OneWay 请求数,保护系统内存占用 protected final Semaphore semaphoreOneway; // 限制正在进行的异步请求数,保护系统内存占用 protected final Semaphore semaphoreAsync; // 缓存所有正在进行的请求,等待请求的响应 protected final ConcurrentMap responseTable = new ConcurrentHashMap(256); // 请求编码对应的处理器和线程池 protected final HashMap> processorTable = new HashMap>(64); // Netty 事件执行器 protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); // 默认处理器和线程池 protected Pair defaultRequestProcessor; // SSL/TLS 加密通信 protected volatile SslContext sslContext; // 自定义 RPC 钩子回调 protected List rpcHooks = new ArrayList();

/**
 * @param permitsOneway one-way 请求的许可证数量.
 * @param permitsAsync  异步请求的许可证数量.
 */
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
    // 创建两个信号量
    this.semaphoreOneway = new Semaphore(permitsOneway, true);
    this.semaphoreAsync = new Semaphore(permitsAsync, true);
}

} ```

2、抽象方法

NettyRemotingAbstract 有两个抽象方法:

  • getChannelEventListener:子类返回一个 Channel 的事件监听器
  • getCallbackExecutor:子类返回一个回调执行的线程池

```java public abstract class NettyRemotingAbstract {

// 抽象方法:获取网络连接事件监听器
public abstract ChannelEventListener getChannelEventListener();

// 抽象方法:获取回调的线程池
public abstract ExecutorService getCallbackExecutor();

} ```

3、处理消息接收

processMessageReceived 方法处理消息请求和响应,入参有两个:

  • ChannelHandlerContext:就是 Netty 中的连接上下文,通过它可以拿到当前的 Channel、触发管道读写事件、写回响应数据等。
  • RemotingCommand:请求数据的封装,是 RocketMQ 中的通信协议。其在 Netty 网络通道中传输时,会进行序列化、反序列化以及编解码。

通过请求的类型来看,有 REQUEST_COMMANDRESPONSE_COMMAND 两种,就是请求和响应。

java public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 处理请求 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: // 处理响应 processResponseCommand(ctx, cmd); break; default: break; } } }

RemotingCommand

RemotingCommand 是 RocketMQ 请求、响应的对象载体,可以理解成是 RocketMQ 定义的网络通信协议。在发送请求时,会根据当前请求和数据构建一个 RemotingCommand 对象,然后经过序列化、编码成字节码,再经过 Netty 的 Channel 发送出去,另一端则从 Channel 读取字节码,经过解码、反序列化成 RemotingCommand。

RemotingCommand 主要包含如下属性:

  • code:RocketMQ 中每个请求都会有一个对应的编码,请求码的枚举定义在 RequestCode 中。
  • opaque:每个请求的ID,通过内存的一个原子计数器 requestId 自增。
  • flag:类型标识,有 请求、响应、Oneway 三种类型
  • extFields:扩展字段
  • body:请求主体数据的字节数组

可以通过它的静态方法 createRequestCommand 和 createResponseCommand 来分别创建请求和响应类型的 RemotingCommand。

```java public class RemotingCommand { private static AtomicInteger requestId = new AtomicInteger(0);

// 请求编号
private int code;
// 编程语言
private LanguageCode language = LanguageCode.JAVA;
// 版本号
private int version = 0;
// 请求ID
private int opaque = requestId.getAndIncrement();
// 类型
private int flag = 0;
// 备注
private String remark;
// 扩展字段
private HashMap<String, String> extFields;
// 自定义header
private transient CommandCustomHeader customHeader;
// 请求的消息体序列化成字节
private transient byte[] body;

// 创建请求对象
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.setCode(code); // 请求编码
    cmd.customHeader = customHeader; // 请定义请求头
    setCmdVersion(cmd);
    return cmd;
}

// 创建响应
public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.markResponseType(); // 设置为响应类型(默认为请求类型)
    cmd.setCode(code); // 请求编码
    cmd.setRemark(remark);
    setCmdVersion(cmd);
    // 设置响应头
    if (classHeader != null) {
        CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance();
        cmd.customHeader = objectHeader;
    }
    return cmd;
}

} ```

消息请求处理

1、主体流程

在接收到客户端的请求后,将由 processRequestCommand 来处理请求,主要流程如下:

  • 从处理器表processorTable获取请求编码对应的处理器和线程池,没有则使用默认的处理器和线程池
  • 接着将请求和响应的处理封装成一个 Runnable
  • 判断是否拒绝请求,是的就返回系统繁忙的编码 SYSTEM_BUSY,将响应写入 Channel。从这可以看出,将数据响应回客户端是通过 ctx.writeAndFlush() 来完成
  • 然后将前面封装的 Runnable,以及 ctx、cmd 封装成一个 RequestTask
  • 最后将 RequestTask 提交到线程池里去执行,如果线程池满了被拒绝请求,则响应系统繁忙。

```java public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 支持不同的请求设置不同的处理器和线程池 final Pair matched = this.processorTable.get(cmd.getCode()); // 没有特定的处理器则使用默认的处理器 final Pair pair = null == matched ? this.defaultRequestProcessor : matched; // 请求ID final int opaque = cmd.getOpaque();

// 封装 Runnable
Runnable run = new Runnable() {...};

// 是否拒绝请求
if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
            "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}

try {
    // 封装一个请求任务
    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
    // 提交到线程池异步执行
    pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
    // 请求拒绝
    if (!cmd.isOnewayRPC()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
    }
}

} ```

2、请求处理

请求处理的主要逻辑封装在 Runnable 中,主要逻辑如下:

  • 读取 Channel 远程通信地址
  • 执行前置 RPC 钩子函数
  • 封装一个响应回调 RemotingResponseCallback,在处理完请求后将调用这个回调。这个回调会先执行后置 RPC 钩子函数,然后对非 Oneway 类型请求,将响应通过 Channel 写回调用者。
  • 最后使用处理器来处理请求,如果是异步处理器则异步处理请求;如果是同步处理器,则同步处理请求,然后同步执行回调。

从这里可以看出,请求执行完成后响应客户端是在 RemotingResponseCallback 回调中完成的,对于 Oneway 类型的请求,由于只执行请求不关心响应,因此不会响应调用者。

而在请求前或请求完成后,想要对请求做一些定制化可以注册自定义 RPCHook 钩子函数来完成。

java Runnable run = new Runnable() { @Override public void run() { try { // 获取通道远程Broker地址 String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); // 请求之前执行RPC钩子 doBeforeRpcHooks(remoteAddr, cmd); // 封装一个响应回调 final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { // 请求之后执行PRC钩子 doAfterRpcHooks(remoteAddr, cmd, response); // 非 Oneway 请求 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); // 设置请求ID response.markResponseType(); // 标记为响应类型 response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC()); // 写回响应 ctx.writeAndFlush(response); } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { // 异步处理器,并注册回调 AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1(); processor.asyncProcessRequest(ctx, cmd, callback); } else { // 同步处理再执行回调 NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); } } catch (Throwable e) { //... } } };

消息响应处理

1、响应处理

在接收服务端的响应后,将由 processResponseCommand 来处理响应,主要流程如下:

  • 从响应表 responseTable 中获取对应 ResponseFuture,这是在执行请求时存起来的,然后将其从 responseTable 表中移除。
  • 如果是异步调用,一般会设置一个执行回调,这时就会去执行回调函数
  • 如果是同步调用,就会调用 putResponse 通知等待方响应回来了。

```java public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { // 请求ID final int opaque = cmd.getOpaque(); // 获取响应 Future final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); // 移除 responseTable.remove(opaque);

    // 异步调用
    if (responseFuture.getInvokeCallback() != null) {
        executeInvokeCallback(responseFuture);
    } 
    // 同步调用
    else {
        responseFuture.putResponse(cmd);
        responseFuture.release();
    }
} else {
    log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}

} ```

请求调用方式

NettyRemotingAbstract 提供了三个执行请求的方法,可以实现同步调用、异步调用和 Oneway 请求,客户端封装好 RemotingCommand 后,就可以调用这三个方法发起远程调用。

1、同步调用

同步调用的主要流程如下:

  • 将当前请求ID、Channel 封装一个 ResponseFuture,然后放入响应表 responseTable 中。
  • 将请求数据写入 Channel,并添加一个 Channel 监听器,在请求完成后设置 ResponseFuture 的状态
  • 然后调用 responseFuture.waitResponse() 开始等待响应,直到响应回来或者超时,最后从 responseTable 表中移除当前请求

同步执行就是在主线程中同步调用服务端接口,然后同步等待响应结果直到超时。

```java // 同步调用 public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { // 请求ID final int opaque = request.getOpaque(); try { // 封装响应 Future final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 暂存到响应表 this.responseTable.put(opaque, responseFuture);

    // 把数据写回客户端,并添加一个监听器
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        // 请求完成后的事件监听
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                // 请求成功
                responseFuture.setSendRequestOK(true);
                return;
            } else {
                // 请求失败
                responseFuture.setSendRequestOK(false);
            }

            // 请求失败
            responseTable.remove(opaque);
            responseFuture.setCause(f.cause());
            responseFuture.putResponse(null);
        }
    });

    // 同步等待响应直到完成或超时
    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    final SocketAddress addr = channel.remoteAddress();
    if (null == responseCommand) {
        // 请求超时
        if (responseFuture.isSendRequestOK()) {
            throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
        } else {
            // 请求发送异常
            throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
        }
    }

    return responseCommand;
} finally {
    // 移除 ResponseFuture
    this.responseTable.remove(opaque);
}

} ```

2、异步执行

异步执行的流程如下:

  • 先通过异步信号量semaphoreAsync获取到一个许可证,如果并发太高获取不到信号量,就可能超时
  • 拿到信号量许可证后,封装一个信号量释放器 SemaphoreReleaseOnlyOnce,它主要的作用是后面释放这个许可证
  • 接着将 Channel、请求ID、执行回调、信号量释放器等封装到 ResponseFuture 中,然后放到响应表 responseTable 中。
  • 之后就是数据发送到服务端,然后注册一个监听器,在请求完成后设置请求状态。
  • 请求完成后,最后释放资源,其实就是在释放信号量的许可证。

异步执行会通过一个信号量来控制异步执行的并发度(默认64),异步执行有回调,它被封装到 ResponseFuture,响应回来后会进入 processResponseCommand 中处理,在里面就会调用 responseFuture.executeInvokeCallback() 来执行这个回调。

```java public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) { final int opaque = request.getOpaque(); // 请求ID // 获取信号量(默认64个信号量) boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { // 封装信号量释放 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

    // 封装响应 Future,请求回来后再执行回调
    final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
    // 暂存到响应表
    this.responseTable.put(opaque, responseFuture);
    try {
        // 向服务端写数据
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) { // 请求成功
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                // 请求失败
                requestFail(opaque);
                log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
            }
        });
    } catch (Exception e) {
        responseFuture.release(); // 释放信号量
        throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
} else {
    // ...
    throw new RemotingTimeoutException(info);
}

} ```

3、Oneway 执行

Oneway 就是发起一起请求,只管发送,不需要响应,不需要执行回调。流程也比较简单:

  • 将请求标记为 Oneway
  • 通过Oneway信号量semaphoreOneway获取到一个许可证,封装信号量释放器
  • 向 Channel 写入数据,发送到服务端
  • 最后释放信号量许可证

java public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { // 标记为 oneway 请求 request.markOnewayRPC(); // 获取 oneway 信号量 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { // 封装信号量释放 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { // 写入数据 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { // .... throw new RemotingTimeoutException(info); } }

基于信号量的资源控制

一般如果想要控制并发、资源隔离,可以通过线程池或者信号量来实现,NettyRemotingAbstract 中使用信号量来控制异步请求和Oneway请求的并发。

同步执行 invokeAsyncImpl 是在主线程中执行并同步等待响应结果,因此主线程的线程数就是最大的并发度,无需控制并发。

1、异步执行信号量

异步执行 invokeAsyncImpl 由于是异步等待执行结果,要保存 ResponseFuture 到响应表 responseTable 中。为了避免内存资源占用过大,使用了一个信号量 semaphoreAsync 来控制并发度,信号量许可证数量默认为 64。

java boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

为了释放这个许可证,RocketMQ 将信号量封装到 SemaphoreReleaseOnlyOnce 中,它会保证只会释放一个许可证给信号量,这个是通过原子类 AtomicBoolean 来控制的。

```java public class SemaphoreReleaseOnlyOnce { private final AtomicBoolean released = new AtomicBoolean(false); private final Semaphore semaphore;

public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
    this.semaphore = semaphore;
}

public void release() {
    if (this.semaphore != null) {
        if (this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}

} ```

SemaphoreReleaseOnlyOnce 会放入 ResponseFuture 中,在 processResponseCommand 中处理响应时,就会去释放这个许可证。

```java responseFuture.release();

// 释放许可证 public void release() { if (this.once != null) { this.once.release(); } } ```

从上面的流程可以看出,异步请求的信号量并发控制是从发起请求,直到响应回来之后才会释放许可证。

2、Oneway执行信号量

Oneway 执行 invokeOnewayImpl 是发送请求后就不关注响应结果,也不需要保存 ResponseFuture,照理来说无需控制并发度,不过 Oneway 请求也通过一个信号量 semaphoreOneway 来控制并发度,信号量许可证数量默认是 256,这个并发度比异步执行的高很多。

java boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

Oneway执行也会将创建一个 SemaphoreReleaseOnlyOnce,不过它的释放是在发送请求完成后就释放。

channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); // 释放许可证 } });

等待-通知机制

同步执行或异步执行会将当前 Channel、请求ID 等封装创建一个 ResponseFuture,然后将其放入响应表 responseTable 中,在响应回来之后再取出来做后续的处理。

1、ResponseFuture

ResponseFuture 主要有请求ID、Channel、执行回调等属性,其使用 CountDownLatch 来实现等待-通知的效果。

```java public class ResponseFuture { // 请求ID private final int opaque; // 请求的网络通道 private final Channel processChannel; // 超时时间 private final long timeoutMillis; // 回调函数 private final InvokeCallback invokeCallback; // 开始时间 private final long beginTimestamp = System.currentTimeMillis(); // 计数器 private final CountDownLatch countDownLatch = new CountDownLatch(1);

// 支持 Semaphore 仅释放一次的组件
private final SemaphoreReleaseOnlyOnce once;

// 仅执行回调的标识
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
// 请求命令
private volatile RemotingCommand responseCommand;
// 请求是否发送成功
private volatile boolean sendRequestOK = true;
// RPC 请求一次
private volatile Throwable cause;

} ```

2、同步等待-通知

同步 invokeSyncImpl 执行时,创建好 ResponseFuture 放入 responseTable 中,然后就调用 responseFuture.waitResponse 开始等待响应结果。

java // 封装响应 Future final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 暂存到响应表 this.responseTable.put(opaque, responseFuture); //... // 同步等待响应直到完成或超时 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

可以看到它是通过 CountDownLatch 来等待,CountDownLatch 的计数为 1,只要另一个地方调用了 countDown 这边就会收到通知,然后返回 responseCommand。

java public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; }

而这个通知的操作就是在 processResponseCommand 中,可以看到响应回来之后,会从 responseTable 中取出 ResponseFuture,并设置 RemotingCommand,对于同步调用,就会调用 ResponseFuture 的 putResponse 和 release 方法。

```java public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { // 请求ID final int opaque = cmd.getOpaque(); // 获取响应 Future final ResponseFuture responseFuture = responseTable.get(opaque); // 设置 RemotingCommand responseFuture.setResponseCommand(cmd); // 移除 ResponseFuture responseTable.remove(opaque);

// 同步执行没有回调,异步要执行回调
if (responseFuture.getInvokeCallback() != null) {
    executeInvokeCallback(responseFuture);
}
// 同步执行,响应回来,放入响应
else {
    responseFuture.putResponse(cmd);
    responseFuture.release();
}

} ```

可以看到,putResponse 内就是调用 countDownLatch.countDown() 通知等待结束。

java public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; // 响应回来后计数器减 1(减为 0) this.countDownLatch.countDown(); }

从上面分析就可以知道 RocketMQ 使用 CountDownLatch 来实现同步调用中的等待-通知机制。

3、异步回调

异步 invokeAsyncImpl 执行时,创建好 ResponseFuture 并放入 responseTable 中,注意异步调用时会传入一个 InvokeCallback 执行回调,会一并放入 ResponseFuture 中。

java // 封装响应 Future,请求回来后再执行回调 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); // 暂存到响应表 this.responseTable.put(opaque, responseFuture);

响应回来后,在 processResponseCommand 中就会执行这个回调,如果子类 getCallbackExecutor() 能返回线程池,将使用线程池异步执行回调,执行完后再释放资源;如果没有线程池,或者异步执行错误,将在主线程同步执行回调,然后释放信号量。

```java private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { // 有回调线程池则异步执行回调 executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } finally { responseFuture.release(); } } }); } catch (Exception e) { // 线程池执行报错在主线程执行 runInThisThread = true; } } else { runInThisThread = true; }

// 没有线程池则同步回调
if (runInThisThread) {
    try {
        responseFuture.executeInvokeCallback();
    } finally {
        responseFuture.release();
    }
}

} ```

网络通信流程

通过前面的分析,几乎已经对 RocketMQ 的通信处理机制有个大概的认识了,下面通过一张图来总结下。

image.png