智能化状态管理:自动状态流转处理模块

目录

基本背景介绍

具体实现

基本数据准备

基本数据表

状态转换常量

状态转换注解

任务处理模版

各任务实现逻辑

开启比对任务进行处理

降噪字段处理任务处理

开启业务数据比对处理

业务数据比对处理

开始核对数据生成最终报告处理

核对数据生成最终报告处理

状态逻辑分发器

定时任务定义

总结


自动流转一般都是一个很大的处理系统,其中包含的处理内容是很庞大的,就这样一个大型系统的开发思路,我后面会抽空来分享一篇全局的处理和调度实现方式,本次仅针对一般如果我们需要对一些业务流程需要进行自动化处理思维的给出一个样例的自动状态流转处理模块的代码示例。如果有写的不对的地方,请留言指正!

基本背景介绍

假设我们需要一个自动的数据比对任务处理流程,基本的状态流转如下:

其中,任务创建、任务启动、任务暂停这几项开放接口交由用户手动决策,其他流程则按指定的方式直接进行自动化处理。大致模版如上,实际业务可按实际处理方式进行替换。

具体实现

基本数据准备

基本数据表

启动以上任务以及实际实现处理上,具体表暂时定义如下:

CREATE TABLE `compare_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `compare_task_name` varchar(512) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '比对任务名称',
  `replay_task_id` bigint(20) unsigned DEFAULT NULL COMMENT '回放任务ID',
  `status` int(10) unsigned DEFAULT NULL COMMENT '比对状态:-1-取消执行,0-任务创建;1-任务启动,2-降噪字段处理中,3-降噪字段处理完成,4-业务数据比对处理中-比对成功,5-业务数据比对处理完成,6-核对数据生成最终报告处理中,7-核对数据生成最终报告处理完成,8-比对失败',
  `failure_position` int(10) unsigned DEFAULT NULL COMMENT '中间失败停留状态记录',
  `failure_reason` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '失败原因',
  `noise_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '噪声数据结果记录',
  `compare_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '业务结果比对结果记录',
  `final_result` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '最终报告记录',
  `valid` int(11) DEFAULT '0' COMMENT ' 0当前在线 1已删除',
  `last_ping_time` int(11) NOT NULL DEFAULT '0' COMMENT '执行节点最后一次心跳时间',
  `version` int(11) NOT NULL DEFAULT '1' COMMENT '版本',
  `cname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '创建人',
  `uname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '更新人',
  `ctime` bigint(20) DEFAULT NULL COMMENT '创建时间',
  `utime` bigint(20) DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='比对任务'

状态转换常量

为了实现以上基本的数据,我们先提供一个具体的状态转换常量表,具体代码如下:

/**
 * @author yanfengzhang
 * @description 比对相关常量
 * @date 2022/5/1  23:29
 */
public class CompareCons {
    /**
     * 比对基本状态信息
     */
    public static class Status {
        /**
         * 比对任务取消执行
         */
        public static final int CANCEL = -1;
        /**
         * 比对任务创建
         */
        public static final int CREATE = 0;
        /**
         * 比对任务启动
         */
        public static final int START = 1;
        /**
         * 降噪字段处理中
         */
        public static final int NOISE_REDUCING = 2;
        /**
         * 降噪字段处理完成
         */
        public static final int NOISE_REDUCED = 3;
        /**
         * 业务数据比对处理中
         */
        public static final int BIZ_COMPARING = 4;
        /**
         * 业务数据比对处理完成
         */
        public static final int BIZ_COMPARED = 5;
        /**
         * 核对数据生成最终报告处理中
         */
        public static final int GENERATE_REPORTING = 6;
        /**
         * 核对数据生成最终报告处理完成
         */
        public static final int GENERATE_REPORTED = 7;
        /**
         * 比对失败
         */
        public static final int FAILED = 8;
    }
}

状态转换注解

自动化根据状态进行统一管理,故各个处理器实际上需要表明自己需要处理的状态行为,具体注解定义如下:

/**
 * @author yanfengzhang
 * @description
 * @date 2022/5/1  23:33
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Status {
    int status();
}

任务处理模版

基本任务处理的基类,包含通用逻辑:

  • 将提交的处理交由线程池管理。
  • 同时定义一个心跳关联到外部的Processor,当processor运行结束时,结束心跳。每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。
  • 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。

具体的代码实现逻辑如下:

/**
 * @author yanfengzhang
 * @description 任务处理的基类,包含任务处理的通用逻辑;
 * 核心逻辑:
 * 被提交的processor交由线程池执行;
 * 每个processor关联一个ping对象,ping实现心跳逻辑;
 * 每个processor在开始前需要有一定逻辑更新task的状态,否则可能导致任务被重复提交。
 * @date 2022/5/1  23:46
 */
public abstract class AbstractProcessor implements Runnable {

    private final Ping ping;

    private final CompareTaskPo value;

    private final Semaphore semaphore = new Semaphore(0);

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractProcessor.class);

    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8,
            16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactory() {
        private AtomicInteger threadCount = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "thread-processor-" + threadCount.getAndIncrement());
        }
    });

    protected AbstractProcessor(CompareTaskPo value) {
        ping = new Ping(this);
        this.value = value;
    }

    /**
     * 心跳。
     * 关联到外部的Processor,当processor运行结束时,结束心跳。
     * 每个ping关联一个单独的ScheduledExecutorService,结束ping时直接shutdown线程池。
     */
    class Ping implements Runnable {
        private WeakReference<AbstractProcessor> weakReference;
        private ReferenceQueue<AbstractProcessor> referenceQueue = new ReferenceQueue<>();
        private ScheduledExecutorService scheduleAtFixedRate = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "task-ping"));
        private CompareTaskMapper compareTaskMapper;

        Ping(AbstractProcessor processor) {
            weakReference = new WeakReference<>(processor, referenceQueue);
            compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
        }

        void ping() {
            if (referenceQueue.poll() != null) {
                /*兜底:当其关联的processor被垃圾回收后,结束心跳*/
                LOGGER.warn("【任务处理心跳】compareTaskId:{}的心跳被动结束", value.getId());
                scheduleAtFixedRate.shutdown();
            } else {
                try {
                    int curTime = (int) (System.currentTimeMillis() / 1000);
                    compareTaskMapper.updateLastPingTime(value.getId(), curTime);
                    LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常,当前时间:{} processor:{}", value.getId(), curTime, weakReference.get());
                } catch (Exception e) {
                    LOGGER.error("【任务处理心跳】compareTaskId:{}心跳时间更新异常,exception:", value.getId(), e);
                }
            }
        }

        @Override
        public void run() {
            ping();
        }

        void start() {
            LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常开启", value.getId());
            scheduleAtFixedRate.scheduleWithFixedDelay(this, 2, 2, TimeUnit.SECONDS);
        }

        void close() {
            LOGGER.warn("【任务处理心跳】compareTaskId:{}心跳正常结束", value.getId());
            scheduleAtFixedRate.shutdown();
        }
    }

    protected abstract boolean actualProcess(CompareTaskPo value);

    protected abstract void end(CompareTaskPo value);

    private void done() {
        ping.close();
        semaphore.release(1);
    }

    public final void process() {
        THREAD_POOL_EXECUTOR.submit(this);
    }

    public final boolean allowRecycle() {
        return semaphore.tryAcquire();
    }

    @Override
    public final void run() {
        this.ping.start();
        try {
            /*实际状态下任务处理内容成功后进行状态流转*/
            if (actualProcess(value)) {
                end(value);
            }
        } finally {
            done();
        }
    }
}

各任务实现逻辑

主要的任务处理流程如下几个重要处理器实现,其中状态可以由用户自动暂停更新状态,更新状态后相关流程被中断,该部分实现放置在定时任务中进行处理,具体见后面的代码,同时如果各任务中间有处理失败的内容,我们也会中断流程并记录具体失败的原因是什么好方便后续问题的定位。

开启比对任务进行处理

主要功能:用户创建比对任务没有问题后,点击任务启动,自动化处理流程开始,对相关业务数据进行分析验证,然后更新相关的状态,具体样例实现如下:

/**
 * @author yanfengzhang
 * @description 开启比对任务进行处理
 * @date 2022/5/2  00:05
 */
@Status(status = CompareCons.Status.START)
@Slf4j
public class StartCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启比对任务进行处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        /*1检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务*/
        return startCompareProcessorCheck(compareTaskPo);
    }

    /**
     * 检查数据正确性:对应的回放信息是否满足要求,如果不满足则直接中止比对任务
     * 如果没有问题,则认为已经成功
     *
     * @param compareTaskPo 比对任务信息
     * @return true-基本检查通过;false-检查不通过
     */
    private boolean startCompareProcessorCheck(CompareTaskPo compareTaskPo) {
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启比对任务进行处理完成,待更新状态:当前处理id为{}", value.getId());
        try {
            /*更新状态为"降噪字段处理中"*/
            compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCING);
        } catch (Exception e) {
            log.info("开启比对任务进行处理完成异常异常异常异常:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);
        }

        log.info("开启比对任务进行处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCING);
    }
}

降噪字段处理任务处理

主要功能:假设我们通过比对两次master处理回放来分析得出一些噪声处理信息,比对处理噪声的主要代码如下:

/**
 * @author yanfengzhang
 * @description 降噪字段处理任务处理
 * @date 2022/5/2  00:29
 */
@Status(status = CompareCons.Status.NOISE_REDUCING)
@Slf4j
public class NoiseReduceProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public NoiseReduceProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("降噪字段处理任务处理:当前处理id为{}", value.getId());
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        if (Objects.isNull(compareTaskPo)) {
            log.error("降噪字段处理任务处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }

        ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);
        ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());
        if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())
                || StringUtils.isBlank(replayDataResultValue.getMasterSecondBdfPath())) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");
            return false;
        }

//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String masterSecondBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.msbdf.rpresult";
        String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();
        String masterSecondBdfPath = replayDataResultValue.getMasterSecondBdfPath();
        /*2.检查回放记录中两次master文件对应的条数是否一致*/
        Long masterFirstBdfLines = null;
        Long masterSecondBdfLines = null;
        try {
            masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();
            masterSecondBdfLines = Files.lines(Paths.get(masterSecondBdfPath)).count();
        } catch (Exception e) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master回放文件读取异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master回放文件读取异常");
            return false;
        }
        if (!Objects.equals(masterFirstBdfLines, masterSecondBdfLines)) {
            log.error("降噪字段处理任务处理异常:比对任务{}对应回放记录中两次master文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "两次master文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareMasterFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_降噪比对数据.txt";
            for (int i = 1; i < masterFirstBdfLines + 1; i++) {
                String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);
                String masterSecondBdfStr = FileUtils.readAppointedLineNumber(masterSecondBdfPath, i);
                JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, masterSecondBdfStr);
                FileUtils.writeContent(compareMasterFile, diffInfo.toString());
            }
            compareTaskMapper.updateNoiseResult(value.getId(), compareMasterFile);
        } catch (Exception e) {
            log.error("降噪字段处理任务处理异常:比对任务{}生成噪声数据异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.NOISE_REDUCING, "生成噪声数据异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("降噪字段处理任务处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"降噪字段处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.NOISE_REDUCED);
        log.info("降噪字段处理任务处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.NOISE_REDUCED);
    }
}

开启业务数据比对处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/**
 * @author yanfengzhang
 * @description 开启业务数据比对处理
 * @date 2022/5/2  00:36
 */
@Status(status = CompareCons.Status.NOISE_REDUCED)
@Slf4j
public class StartBizCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartBizCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启业务数据比对处理:当前处理id为{}", value.getId());
        /*该状态下当前不做任何处理,基本没有检查的相关启动条件*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启业务数据比对处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"业务数据比对处理中"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARING);
        log.info("开启业务数据比对处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARING);
    }
}

业务数据比对处理

主要功能:对本次业务代码改动和master代码进行对比来分析对应的内容处理变化统计,具体代码如下:

/**
 * @author yanfengzhang
 * @description 业务数据比对处理
 * @date 2022/5/2  00:53
 */
@Status(status = CompareCons.Status.BIZ_COMPARING)
@Slf4j
public class BizCompareProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public BizCompareProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开启业务数据比对处理处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        if (Objects.isNull(compareTaskPo)) {
            log.error("开启业务数据比对处理处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }

        ReplayTaskApplicationService replayTaskApplicationService = BeanFactoryUtil.getBean(ReplayTaskApplicationService.class);
        ReplayDataResultValue replayDataResultValue = replayTaskApplicationService.getBdfPathListByReplayTaskId(compareTaskPo.getReplayTaskId());
        if (Objects.isNull(replayDataResultValue) || StringUtils.isBlank(replayDataResultValue.getMasterFirstBdfPath())
                || StringUtils.isBlank(replayDataResultValue.getFeatureBdfPath())) {
            log.error("开启业务数据比对处理处理异常:比对任务{}对应回放记录id相关数据文件数据并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "对应回放记录id相关数据文件数据并不存在或不完整!");
            return false;
        }

//        String masterFirstBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.mfbdf.rpresult";
//        String featureBdfPath = "/Users/yanfengzhang/Downloads/replay_data_10.dat.fbdf.rpresult";
        String masterFirstBdfPath = replayDataResultValue.getMasterFirstBdfPath();
        String featureBdfPath = replayDataResultValue.getFeatureBdfPath();
        /*2.检查回放记录中master文件和dev文件对应的条数是否一致*/
        Long masterFirstBdfLines = null;
        Long featureBdfLines = null;
        try {
            masterFirstBdfLines = Files.lines(Paths.get(masterFirstBdfPath)).count();
            featureBdfLines = Files.lines(Paths.get(featureBdfPath)).count();
        } catch (Exception e) {
            log.error("比对任务{}对应回放记录中master回放文件或dev回放文件读取异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件或dev回放文件读取异常");
            return false;
        }
        if (!Objects.equals(masterFirstBdfLines, featureBdfLines)) {
            log.error("比对任务{}对应回放记录中master回放文件和dev回放文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "master回放文件和dev回放文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_业务比对数据.txt";
            for (int i = 1; i < masterFirstBdfLines + 1; i++) {
                String masterFirstBdfStr = FileUtils.readAppointedLineNumber(masterFirstBdfPath, i);
                String featureBdfStr = FileUtils.readAppointedLineNumber(featureBdfPath, i);
                JsonNode diffInfo = JsonDealUtils.getCompareJsonResult(masterFirstBdfStr, featureBdfStr);
                FileUtils.writeContent(compareBizFile, diffInfo.toString());
            }
            compareTaskMapper.updateCompareResult(value.getId(), compareBizFile);
        } catch (Exception e) {
            log.error("比对任务{}生成业务比对数据异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.BIZ_COMPARING, "生成业务比对数据异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开启业务数据比对处理处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"业务数据比对处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.BIZ_COMPARED);
        log.info("开启业务数据比对处理处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.BIZ_COMPARED);
    }
}

开始核对数据生成最终报告处理

主要功能:没有其他检查数据内容的话,可以直接进行状态转换,我这边暂时忽略检查!

/**
 * @author yanfengzhang
 * @description 开始核对数据生成最终报告处理
 * @date 2022/5/2  00:59
 */
@Status(status = CompareCons.Status.BIZ_COMPARED)
@Slf4j
public class StartGenerateReportProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public StartGenerateReportProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());
        /*该状态下当前不做任何处理,基本没有检查的相关启动条件(检查相关文件是否存在)*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"核对数据生成最终报告处理中"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTING);
        log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTING);
    }
}

核对数据生成最终报告处理

主要功能:结合前面处理生成的数据进行最终报告的比对任务生成报告,具体处理流程如下:

/**
 * @author yanfengzhang
 * @description 核对数据生成最终报告处理
 * @date 2022/5/2  01:20
 */
@Status(status = CompareCons.Status.GENERATE_REPORTING)
@Slf4j
public class GenerateReportProcessor extends AbstractProcessor {

    private CompareTaskMapper compareTaskMapper;

    public GenerateReportProcessor(CompareTaskPo value) {
        super(value);
        compareTaskMapper = BeanFactoryUtil.getBean(CompareTaskMapper.class);
    }

    @Override
    public boolean actualProcess(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理:当前处理id为{}", value.getId());
        CompareTaskPo compareTaskPo = compareTaskMapper.selectById(value.getId());
        if (Objects.isNull(compareTaskPo)) {
            log.error("开始核对数据生成最终报告处理异常:比对任务{}并不存在,请进行核对!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED,
                    CompareCons.Status.NOISE_REDUCING, "比对任务并不存在!");
            return false;
        }
        /*1.根据回放任务id来查看对应回放记录中的数据信息*/
        String compareBizResultPath = compareTaskPo.getCompareResult();
        String noiseResultPath = compareTaskPo.getNoiseResult();
        /*2.检查回放记录中master文件和dev文件对应的条数是否一致*/
        Long compareBizResultLines = null;
        Long noiseResultLines = null;
        try {
            compareBizResultLines = Files.lines(Paths.get(compareBizResultPath)).count();
            noiseResultLines = Files.lines(Paths.get(noiseResultPath)).count();
        } catch (Exception e) {
            log.error("比对任务{}对应核对数据生成最终报告读取文件异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告读取文件异常");
            return false;
        }
        if (!Objects.equals(compareBizResultLines, noiseResultLines)) {
            log.error("比对任务{}对应核对数据生成最终报告相关文件数据条数并不一致!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "对应核对数据生成最终报告相关文件数据条数并不一致");
            return false;
        }

        /*3.文件各行进行数据对比并进行记录*/
        try {
            String compareBizFile = "/Users/yanfengzhang/Downloads/" + value.getCompareTaskName() + "_" + value.getId() + "_最终结果报告.txt";
            for (int i = 1; i < compareBizResultLines + 1; i++) {
                String compareBizResultStr = FileUtils.readAppointedLineNumber(compareBizResultPath, i);
                String noiseResultStr = FileUtils.readAppointedLineNumber(noiseResultPath, i);
                List<CompareDataMeta> compareDataMetas = CompareDataResult.getCompareDataResult(noiseResultStr, compareBizResultStr);
                FileUtils.writeContent(compareBizFile, JSON.toJSONString(compareDataMetas));
            }
            compareTaskMapper.updateNoiseResult(value.getId(), compareBizFile);
        } catch (Exception e) {
            log.error("比对任务{}核对数据生成最终报告数据处理异常!", value.getId());
            compareTaskMapper.updateStatusAndFailure(value.getId(), CompareCons.Status.FAILED, CompareCons.Status.GENERATE_REPORTING, "核对数据生成最终报告数据处理异常");
            return false;
        }

        /*4.执行完毕无异常,进行状态变更*/
        return true;
    }

    @Override
    public void end(CompareTaskPo value) {
        log.info("开始核对数据生成最终报告处理完成,待更新状态:当前处理id为{}", value.getId());
        /*更新状态为"核对数据生成最终报告处理完成"*/
        compareTaskMapper.updateStatus(value.getId(), CompareCons.Status.GENERATE_REPORTED);
        log.info("开始核对数据生成最终报告处理完成:当前处理id为{},状态已更新为{}", value.getId(), CompareCons.Status.GENERATE_REPORTED);
    }
}

状态逻辑分发器

我们针对以上任务处理器,对实际业务处理进行分析并将其转发到相关的处理器上进行自动化处理,具体实现逻辑如下:

/**
 * @author yanfengzhang
 * @description 负责对task任务不同状态运行逻辑的分发。
 * @date 2022/5/2  01:44
 */
public class EventDispatcher {
    private static Map<Integer, Class> status2Processor = Maps.newHashMap();
    private static Set<AbstractProcessor> curProcessors = new HashSet<>();
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);

    static {
        Reflections reflections = new Reflections("com.sankuai.tsp.product.bsap.domain.compare.event.processor.impl");
        Set<Class<?>> classSet = reflections.getTypesAnnotatedWith(Status.class);
        for (Class<?> cl : classSet) {
            Annotation[] annotations = cl.getAnnotations();
            for (Annotation a : annotations) {
                if (a instanceof Status) {
                    Status status = (Status) a;
                    status2Processor.put(status.status(), cl);
                }
            }
        }
    }

    /**
     * dispatch方法目前只有cronServer线程调用,
     * 但是为了防止出现多线程调用导致的curProcessors被并发修改问题,所以用synchronized同步
     *
     * @param status        当前任务状态
     * @param compareTaskPo 比对任务消息数据
     * @return
     */
    public static synchronized boolean dispatch(int status, CompareTaskPo compareTaskPo) {
        AbstractProcessor processor = getInstance(status, compareTaskPo);
        if (processor != null) {
            curProcessors.add(processor);
            processor.process();
            return true;
        }
        return false;
    }

    private static AbstractProcessor getInstance(int status, CompareTaskPo compareTaskPo) {
        /*zyf:主动清理一次*/
        cleanDirty();
        if (containsStatus(status)) {
            try {
                Constructor constructor = status2Processor.get(status).getConstructor(CompareTaskPo.class);
                return (AbstractProcessor) constructor.newInstance(compareTaskPo);
            } catch (Exception ex) {
                LOGGER.error("EventDispatcher dispatcher getInstance error, exception:", ex);
            }
        }
        return null;
    }

    public static boolean containsStatus(int status) {
        return status2Processor.containsKey(status);
    }

    public static synchronized void cleanDirty() {
        curProcessors.removeIf(AbstractProcessor::allowRecycle);
    }

    public static int getTaskCount() {
        return curProcessors.size();
    }
}

定时任务定义

针对以上的内容,我们内部维护一个基本的定时器来完成实际的业务自动化流转处理,主要代码和业务处理如下:

/**
 * @author yanfengzhang
 * @description 定时任务:定时读取数据库中比对数据需要处理的task任务,并分发到响应的processor处理。
 * @date 2022/5/2  02:18
 */
@Component
@DependsOn("beanFactoryUtil")
public class CronServer implements InitializingBean {

    @Autowired
    private CompareTaskMapper compareTaskMapper;

    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
    private static final Logger LOGGER = LoggerFactory.getLogger(CronServer.class);

    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new CompareCronTask(), 20, 3, TimeUnit.SECONDS);
    }

    class CompareCronTask implements Runnable {
        @Override
        public void run() {
            if (BsapCompareSwitch.cronServerPause()) {
                LOGGER.warn("--------------cron server pause--------------");
                return;
            }
            int taskCount = EventDispatcher.getTaskCount();
            /*清理已经完成的任务*/
            EventDispatcher.cleanDirty();
            LOGGER.warn("[--------当前正在运行的任务数量为:{}-------]", EventDispatcher.getTaskCount());
            if (taskCount != 0 && EventDispatcher.getTaskCount() == 0) {
                LOGGER.warn("[------------------------任务数量存在问题,主动进行gc处理中---------------------------]");
                System.gc();
            }
            int curSecond = (int) (System.currentTimeMillis() / 1000);
            try {
                List<CompareTaskPo> compareTaskPos = compareTaskMapper.selectCompareTaskPoByTimeRange(curSecond - 20);
                if (CollectionUtils.isEmpty(compareTaskPos)) {
                    return;
                }
                for (CompareTaskPo compareTaskPo : compareTaskPos) {
                    /*如果处理的内容不在我们规定的范围时直接跳出*/
                    if (!EventDispatcher.containsStatus(compareTaskPo.getStatus())) {
                        continue;
                    }
                    /**
                     * 思考:
                     * 尝试更新一下last_ping_update的时间,更新成功代表抢锁成功,然后执行任务。
                     * 如果更新成功但是执行失败,待后续CronServer运行时再次尝试。
                     * 每台服务器每次定时任务只运行一个任务,防止同一台服务器抢占多个任务导致压力过大、负载不均衡的问题。
                     * (由于目前任务运行周期在多台服务器是一致的,所以极端情况下可能会出现任务被一台机器抢占的情况,
                     * 后续可以考虑使不同机器的运行周期随机或者引入分布式任务分配(负载均衡)策略)
                     */
                    if (compareTaskMapper.updateLastPingTimeByVersion(compareTaskPo.getId(), curSecond - 15, compareTaskPo.getVersion()) > 0) {
                        compareTaskPo.setVersion(compareTaskPo.getVersion() + 1);
                        compareTaskPo.setLastPingTime(curSecond - 15);
                        if (EventDispatcher.dispatch(compareTaskPo.getStatus(), compareTaskPo)) {
                            LOGGER.warn("CronServer 提交一个任务,任务id为{}, 任务详细信息:{}", compareTaskPo.getId(), JSON.toJSON(compareTaskPo));
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("server cron run catch an exception:", e);
            }
        }
    }
}

总结

整体上的大致模版实现已如上进行了简化,其中有不同的理解的可留言讨论,后续复杂系统的内容后续有时间在进行分享,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/712746.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

小红书教程简化版,从0开始走向专业,小红书-主理人培养计划 (13节)

课程目录 1-小红书分析与拆解.mp4 2-小红书电商玩法.mp4 3-小红书基础信息设置10_1.mp4 4-小红书如何开店&#xff1f;.mp4 5-小红书店铺设置&#xff08;1&#xff09;.mp4 5-小红书店铺设置.mp4 6-小红书笔记制作与产品发布.mp4 7-小红书运营的文案与标题.mp4 8-小红…

Spring Boot 自定义Starter

自定义starter 创建pom项目 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.ap…

MySQL的三种重要的日志

日志 Mysql有三大日志系统 Undo Log&#xff08;回滚日志&#xff09;&#xff1a;记录修改前的数据&#xff0c;用于事务回滚和 MVCC&#xff08;多版本并发控制&#xff09;。 Redo Log&#xff08;重做日志&#xff09;&#xff1a;记录数据变更&#xff0c;用于崩溃恢复&…

XAMPP PHP-CGI 远程代码执行漏洞(CVE-2024-4577)

漏洞概述&#xff1a; PHP 是一种被广泛应用的开放源代码的多用途脚本语言&#xff0c;PHP-CGI 是 PHP 自带的 FastCGI 管理器。是一个实现了 CGI 协议的程序&#xff0c;用来解释 PHP 脚本的程序&#xff0c;2024 年 6 月 7 日&#xff0c;推特安全上 orange 公开了其漏洞细节…

基于Wireshark实现对FTP的抓包分析

基于Wireshark实现对FTP的抓包分析 前言一、虚拟机Win10环境配置二、FileZilla客户端的安装配置下载FileZilla客户端安装FileZilla 三、FileZilla Server安装下载FileZilla Server安装 四、实现对FTP的抓包前置工作实现抓包完成抓包 前言 推荐一个网站给想要了解或者学习人工智…

MySQL学习笔记-进阶篇-SQL优化

SQL优化 插入数据 insert优化 1&#xff09;批量插入 insert into tb_user values(1,Tom),(2,Cat),(3,Jerry); 2&#xff09;手动提交事务 mysql 默认是自动提交事务&#xff0c;这样会导致频繁的开启和提交事务&#xff0c;影响性能 start transaction insert into tb_us…

【面经总结】Java基础 - SPI

SPI 什么是 SPI&#xff1f; 提供给服务提供者去使用的一个接口 SPI 的优点 低耦合配置灵活多态性 SPI 的应用场景 JDBCSLF4J 日志

Pandas AI:最棒的大模型数据分析神器!

暑期实习基本结束了&#xff0c;校招即将开启。 不同以往的是&#xff0c;当前职场环境已不再是那个双向奔赴时代了。求职者在变多&#xff0c;HC 在变少&#xff0c;岗位要求还更高了。 最近&#xff0c;我们又陆续整理了很多大厂的面试题&#xff0c;帮助一些球友解惑答疑&…

C++ 46 之 关系运算符的重载

#include <iostream> #include <string> using namespace std;class Students06{ public:string m_name;int m_age;Students06(string name, int age){this->m_name name;this->m_age age;}// 重载了 bool operator(Students06 &stu){if(this->m_na…

java:spring actuator添加自定义endpoint

# 项目代码资源&#xff1a; 可能还在审核中&#xff0c;请等待。。。 https://download.csdn.net/download/chenhz2284/89437274 # 项目代码 【pom.xml】 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId&…

Cocos2d-x 4.0 工程首次建立与编译(Mac m1)

Mac m1芯片下将cocos2d-x升级至4.0版本后&#xff0c;官方剔除了不同平台的工程以及变更了编译方式&#xff0c;直接使用cmake构建&#xff0c;需要做一些前置的准备工作。 环境准备&#xff1a; 项 版本 备注 MacOS10.3 or laterpython2.7.16(建议>2.7.10)cmake3.29.3Do…

Android 工程副总裁卸任

Android 工程副总裁卸任 Android工程副总裁Dave Burke宣布&#xff0c;他将辞去领导Android工程的职位&#xff0c;将重心转向“AI/生物”项目。不过&#xff0c;他并没有离开Alphabet&#xff0c;目前仍将担任Android系统开发顾问的角色。 Burke参与了Android系统的多个关键…

TCP三次握手的过程

一、什么是TCP TCP是面向连接的、可靠的、基于字节流的传输层通信协议。 二、TCP的头部格式 序列号:在建立连接时由计算机生成的随机数作为其初始值&#xff0c;通过SYN包传给接收端主机&#xff0c;每发送一次数据&#xff0c;就「累加」一次该「数据字节数」的大小。用来解…

155. 最小栈 力扣 python 空间换时间 o(1) 腾讯面试题

设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部的元素。int get…

PCB设计简介

PCB电路板各层的含义 A. Signal And Plane Layers(S) 1. Signal Layers(信号层): 信号层主要用于布置电路板上的导线。Altium Designer提供了32个信号层&#xff0c;包括Top layer(顶层)&#xff0c;Bottom layer(底层)和32个内电层。 包括&#xff1a;Top layer(顶层),Bott…

CleanMyMac占用内存大吗 CleanMyMac如何释放内存空间

Mac OS上内存可谓是“寸土寸金”&#xff0c;每一M的内存都是真金白银换来的。为了有更充足的系统空间&#xff0c;有用户会使用系统清理和优化工具CleanMyMac&#xff0c;那么下面我们来看看CleanMyMac占用内存大吗&#xff0c;CleanMyMac如何释放内存空间的相关内容吧。 一、…

spring boot配置ssl证书,支持https访问

1. 阿里云官网下载证书,云控制台搜索ssl&#xff0c;点击进入。 2.点击免费证书&#xff0c;立即购买。 3. 点击创建证书&#xff0c;填写完证书申请后&#xff0c;等待证书签发。 4. 证书签发以后&#xff0c;点击下载证书&#xff0c;spring boot选tomcat服务器类型的。 5. …

Linux应用编程 - i2c-dev操作I2C

嵌入式Linux操作I2C设备&#xff0c;我们一般会在内核态编写I2C驱动程序。另外还能在用户空间编写I2C程序&#xff0c;下面介绍相关代码的实现。 i2c-dev框架在内核中封装了I2C通信所需要的所有通信细节&#xff0c;I2C适配器会在/dev目录下创建字符设备&#xff0c;例如&#…

如何避免销售飞单私单!教你如何巧妙避开陷阱,业绩飙升!

明明投入了大量的时间和精力&#xff0c;客户却悄无声息地消失了&#xff1f;或是突然有一天&#xff0c;你发现原本属于你的订单被同事悄悄抢走&#xff1f;这背后&#xff0c;很可能隐藏着销售飞单私单的陷阱。今天&#xff0c;就让我们一起探讨如何巧妙避开这些陷阱&#xf…

TCP与UDP案例

udp不会做拆分整合什么的 多大就是多大