39°

基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇

前提

前置文章:

在前置的《基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇》一文中已经定义了一个相对简单的RPC私有协议,并且实现了对应的编码和解码模块。这篇文章基于协议篇,完成Server端代码调用的编写。考虑到目前相对主流的IOC容器是Spring,这里选用了spring-boot-starter(非MVC容器,只是单纯管理Bean),依赖JDK1.8+

思路

首先RPC私有协议定义了Client端会传过来四个和服务调用息息相关的字符:接口全类名interfaceName、方法名methodName、方法参数签名字符串数组methodArgumentSignatures(可选,这个参数不是必须传入的)以及方法参数数组methodArguments(可选,空方法列表的时候不需要传入参数)。主要流程如下:

  • Server端的所有服务端(实现)类交由IOC容器托管。
  • Client端发起RPC请求。
  • 通过前面提到的最多四个参数,从Server服务实例的IOC容器中匹配出吻合度最高的一个方法java.lang.reflect.Method实例、该方法实例的宿主类以及宿主类对应的Bean实例,如果这一步匹配的目标方法超过1个或者为0个,可以直接返回异常信息。
  • 把前一步得到的Method实例、宿主类Bean实例,结合方法参数数组methodArguments进行反射调用,得到调用结果。
  • Server端把响应结果封装到payload通过私有协议发送回Client端。

Server端代码实现

为了暂时方便起见,部分数组入参被重新封装为ArrayList,实际上编写RPC框架的时候应该优先考虑性能问题,像JDK提供的集合类库等等应该尽可能少用(以ArrayList为例,扩容的时候存在底层Object[]拷贝,造成性能损失和额外的内存消耗),极尽可能使用基本类型和数组。

先定义方法匹配器MethodMatcher相关的类:

public interface MethodMatcher {
/**
 * 查找一个匹配度最高的方法信息
 *
 * @param input input
 * @return output
 */
MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input);

}

// 输入值 @EqualsAndHashCode @Data public class MethodMatchInput {

private String interfaceName;

private String methodName;

private List<String> methodArgumentSignatures;

private int methodArgumentArraySize;

}

// 输出值 @Data public class MethodMatchOutput {

/**
 * 目标方法实例
 */
private Method targetMethod;

/**
 * 目标实现类 - 这个有可能是被Cglib增强过的类型,是宿主类的子类,如果没有被Cglib增强过,那么它就是宿主类
 */
private Class<?> targetClass;

/**
 * 宿主类
 */
private Class<?> targetUserClass;

/**
 * 宿主类Bean实例
 */
private Object target;

/**
 * 方法参数类型列表
 */
private List<Class<?>> parameterTypes;

}

目标方法匹配的逻辑大致如下:

  1. 方法名称和方法实例的宿主类型一定作为匹配条件的一部分。
  2. 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配。
  3. 如果没有传入参数签名列表,那么使用参数的数量进行匹配。
  4. 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配。
  5. 考虑到方法匹配解析的过程相对耗时,需要把结果缓存起来。

分析至此,可以基于反射,编写一个抽象的方法匹配器BaseMethodMatcher,然后把获取宿主类信息的功能委托到子类:

public class MethodMatchException extends RuntimeException {
public MethodMatchException(String message) {
    super(message);
}

public MethodMatchException(String message, Throwable cause) {
    super(message, cause);
}

public MethodMatchException(Throwable cause) {
    super(cause);
}

}

@Data public class HostClassMethodInfo {

private Class<?> hostClass;
private Class<?> hostUserClass;
private Object hostTarget;

}

@Slf4j abstract class BaseMethodMatcher implements MethodMatcher {

private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();

@Override
public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) {
    return cache.computeIfAbsent(input, in -> {
        try {
            MethodMatchOutput output = new MethodMatchOutput();
            Class<?> interfaceClass = Class.forName(in.getInterfaceName());
            // 获取宿主类信息
            HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass);
            List<Method> targetMethods = Lists.newArrayList();
            ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> {
                String methodName = method.getName();
                Class<?> declaringClass = method.getDeclaringClass();
                List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures())
                        .map(mas -> {
                            List<Class<?>> list = Lists.newArrayList();
                            mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null)));
                            return list;
                        }).orElse(Lists.newArrayList());
                output.setParameterTypes(inputParameterTypes);
                // 如果传入了参数签名列表,优先使用参数签名列表类型进行匹配
                if (!inputParameterTypes.isEmpty()) {
                    List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                    return Objects.equals(methodName, in.getMethodName()) &&
                            Objects.equals(info.getHostUserClass(), declaringClass) &&
                            Objects.equals(parameterTypes, inputParameterTypes);
                }
                // 如果没有传入参数签名列表,那么使用参数的数量进行匹配
                if (in.getMethodArgumentArraySize() > 0) {
                    List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                    return Objects.equals(methodName, in.getMethodName()) &&
                            Objects.equals(info.getHostUserClass(), declaringClass) &&
                            in.getMethodArgumentArraySize() == parameterTypes.size();

                }
                // 如果参数签名列表和参数列表都没有传入,那么只能通过方法名称和方法实例的宿主类型匹配
                return Objects.equals(methodName, in.getMethodName()) &&
                        Objects.equals(info.getHostUserClass(), declaringClass);

            });
            if (targetMethods.size() != 1) {
                throw new MethodMatchException(String.format("查找到目标方法数量不等于1,interface:%s,method:%s",
                        in.getInterfaceName(), in.getMethodName()));
            }
            Method targetMethod = targetMethods.get(0);
            output.setTargetClass(info.getHostClass());
            output.setTargetMethod(targetMethod);
            output.setTargetUserClass(info.getHostUserClass());
            output.setTarget(info.getHostTarget());
            return output;
        } catch (Exception e) {
            log.error("查找匹配度最高的方法失败,输入参数:{}", JSON.toJSONString(in), e);
            if (e instanceof MethodMatchException) {
                throw (MethodMatchException) e;
            } else {
                throw new MethodMatchException(e);
            }
        }
    });
}

/**
 * 获取宿主类的信息
 *
 * @param interfaceClass interfaceClass
 * @return HostClassMethodInfo
 */
abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass);

}

接着,通过接口类型获取宿主类的功能就委托给Spring实现,从IOC容器中获取,定义SpringMethodMatcher

@Component
public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {
private DefaultListableBeanFactory beanFactory;

@Override
public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
    this.beanFactory = (DefaultListableBeanFactory) beanFactory;
}

@Override
HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) {
    HostClassMethodInfo info = new HostClassMethodInfo();
    // 从容器中通过接口类型获取对应的实现,实现必须只有一个
    Object bean = beanFactory.getBean(interfaceClass);
    info.setHostTarget(bean);
    info.setHostClass(bean.getClass());
    info.setHostUserClass(ClassUtils.getUserClass(bean.getClass()));
    return info;
}

}

至此,目标方法匹配的模块已经编写完毕,接下来需要处理方法参数列表的反序列化。编写协议的时候,笔者把方法参数列表methodArguments存放在Object数组中,传输的时候序列化为byte数组,经过协议解析之后,方法参数列表的实际类型为ByteBuf数组(这是因为Netty中的字节容器就是ByteBuf),那么需要考虑把ByteBuf数组转换为目标方法的参数类型实例。主要步骤如下:

  1. 如果方法参数列表为空,那么什么都不用做,也就是调用了无参数的方法。
  2. 如果方法参数列表不为空同时方法参数类型列表不为空,优先选用方法参数类型列表进行转换。
  3. 如果方法参数列表不为空同时方法参数类型列表为空,则使用Method#getParameterTypes()得到的方法参数列表类型进行转换。

定义一个方法参数转换器接口MethodArgumentConverter

public interface MethodArgumentConverter {
ArgumentConvertOutput convert(ArgumentConvertInput input);

}

@Data public class ArgumentConvertInput {

/**
 * 目标方法
 */
private Method method;

/**
 * 方法参数类型列表
 */
private List<Class<?>> parameterTypes;

/**
 * 方法参数列表
 */
private List<Object> arguments;

}

@Data public class ArgumentConvertOutput {

private Object[] arguments;

}

方法参数转换器的默认实现如下:

@Slf4j
@Component
public class DefaultMethodArgumentConverter implements MethodArgumentConverter {
private final Serializer serializer = FastJsonSerializer.X;

@Override
public ArgumentConvertOutput convert(ArgumentConvertInput input) {
    ArgumentConvertOutput output = new ArgumentConvertOutput();
    try {
        if (null == input.getArguments() || input.getArguments().isEmpty()) {
            output.setArguments(new Object[0]);
            return output;
        }
        List<Class<?>> inputParameterTypes = input.getParameterTypes();
        int size = inputParameterTypes.size();
        if (size > 0) {
            Object[] arguments = new Object[size];
            for (int i = 0; i < size; i++) {
                ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
                int readableBytes = byteBuf.readableBytes();
                byte[] bytes = new byte[readableBytes];
                byteBuf.readBytes(bytes);
                arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i));
                byteBuf.release();
            }
            output.setArguments(arguments);
            return output;
        }
        Class<?>[] parameterTypes = input.getMethod().getParameterTypes();
        int len = parameterTypes.length;
        Object[] arguments = new Object[len];
        for (int i = 0; i < len; i++) {
            ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
            int readableBytes = byteBuf.readableBytes();
            byte[] bytes = new byte[readableBytes];
            byteBuf.readBytes(bytes);
            arguments[i] = serializer.decode(bytes, parameterTypes[i]);
            byteBuf.release();
        }
        output.setArguments(arguments);
        return output;
    } catch (Exception e) {
        throw new ArgumentConvertException(e);
    }
}

}

所有前置工作都完成了,现在编写一个Server端的入站处理器ServerHandler,暂时不做代码逻辑优化,只做实现,把反射调用的模块直接在此类中编写:

@Component
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {
@Autowired
private MethodMatcher methodMatcher;

@Autowired
private MethodArgumentConverter methodArgumentConverter;

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
    log.info("服务端接收到:{}", packet);
    MethodMatchInput input = new MethodMatchInput();
    input.setInterfaceName(packet.getInterfaceName());
    input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures())
            .map(Lists::newArrayList).orElse(Lists.newArrayList()));
    input.setMethodName(packet.getMethodName());
    Object[] methodArguments = packet.getMethodArguments();
    input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0);
    MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input);
    log.info("查找目标实现方法成功,目标类:{},宿主类:{},宿主方法:{}",
            output.getTargetClass().getCanonicalName(),
            output.getTargetUserClass().getCanonicalName(),
            output.getTargetMethod().getName()
    );
    Method targetMethod = output.getTargetMethod();
    ArgumentConvertInput convertInput = new ArgumentConvertInput();
    convertInput.setArguments(input.getMethodArgumentArraySize() &gt; 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList());
    convertInput.setMethod(output.getTargetMethod());
    convertInput.setParameterTypes(output.getParameterTypes());
    ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput);
    ReflectionUtils.makeAccessible(targetMethod);
    // 反射调用
    Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments());
    ResponseMessagePacket response = new ResponseMessagePacket();
    response.setMagicNumber(packet.getMagicNumber());
    response.setVersion(packet.getVersion());
    response.setSerialNumber(packet.getSerialNumber());
    response.setAttachments(packet.getAttachments());
    response.setMessageType(MessageType.RESPONSE);
    response.setErrorCode(200L);
    response.setMessage("Success");
    response.setPayload(JSON.toJSONString(result));
    log.info("服务端输出:{}", JSON.toJSONString(response));
    ctx.writeAndFlush(response);
}

}

编写一个Server的启动类ServerApplication,在Spring容器启动之后,启动Netty服务:

@SpringBootApplication(scanBasePackages = "club.throwable.server")
@Slf4j
public class ServerApplication implements CommandLineRunner {
@Value("${netty.port:9092}")
private Integer nettyPort;

@Autowired
private ServerHandler serverHandler;

public static void main(String[] args) throws Exception {
    SpringApplication.run(ServerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
    int port = nettyPort;
    ServerBootstrap bootstrap = new ServerBootstrap();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer&lt;SocketChannel&gt;() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                        ch.pipeline().addLast(new LengthFieldPrepender(4));
                        ch.pipeline().addLast(new RequestMessagePacketDecoder());
                        ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
                        ch.pipeline().addLast(serverHandler);
                    }
                });
        ChannelFuture future = bootstrap.bind(port).sync();
        log.info("启动NettyServer[{}]成功...", port);
        future.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

}

最后,编写契约包和契约实现:

- ch0-custom-rpc-protocol          项目根目录
  - club.throwable
    - utils                        工具类
    - protocol                     协议
    - exception                    异常
    - contract                     契约
      - HelloService               契约接口
    - server                       服务端
      - contract
        - DefaultHelloService      契约接口实现
public interface HelloService {
String sayHello(String name);

}

// 实现 @Service public class DefaultHelloService implements HelloService {

@Override
public String sayHello(String name) {
    return String.format("%s say hello!", name);
}

}

先启动服务端ServerApplication,再启动上一节提到的TestProtocolClient,输出结果:

// 服务端日志
2020-01-15 00:05:57.898  INFO 14420 --- [           main] club.throwable.server.ServerApplication  : 启动NettyServer[9092]成功...
2020-01-15 00:06:05.980  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)])
2020-01-15 00:06:07.448  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-15 00:06:07.521  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}

// 客户端日志 00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 启动NettyClient[9092]成功... ...省略... 00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到来自服务端的响应消息,消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":""doge say hello!"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}

可见RPC调用成功。

小结

编写RPCServer端技巧在于处理目标方法和宿主类的查找,在转换方法参数的时候,需要考虑简化处理和提高效率,剩下的就是做好异常处理和模块封装。限于篇幅,后面会先分析Client端的处理,再分析心跳处理、服务端优化、甚至是对接注册中心等等,在NettySpringBoot等优秀框架的加持下编写一个RPC框架其实并不困难,困难的是性能优化和生态圈的支持。

Demo项目地址:

(本文完 c-1-d e-a-20200115)

本文转载自博客园,原文链接:https://www.cnblogs.com/throwable/p/12194713.html

全部评论: 0

    我有话说: