18°

系统设计(2)

基于之前写的文章:https://my.oschina.net/u/257801/blog/3080141;在公司业务迭代过程中,该系统也重构了多个版本;就此,对系统的演变进行记录;

之前的流程很简单:

通过入参路由到具体的flow(有多个flow,每个flow由不同的handler组成)
               |
               |
               |
执行flow中的execute方法(flow负责调度每个handler,其实就是循环调用每个handler的process方法)
               |
               |
               |
执行完handler的方法后,将结果以mq的形势调用上游系统,当然如果是同步返回的话,就如同责任链模式,直接往回返就行了;

但随着业务的发展,对handler有不同维度的新增

public enum HandlerType {
    /**
     * 当handler节点是异步请求外部系统,需接收mq返回的结果,根据结果判断接入到流程的节点被认为是接入节点
     */
    ACCESS("接入节点", 1),
    /**
     * 在每个flow初始化时定义的handler节点
     */
    PROCESS("流程节点", 2);
    private String desc;
    private int code;
HandlerType(String desc, int code) {
    this.desc = desc;
    this.code = code;
}

}

public enum FeatureType {
    NORMAL("普通节点", 1), // 根据初始化的顺序已经指定上游节点和下游节点
    OVER("结束节点", 2), // 指定节点是结束节点,如需异步请求外部系统节点,等待mq消息返回;或者最后一个节点
    CONDITION("条件节点", 3); // 需根据该功能节点产生的数据动态判断下个节点,或者是不是结束节点

    private String desc;
    private int code;

    FeatureType(String desc, int code) {
        this.desc = desc;
        this.code = code;
    }
}

我们为了不影响现有的功能handler节点同时,对每个handler新增静态的和动态的特性;

  • 静态特性:当前handler是属于普通节点/结束节点/条件节点;
  • 动态特性:当前节点是条件节点时,执行条件语句,获取到下个handler节点的beanName;或者执行条件语句后,当前节点变成结束节点;

这时我们对每个功能节点handler进行了包装,由一个DefaultHandlerContext进行封装;

同时之前设计的flow是通过while循环调度每个handler执行的顺序,这种设计比较单一,扩展短板比较明显;我们对每个DefaultHandlerContext新增了两个节点,分别是prev和next节点;

普通节点即可指定上游节点和下游节点,结束节点没有下游节点,条件节点:通过执行完条件语句判断下一个节点beanName;在每个flow种包含属于自身的handlerPipeLine,并初始化;

// 默认初始化pipeLine,每个flow都有相同的start、terminate节点       
HandlerContextPipeLine pipeLine = new HandlerContextPipeLine(new DefaultHandlerContext(start), new DefaultHandlerContext(terminate, FeatureType.OVER));

// 条件节点,通过handler结果获取下个节点的beanName private Function<Boolean, String> inTransitFun = (existOrder) -> existOrder ? "preMqSend" : "fraudQuery";

    pipeLine
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("inTransitOrder"), inTransitFun)) // 条件节点, 条件语句inTransitFun
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("fraudQuery"), FeatureType.OVER)) // 结束节点 —— 调用外部系统接口
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("fraudReceiver"), fraudReceiverFun)) // 条件节点
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("prePaymentQuery"), prePaymentFun))
            .add(new DefaultHandlerContext(HandlerUtil.getHandler("quotaMatch"))) // 普通节点
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("discriOver"), discrFun))
            .add(new DefaultHandlerContext(HandlerUtil.getHandler("guaranteeQuery"), FeatureType.OVER))
            .add(new DefaultHandlerContext&lt;&gt;(HandlerUtil.getHandler("guaranteeReceive"), guaFun))
            .add(new DefaultHandlerContext(HandlerUtil.getHandler("preMqSend")));</code></pre> 

HandlerPipeLine:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HandlerContextPipeLine { // 当前flow流程包含的所有DefaultHandlerContext private Map<String, DefaultHandlerContext> contextMap = new HashMap<String, DefaultHandlerContext>() {{ // 包含NULL_OBJECT put(null, DefaultHandlerContext.NULL_OBJECT); }};

// 根据beanName查找DefaultHandlerContext
private Function&lt;String, DefaultHandlerContext&gt; handlerFun =
        (beanName) -&gt; contextMap.getOrDefault(beanName, DefaultHandlerContext.NULL_OBJECT);

// 流程中的开始节点
private DefaultHandlerContext head;
// 流程中的结束节点
private DefaultHandlerContext end;

/**
 * HandlerContextPipeLine 构造函数
 * @param head
 * @param end
 */
public HandlerContextPipeLine(DefaultHandlerContext head, DefaultHandlerContext end) {
    this.head = head;
    this.end = end;

    contextMap.put(head.getBeanName(), head);
    contextMap.put(end.getBeanName(), end);
    head.setNext(end);
    end.setPrev(head);
}

/**
 * 新增节点时: 当前节点是普通节点时,设置当前节点的next节点为end
 * ----------上游节点是普通节点时,设置当前节点为上游节点的next节点
 *
 * @param handlerContext
 * @return
 */
public HandlerContextPipeLine add(DefaultHandlerContext handlerContext) {
    DefaultHandlerContext preContext = end.getPrev();
    end.setPrev(handlerContext);
    if (handlerContext.getFeatureType() == FeatureType.NORMAL) {
        handlerContext.setNext(end);
    }

    handlerContext.setPrev(preContext);
    if (preContext.getFeatureType() == FeatureType.NORMAL) {
        preContext.setNext(handlerContext);
    }

    contextMap.put(handlerContext.getBeanName(), handlerContext);
    return this;
}

/**
 * 流程的开始节点
 *
 * @param reqParam
 * @param dataUtil
 */
public void start(ReqParam reqParam, DataUtil dataUtil) {
    head.execute(reqParam, dataUtil, handlerFun);
}

/**
 * 根据beanName查询获得DefaultHandlerContext
 *
 * @param beanName
 * @param reqParam
 * @param dataUtil
 */
public void access(String beanName, ReqParam reqParam, DataUtil dataUtil) {
    DefaultHandlerContext accessHandler = handlerFun.apply(beanName);
    accessHandler.execute(reqParam, dataUtil, handlerFun);
}

/**
 * mq接收数据接入流程
 *
 * @param beanName access name
 * @param func     要执行的func
 * @param reqParam
 * @param dataUtil
 */
public void access(String beanName, Function&lt;ReqParam, String&gt; func, ReqParam reqParam, DataUtil dataUtil) {
    DefaultHandlerContext&lt;ReqParam&gt; handlerContext = new DefaultHandlerContext&lt;&gt;(func, beanName);
    handlerContext.execute(reqParam, dataUtil, handlerFun);
}

}

DefaultHndlerContext类:

@Data
@Slf4j
public abstract class AbstractHandlerContext<T> implements ProcessHandler<T> {
  ...
    /**
     * 执行DefaultHandlerContext
     * 1.当前DefaultHandlerContext是条件节点时,执行功能handler节点;以返回结果作为入参执行条件语句获得next handler的beanName;通过beanName获得next DefaultHandlerContext
     * 2.当前不是条件节点时,直接执行功能节点的方法即可
     * 3.如果下一个节点是NULL_OBJECT时,则当前节点为结束节点,返回即可
     * 4.如果next节点不是结束节点,则执行next节点的execute方法
     * @param reqParam
     * @param dataUtil
     * @param handlerFun
     */
    void execute(ReqParam reqParam, DataUtil dataUtil, Function<String, DefaultHandlerContext> handlerFun) {
        if (FeatureType.CONDITION == featureType) {
            next = handlerFun.apply(conditionFun.apply(this.process(reqParam)));
        } else {
            this.process(reqParam);
        }
    // 判断该handler处理完是否终结
    if (next == NULL_OBJECT) {
        return;
    }

    this.next.execute(reqParam, dataUtil, handlerFun);
}

... }

以上解释了初始化每个flow的所有DefaultContextHandler;并为每个功能handler wrapper增添了静态/动态特性;同时它的调度模式也做了相应的调优,对扩展较之前会友好;

接入节点:

接入节点换一种表达方式,可以称为虚拟节点,它承接着mq异步消息的接收,并对结果判断接入到流程中的具体哪个节点,同时根据不同的flow,处理的逻辑还不尽相同;由此可知道,接入的逻辑判断和具体的flow也有关联;因此每个flow对接入都有自己的逻辑;但可把默认的接入写在基类中,个性化的,子类自己复写就好;

    Map<String, Map<String, Function>> accessMap = new HashMap<String, Map<String, Function>>() {{
        // 反欺诈
        put("fraudAccess",
                new HashMap<String, Function>() {{
                    put("pass", (obj) -> "fraudReceiver");
                }});
    // 担保策略
    put("guaPloyAccess",
            new HashMap&lt;String, Function&gt;() {{
                put("pass", (obj) -&gt; "guaranteeReceive");
            }});

    // 资方策略
    put("tenantAccess",
            new HashMap&lt;String, Function&gt;() {{
                put("pass", (obj) -&gt; "guaranteeQuery");
            }});

    // 额度匹配(决策、电审、提额、担保)
    put("matchAccess",
            new HashMap&lt;String, Function&gt;() {{
                put("pass", (obj) -&gt; "quotaMatch");
                put("reject", (obj) -&gt; "allSceneFinished");
                put("cancel", (obj) -&gt; "allSceneFinished");
            }});
}};

/**
 * 从mq接入流程内
 *
 * @param access
 * @param reqParam
 * @param type
 */
@Override
public void access(FlowAccess access, String type, ReqParam reqParam) {
    String funcKey = access.getBeanName();
    Function func = null;
    if (accessMap.containsKey(funcKey)) {
        func = accessMap.get(funcKey).get(type);
    } else if (access instanceof MatchQuotaAccess) {
        func = accessMap.get("matchAccess").get(type);
    }

    Assert.notNull(func, access.getBeanName() + "func should not null");
    pipeLine.access(funcKey, func, reqParam, getDataUtil());
}</code></pre> 

接入的类调用自身所属于的flow流程内的access方法,根据type类型pass/reject/cancel获取不同的条件语句;

调用pipeline中的access方法;组装接入节点DefaultHandlerContext;调用DefaultHandlerContext的execute方法;

因为接入节点内的handler是null的,所以只需要执行接入节点中的条件语句获取到next节点的beanName,并通过beanName获取到该flow中的DefaultHandlerContext;由此接入流程内;

以上则是系统的现状,基本能满足当前业务,能支持一定程度上的扩展;

-------------------------------------------------------------- 问题点 ----------------------------------------------------------

在重构的过程中遇到的问题,因为每个功能handler bean内都有个属性beanName;是通过实现BeanPostProcess方式进行创建代理注入的;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean.getClass().isAnnotationPresent(Handler.class)) {
            Object proxy = new Proxy(bean, beanName).proxy();
            HandlerUtil.processHandlerMap.put(beanName, (ProcessHandler) proxy);
            return proxy;
        }
    if (bean.getClass().isAnnotationPresent(Access.class)) {
        Object proxy = new Proxy(bean, beanName).proxy();
        HandlerUtil.accessMap.put(beanName, (FlowAccess) proxy);
        return proxy;
    }
    return bean;
}</code></pre> 

但在baseFlow实现InitializingBean接口中初始化HandlerPipeLine时,需通过beanName获取相应的handler代理类进行组装DefaultHandlerContext;所以在此之前需获取所有实现ProcessHandler接口的功能类(bean, handlerProxy)集合;

在BaseFlow中通过@Resource注入了head和terminated功能节点构造HandlerPipeLine;所以会优先实例化,才能产生相应的handler代理类,但并不能保证当前全量的ProcessHandler接口实现类都已经实例化;当某个handler还没有实例化时,会导致在组装DefaultHandlerContext时,handler获取的是null;整个链条是无效的;

现在只是偷巧的解决;通过@Autowired方式去注入head和terminated;因为用的是类型注入,所以上下文会保证执行baseflow中的afterPropertiesSet方法时,将所有的processHandler接口实现类都实例化;但并没有从spring加载的链路上去做根本解决;

2.在DefaultHandlerContext定义的泛型是和所包含的handler一致的;导致在扩展的时候出现一定的局限性,得后续想比较好的方式消除;

本文由【随】发布于开源中国,原文链接:https://my.oschina.net/u/257801/blog/3160108

全部评论: 0

    我有话说: