Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

出现了线程上下文流转不成功 #698

Open
kevinWangSheng opened this issue Sep 19, 2024 · 0 comments
Open

出现了线程上下文流转不成功 #698

kevinWangSheng opened this issue Sep 19, 2024 · 0 comments

Comments

@kevinWangSheng
Copy link

kevinWangSheng commented Sep 19, 2024

bg:在使用plusar进行处理消息消费的时候,消息的具体处理之前会使用该TransmittableThreadLocal进行移除和设置。
但是现在发现了一个上下文失效的情况:

消费代码:

@PulsarConsumer(topic = "cepf_platform_warehouse_shein",
            subscriptionName = "cepf_warehouse",
            serverName = "amsPulsar",
            clazz = byte[].class,
            initialPosition = SubscriptionInitialPosition.Latest,
            batchAckMode = BatchAckMode.MANUAL,
            serialization = Serialization.BYTE,
            maxRedeliverCount = 2,
            deadLetterTopic = "cepf_platform_warehouse_shein_dlq"
    )
    public void consumeMsg(PulsarMessage<byte[]> message ){
        String dataJson = new String(message.getValue());
        log.info("消费消息:{}", dataJson);
        JSONObject data = JSON.parseObject(dataJson);
        String messageTypeCode = data.getString("messageTypeCode");
        String messageId = data.getString("messageId");
        JSONObject messageContent = data.getJSONObject("messageContent");
        IMessageMgmtService messageMgmtService = MessageHandleAdapter.getMessageMgmtService(messageTypeCode);
        try {

            if(Objects.isNull(messageMgmtService)) {
                log.warn("messageTypeCode:{}不支持消费该messageTypeCode.messageId:{}", messageTypeCode, message.getMessageId());
                return;
            }

            Map<String, String> extParam = Maps.newHashMap();
            extParam.put("version", data.getString("createTime"));
            extParam.put("messageId", messageId);
            messageMgmtService.processMessage(messageContent, extParam);
            log.info("【{}】消费成功,messageId:{}", messageTypeCode,  message.getMessageId());
            return;
        } catch (Exception e) {
            //注意:需要区分什么类型的消息加入重试队列
            log.error("{}-消费失败加入重试队列pulsar:{}, messageId:{}", messageTypeCode, message.getMessageId(), messageId, e);
            throw e;
        }
    }

执行消费的逻辑:
具体的这段代码的逻辑:messageMgmtService.processMessage(messageContent, extParam);

    @Override
    public void processMessage(JSONObject messageContent, Map<String, String> extParam) {
        RLock rLock = null;
        try{
            //1.解析推送源数据
            WarehouseOriginalDataBO originalDataBO = this.parse(messageContent, getOriginalDataClass());
            originalDataBO.setVersion(extParam.get("version"));
            originalDataBO.setPlatformCode(getPlatformCode().getCode());
            originalDataBO.setItemId(getUniqueId(originalDataBO));
            originalDataBO.setBusinessType(getBusinessType().getType());

            //处理线程上线文companyId
            AkRequest request = AkRequestContext.getRequest();
            if(Objects.isNull(request)) {
                request = new AkRequest();
            }
            request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
            AkRequestContext.put(request);

            //1.1校验一下data数据内是否都合法
            JSON rawResp = originalDataBO.getRawResp();
            if(Objects.isNull(rawResp)) {
                log.warn("[采集]推送库存消息不合法,raw resp 不能为null.biz-code:{},msg:{}", getPlatformCode().getName(), messageContent.toJSONString());
                return;
            }

            Data data = parseData(originalDataBO);

            //校验
            if(!isValidData(data)) {
                log.warn("[采集]推送库存消息不合法,raw resp 数据不合法biz-code:{},msg:{}",getPlatformCode().getName(), messageContent.toJSONString());
                return;
            }

            //3.校验数据版本号
            while (this.checkDataVersion(originalDataBO)) {
                //3.加锁
                rLock = getRLock(redisLockPrefix, String.valueOf(originalDataBO.getCompanyId()),
                        String.valueOf(originalDataBO.getStoreId()), originalDataBO.getItemId());

                if(rLock.tryLock(WAIT_TIME_OUT, LEASE_TIME_OUT, TimeUnit.SECONDS)) {

                    try {
                        //2.先落原始表
                        warehouseOriginalDataService.store(originalDataBO, getStock());
                        //4.进行业务操作
                        boolean deal = this.deal(messageContent, originalDataBO, data, extParam);

                        if(deal) {
                            //5.执行成功再更新redis最新的版本号
                            this.setVersion(originalDataBO);
                        }
                    } finally {
                        //6.释放锁
                        if(Objects.nonNull(rLock)) {
                            if(rLock.isLocked()) {
                                rLock.unlock();
                            }
                        }
                    }

                }
            }

        } catch (IllegalArgumentException e) {
            //1、消息的不合法,可以直接打出异常,不进行处理,消息也不需要重试,直接抛出堆栈信息不然不好排查问题
            log.warn("[采集]推送库存消息不合法,msg:{}",messageContent.toJSONString(), e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
    在这里进行了上下文的设置//处理线程上线文companyId
            AkRequest request = AkRequestContext.getRequest();
            if(Objects.isNull(request)) {
                request = new AkRequest();
            }
            request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
            AkRequestContext.put(request);


put 方法会先remove掉然后在putpublic static void put(AkRequest request) {
        if (Objects.nonNull(get())) {
            remove();
        }

        if (request != null) {
            requests.set(request);
        }

    }
    remoe方法public static void remove() {
        requests.remove();
    }

业务代码@Override
    public boolean deal(JSONObject msgContent, WarehouseOriginalDataBO warehouseOriginalDataBO,
                        SheinPurchaseOrderMsgBO orderMsgBO, Map<String, String> extParam) {
        return sheinPurchaseOrderService.dealPlatformPurchaseOrder(orderMsgBO);
    }



@Override
    @Transactional(rollbackFor = Exception.class)
    @RedissonLock(key = "#bo.orderNo",waitTime = 1000)
    public Boolean dealPlatformPurchaseOrder(SheinPurchaseOrderMsgBO bo) {
        // 1. 判断平台采购单是否在数据库中已存在 按什么唯一键 采购单号
        Long companyId = bo.getCompanyId();
        String orderNo = bo.getOrderNo();
        Long storeId = bo.getStoreId();
        SheinPurchaseOrderPO exist = sheinPurchaseOrderDAO.findByPurchaseOrderNo(companyId, storeId, orderNo);

        // 2. 平台采购单转换成po
        SheinPurchaseOrderPO sheinPurchaseOrderPO = convertBoToPo(bo);

        Boolean statusChange = false;
        if (Objects.nonNull(exist)) {
            sheinPurchaseOrderPO.setId(exist.getId());
            // TODO 如果状态更新才设置更新时间吗
            if (!Objects.equals(exist.getStatus(), bo.getStatus())) {
                sheinPurchaseOrderPO.setStatusChangeDate(new Date());
                statusChange = true;
            }
            // 生成唯一id
        } else {
            sheinPurchaseOrderPO.setStatusChangeDate(new Date());
        }

        // 3. 平台采购单行转换成 订单商品行数据
        List<SheinPurchaseOrderItemPO> needSaveItems = convertBoToItemPO(bo);

        // 4. 更新入库
        // 4.1 主表数据入库
        boolean orderSaveResult = sheinPurchaseOrderDAO.saveOrUpdate(sheinPurchaseOrderPO);

        // 4.2 item数据入库
        needSaveItems.forEach(r -> r.setOrderId(sheinPurchaseOrderPO.getId()));
        boolean itemSaveResult = saveItemPos(companyId, sheinPurchaseOrderPO.getId(), needSaveItems);

        // 4.3 保存变更日志
        // 插入采购单状态变更日志
        saveOperationLog(sheinPurchaseOrderPO, statusChange);

        // 4.3 新增配对关系
        try {
            List<String> mskuList = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(needSaveItems)) {
                mskuList = needSaveItems.stream().map(SheinPurchaseOrderItemPO::getMsku).distinct().collect(Collectors.toList());
            }
            addPair(companyId, storeId, mskuList, PlatformCodeEnum.FBSheinManage.getCode());
        } catch (Exception exception) {
            log.info("采购单号:{} 新增配对失败", bo.getOrderNo(), exception);
        }

        return orderSaveResult & itemSaveResult;
    }
    
    会在RequestUtil这里用到这个线程的上下文public static Long getCompanyId() {
        String companyIdStr = AkRequestContext.getCompanyId();
        log.info("AkRequestContext 基础平台获取companyId:{}", companyIdStr);
        if (StrUtil.isNotBlank(companyIdStr)) {
            return Long.parseLong(companyIdStr);
        } else {
            HttpServletRequest request = getRequest();
            String headerCompanyId = request.getHeader(AK_COMPANY_ID_KEY);
            log.info("HttpServletRequest 请求头获取companyId:{}", headerCompanyId);
            if (StrUtil.isNotBlank(headerCompanyId)) {
                return Long.parseLong(headerCompanyId);
            } else {
                log.error("从线程的ThreadLocal中和请求header信息中获取不到companyId,现在给出默认值NON_COMPANY_ID请检查是否因为线程切换导致。threadName:{}", Thread.currentThread().getName());
                throw new BusinessException("获取不到企业ID");
            }
        }
    }

在数据库查询的时候会使用他作为分片进行分库分表。

然后现在日志是有问题的,具体问题如下:

这里的一条数据消费,他的ccompanyId是:901380674060402688
image

但是在执行后面变成了这个:901248346235397632

image

然后那个对应的901248346235397632也是一条消费逻辑,可以看到时间在48.993他进来了:
image

然后第一条也是48.993进来消费的,第二条覆盖了第一条,请问这个是ttl本身的原因呢还是什么?

使用的是plusar进行消息执行,也没看到IO阻塞切换线程,这个是什么原因呢?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant