Hmily(3)

Hmily(3)5.提供端的方法也需要Hmily注解,当然也会有确认取消方法,执行切面方法DubboHmilyTransactionInterceptor#interceptor这个时候的context不会为空,转成对象HmilyTransactionContext,HmilyTransactionAspectServiceImpl#invoke找出合适的处理类HmilyTransactionFactorySe…

大家好,又见面了,我是你们的朋友全栈君。

5. 提供端的方法也需要Hmily注解,当然也会有确认取消方法,执行切面方法DubboHmilyTransactionInterceptor#interceptor这个时候的context不会为空,转成对象HmilyTransactionContext,HmilyTransactionAspectServiceImpl#invoke找出合适的处理类HmilyTransactionFactoryServiceImpl#factoryOf即ParticipantHmilyTransactionHandler

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable {
        HmilyTransaction hmilyTransaction = null;
        HmilyTransaction currentTransaction;
        switch (HmilyActionEnum.acquireByCode(context.getAction())) {
            case TRYING:
                try {
                    hmilyTransaction = hmilyTransactionExecutor.preTryParticipant(context, point);
                    final Object proceed = point.proceed();
                    hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
                    //update log status to try
                    hmilyTransactionExecutor.updateStatus(hmilyTransaction);
                    return proceed;
                } catch (Throwable throwable) {
                    //if exception ,delete log.
                    hmilyTransactionExecutor.deleteTransaction(hmilyTransaction);
                    throw throwable;
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            case CONFIRMING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.confirm(currentTransaction);
                break;
            case CANCELING:
                currentTransaction = HmilyTransactionGuavaCacheManager
                        .getInstance().getHmilyTransaction(context.getTransId());
                hmilyTransactionExecutor.cancel(currentTransaction);
                break;
            default:
                break;
        }
        Method method = ((MethodSignature) (point.getSignature())).getMethod();
        return DefaultValueUtils.getDefaultValue(method.getReturnType());
    }

刚开始时TRYING,参与者执行tey方法,构建HmilyTransaction,并且保存在Guava内存缓存中。然后发布保存事件保存在本服务所在的数据库中。最后保存上下文到ThreadLocal中返回。执行本地的业务方法,最后更新事务状态,清除ThreadLocal返回。

 public HmilyTransaction preTryParticipant(final HmilyTransactionContext context, final ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, "participant hmily transaction start..:{}", context::toString);
        final HmilyTransaction hmilyTransaction = buildHmilyTransaction(point, HmilyRoleEnum.PROVIDER.getCode(), context.getTransId());
        //cache by guava
        HmilyTransactionGuavaCacheManager.getInstance().cacheHmilyTransaction(hmilyTransaction);
        //publishEvent
        hmilyTransactionEventPublisher.publishEvent(hmilyTransaction, EventTypeEnum.SAVE.getCode());
        //Nested transaction support
        context.setRole(HmilyRoleEnum.LOCAL.getCode());
        HmilyTransactionContextLocal.getInstance().set(context);
        return hmilyTransaction;
    }

6. 继续回到消费方的StarterHmilyTransactionHandler,因为远程的rpc已经调用完毕。也就是returnValue = point.proceed();执行完毕,更新本地事务状态为TRYING(1, “try阶段完成”),开始执行确认操作,如果发生异常则执行取消操作,两者类似,只是操作不一样。都是通过线程池来异步执行,HmilyTransactionExecutor#confirm,更新本地事务状态,因为在每次在执行dubbo的filter的时候都会把参与方的信息记录下来,即updateParticipant,所以这块就用反射操作调用该确认方法,这个也是个RPC调用,之前的流程也会再来一遍。请求到达提供端,首先从HmilyTransactionGuavaCacheManager中获取事务信息,如果没有的话就会从数据库中查询。最后执行相应的操作。

 public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
        LogUtil.debug(LOGGER, () -> "hmily transaction confirm .......!start");
        if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
            return;
        }
        currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
        updateStatus(currentTransaction);
        final List<HmilyParticipant> hmilyParticipants = currentTransaction.getHmilyParticipants();
        List<HmilyParticipant> failList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
        boolean success = true;
        if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
            for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
                try {
                    HmilyTransactionContext context = new HmilyTransactionContext();
                    context.setAction(HmilyActionEnum.CONFIRMING.getCode());
                    context.setRole(HmilyRoleEnum.START.getCode());
                    context.setTransId(hmilyParticipant.getTransId());
                    HmilyTransactionContextLocal.getInstance().set(context);
                    executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
                } catch (Exception e) {
                    LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
                    success = false;
                    failList.add(hmilyParticipant);
                } finally {
                    HmilyTransactionContextLocal.getInstance().remove();
                }
            }
            executeHandler(success, currentTransaction, failList);
        }
    }

7. 如果出现非一致性异常的话,需要保证事务的事务的最后一致性,通过HmilyTransactionSelfRecoveryScheduled定时程序来实现,获取延迟多长时间后的事务信息,只要为了防止并发的时候,刚新增的数据被执行.判断事务信息的角色,如果是提供者并且状态是try刚开始的话,说明本地事务都执行失败等,也不会影响消费方,直接删除日志即可,判断重试次数是否达到上限,判断分布式事务模式为TCC还是CC,如果事务角色是提供者,重试只能由消费执行。最后更新重试次数,继续执行确认或者取消方法。HmilyTransactionRecoveryService具体方法和HmilyTransactionExecutor类似

public void onApplicationEvent(final ContextRefreshedEvent event) {
        hmilyCoordinatorRepository = SpringBeanUtils.getInstance().getBean(HmilyCoordinatorRepository.class);
        this.scheduledExecutorService =
                new ScheduledThreadPoolExecutor(1,
                        HmilyThreadFactory.create("hmily-transaction-self-recovery", true));
        hmilyTransactionRecoveryService = new HmilyTransactionRecoveryService(hmilyCoordinatorRepository);
        selfRecovery();
    }

 /**
     * if have some exception by schedule execute hmily transaction log.
     */
    private void selfRecovery() {
        scheduledExecutorService
                .scheduleWithFixedDelay(() -> {
                    LogUtil.info(LOGGER, "self recovery execute delayTime:{}", () -> hmilyConfig.getScheduledDelay());
                    try {
                        final List<HmilyTransaction> hmilyTransactions = hmilyCoordinatorRepository.listAllByDelay(acquireData());
                        if (CollectionUtils.isEmpty(hmilyTransactions)) {
                            return;
                        }
                        for (HmilyTransaction hmilyTransaction : hmilyTransactions) {
                            // if the try is not completed, no compensation will be provided (to prevent various exceptions in the try phase)
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()) {
                                hmilyCoordinatorRepository.remove(hmilyTransaction.getTransId());
                                continue;
                            }
                            if (hmilyTransaction.getRetriedCount() > hmilyConfig.getRetryMax()) {
                                LogUtil.error(LOGGER, "This transaction exceeds the maximum number of retries and no retries will occur:{}", () -> hmilyTransaction);
                                continue;
                            }
                            if (Objects.equals(hmilyTransaction.getPattern(), PatternEnum.CC.getCode())
                                    && hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()) {
                                continue;
                            }
                            // if the transaction role is the provider, and the number of retries in the scope class cannot be executed, only by the initiator
                            if (hmilyTransaction.getRole() == HmilyRoleEnum.PROVIDER.getCode()
                                    && (hmilyTransaction.getCreateTime().getTime()
                                    + hmilyConfig.getRecoverDelayTime() * hmilyConfig.getLoadFactor() * 1000
                                    > System.currentTimeMillis())) {
                                continue;
                            }
                            try {
                                hmilyTransaction.setRetriedCount(hmilyTransaction.getRetriedCount() + 1);
                                final int rows = hmilyCoordinatorRepository.update(hmilyTransaction);
                                // determine that rows>0 is executed to prevent concurrency when the business side is in cluster mode
                                if (rows > 0) {
                                    if (hmilyTransaction.getStatus() == HmilyActionEnum.TRYING.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.PRE_TRY.getCode()
                                            || hmilyTransaction.getStatus() == HmilyActionEnum.CANCELING.getCode()) {
                                        hmilyTransactionRecoveryService.cancel(hmilyTransaction);
                                    } else if (hmilyTransaction.getStatus() == HmilyActionEnum.CONFIRMING.getCode()) {
                                        hmilyTransactionRecoveryService.confirm(hmilyTransaction);
                                    }
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                LogUtil.error(LOGGER, "execute recover exception:{}", e::getMessage);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledDelay(), TimeUnit.SECONDS);

    }

    private Date acquireData() {
        return new Date(LocalDateTime.now().atZone(ZoneId.systemDefault())
                .toInstant().toEpochMilli() - (hmilyConfig.getRecoverDelayTime() * 1000));
    }

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/143204.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Win10 CMD命令大全与超好用的快捷键

    Win10 CMD命令大全与超好用的快捷键一、WindowsCMD命令大全按组合键Win(Windows图标键)+R键打开运行窗口,输入“cmd”按回车即可打开cmd命令提示符在窗口右击选择属性可进行个性化设置~  1.calc:启动计算器  2.appwiz.cpl:程序和功能  3.certmgr.msc:证书管理实用程序  4.charmap:启动字符映射表  5.chkdsk.e…

    2022年5月30日
    36
  • linux 查看文件夹大小「建议收藏」

    linux 查看文件夹大小「建议收藏」最简单的查看方法可以使用ls-ll、ls-lh命令进行查看,当使用ls-ll,会显示成字节大小,而ls-lh会以KB、MB等为单位进行显示,这样比较直观一些。 通过命令du-h–max-depth=1*,可以查看当前目录下各文件、文件夹的大小,这个比较实用。 查询当前目录总大小可以使用du-sh,其中s代表统计汇总的意思,即只输出一个总和大小。…

    2022年10月27日
    0
  • linux查看进程下的线程_linux查看线程状态

    linux查看进程下的线程_linux查看线程状态鉴于linux下线程的广泛使用我们怎么查看某个进程拥有的线程id了现在很多服务的设计主进程->子进程->线程(比如mysql,varnish)主进程负责侦听网络上的连接并把连接发

    2022年8月3日
    26
  • php获取server端mac和clientmac的地址[通俗易懂]

    php获取server端mac和clientmac的地址

    2022年1月22日
    40
  • RSA算法原理——(3)RSA加解密过程及公式论证

    RSA算法原理——(3)RSA加解密过程及公式论证上期(RSA简介及基础数论知识)为大家介绍了:互质、欧拉函数、欧拉定理、模反元素这四个数论的知识点,而这四个知识点是理解RSA加密算法的基石,忘了的同学可以快速的过一遍。一、目前常见加密算法简介二、RSA算法介绍及数论知识介绍三、RSA加解密过程及公式论证二、RSA加解密过程及公式论证今天的内容主要分为三个部分:rsa密钥生成过程:讲解如何生成公钥和私钥rs…

    2022年5月29日
    36
  • js如何将json字符串转成json对象_前端json字符串转json对象

    js如何将json字符串转成json对象_前端json字符串转json对象vardata=[{"id":1,"startTime":"2017-12-1210:36:50","endTime":"2018-02-0200:00:00","value":"0.26","jobCode":"zd_test_02_171212103650"

    2022年10月7日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号