package com.izhaowo.cloud;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.util.TypeUtils;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.ClientException;
import com.aliyun.mq.http.model.Message;
import java.lang.reflect.Array;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.convert.ConverterNotFoundException;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.format.support.FormattingConversionService;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/izhaowo/cloud/MessageConsumer.class */
/**
 * Polling loop that pulls messages one at a time from an {@link MQConsumer}, de-duplicates them
 * through a Redis marker key, converts the message body to the handler's parameter type and
 * invokes the handler method described by {@link ConsumerInfo}.
 *
 * <p>Marker key layout: {@code rmmk_<groupId>_<messageKey>}, where {@code messageKey} falls back
 * to the message id when the message carries no key. Marker values:
 * <ul>
 *   <li>{@code "0"} - the message is currently being handled (expires after 1 minute);</li>
 *   <li>any other non-empty value (this class writes {@code "1"}) - the message was handled
 *       successfully (expires after 1 hour), so a redelivery is only acknowledged.</li>
 * </ul>
 */
public class MessageConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
    public static final String REDIS_MQ_MSG_KEY = "rmmk";
    /** Marker value: a consumer has claimed the message and is handling it. */
    private static final String STATE_PROCESSING = "0";
    /** Marker value: the message has been handled successfully. */
    private static final String STATE_DONE = "1";
    final MQConsumer consumer;
    final FormattingConversionService conversionService;
    final StringRedisTemplate stringRedisTemplate;
    final ConsumerInfo info;
    final Class<?>[] basicDataTypes = {String.class, Number.class, Enum.class, Boolean.class, Character.class, Date.class, LocalDate.class, LocalDateTime.class, LocalTime.class};

    public MessageConsumer(MQConsumer mQConsumer, ConsumerInfo consumerInfo, FormattingConversionService formattingConversionService, StringRedisTemplate stringRedisTemplate) {
        this.consumer = mQConsumer;
        this.info = consumerInfo;
        this.conversionService = formattingConversionService;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * Polls the consumer for a single message per iteration and dispatches it.
     *
     * <p>The loop ends on the first exception: a {@link ClientException} is logged at info level
     * as a shutdown, anything else is logged as an error.
     * NOTE(review): a transient failure (e.g. a Redis error) therefore stops this consumer for
     * good - confirm that whoever schedules this task restarts it.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                List<Message> batch = this.consumer.consumeMessage(1, 2);
                if (ObjectUtils.isEmpty(batch)) {
                    continue;
                }
                Message message = batch.get(0);
                log.info("\n---------------------------------------------\nMessage:{}-{}\n---------------------------------------------", message.getMessageTag(), message.getMessageBodyString());
                dispatch(message);
            } catch (ClientException e) {
                log.info("shutdown mq: {}", e.getMessage());
                return;
            } catch (Exception e2) {
                log.error("mq >>>> err", e2);
                return;
            }
        }
    }

    /**
     * Claims the message through its Redis marker and either handles it, skips it (another
     * consumer holds the claim) or just acknowledges it (already handled).
     *
     * <p>The claim uses SET-if-absent instead of GETSET: GETSET would overwrite an existing
     * marker and discard its TTL, so a duplicate delivery could erase the "done" state or leave
     * a marker behind that never expires.
     */
    private void dispatch(Message message) {
        String messageKey = resolveMessageKey(message);
        String cacheKey = getCacheKey(messageKey);
        Boolean claimed = this.stringRedisTemplate.opsForValue().setIfAbsent(cacheKey, STATE_PROCESSING);
        if (Boolean.TRUE.equals(claimed)) {
            // Bound the claim so that a crashed handler does not block the message forever.
            this.stringRedisTemplate.expire(cacheKey, 1L, TimeUnit.MINUTES);
            handleMessage(message);
            return;
        }
        String state = this.stringRedisTemplate.opsForValue().get(cacheKey);
        if (ObjectUtils.isEmpty(state) || STATE_PROCESSING.equals(state)) {
            // An empty state means the marker vanished between the two calls; leave the message
            // un-acked so that it is delivered again.
            log.info("消息正在被其他服务处理{}", messageKey);
        } else {
            log.info("已经处理过，直接确认消息{}", messageKey);
            ackMessage(message);
        }
    }

    /** Returns the message key, or the message id when the message has no key. */
    private String resolveMessageKey(Message message) {
        String messageKey = message.getMessageKey();
        return ObjectUtils.isEmpty(messageKey) ? message.getMessageId() : messageKey;
    }

    /** Builds the Redis marker key for the given message key within this consumer's group. */
    private String getCacheKey(String str) {
        return REDIS_MQ_MSG_KEY + "_" + this.info.getGroupId() + "_" + str;
    }

    /**
     * Converts the body, invokes the handler method and acknowledges the message when the
     * handler returns {@code true}; any other result or any exception rejects the message.
     */
    private void handleMessage(Message message) {
        boolean handled;
        try {
            Object result = this.info.getTargetMethod().invoke(this.info.getProxyBean(), convertBody(message.getMessageBodyString()));
            handled = Boolean.TRUE.equals(result);
        } catch (Exception e) {
            // Pass the throwable too: for a reflective invocation failure getMessage() is null
            // and the real cause is only visible in the stack trace.
            log.error("处理消息失败:{}", e.getMessage(), e);
            handled = false;
        }
        if (handled) {
            ackMessage(message);
        } else {
            rejectMessage(message);
        }
    }

    /** Removes the marker so that the next delivery of the message can be claimed again. */
    private void rejectMessage(Message message) {
        this.stringRedisTemplate.delete(getCacheKey(resolveMessageKey(message)));
    }

    /**
     * Records the message as handled (marker {@code "1"}, 1 hour) and acknowledges it on the
     * broker. Failures are logged and never propagated.
     */
    private void ackMessage(Message message) {
        try {
            // Write the marker first: if the broker ack fails, the redelivered message is then
            // recognised as already handled and only acknowledged, not handled a second time.
            this.stringRedisTemplate.opsForValue().set(getCacheKey(resolveMessageKey(message)), STATE_DONE, 1L, TimeUnit.HOURS);
            this.consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
            log.info("message accepted: {}", message.getMessageId());
        } catch (Exception e) {
            log.error("err", e);
        }
    }

    /**
     * Converts the raw message body to the handler's parameter type.
     *
     * @param str raw message body
     * @return the converted argument, or {@code null} when the body is empty
     * @throws IllegalArgumentException when fastjson rejects the body and the conversion service
     *         has no converter for the parameter type
     */
    private Object convertBody(String str) {
        if (ObjectUtils.isEmpty(str)) {
            return null;
        }
        Class<?> parameterType = this.info.getParameterType();
        try {
            return parseWithFastjson(str, parameterType);
        } catch (JSONException e) {
            log.error("解析消息错误", e);
            try {
                return this.conversionService.convert(str, parameterType);
            } catch (ConverterNotFoundException e2) {
                log.error("解析消息错误", e2);
                log.error("无法转换数据类型{},{}", parameterType, str);
                throw new IllegalArgumentException("无法转换数据类型", e2);
            }
        }
    }

    /** Picks the fastjson conversion that matches the kind of the parameter type. */
    private Object parseWithFastjson(String body, Class<?> parameterType) {
        if (isBasicType(parameterType)) {
            return TypeUtils.cast(body, parameterType, ParserConfig.getGlobalInstance());
        }
        if (Collection.class.isAssignableFrom(parameterType)) {
            return parseCollection(body, parameterType);
        }
        if (parameterType.isArray()) {
            return parseTypedArray(body, parameterType.getComponentType());
        }
        return JSON.parseObject(body, parameterType);
    }

    /**
     * Parses a collection argument using the generic type declared on the handler method
     * (e.g. {@code List<Order>}), because the element type cannot be read from the erased
     * parameter class. When the method exposes no generic information the raw parameter
     * class is handed to fastjson instead.
     */
    private Object parseCollection(String body, Class<?> parameterType) {
        for (Type declared : this.info.getTargetMethod().getGenericParameterTypes()) {
            if (declared instanceof ParameterizedType && ((ParameterizedType) declared).getRawType() == parameterType) {
                return JSON.parseObject(body, declared);
            }
        }
        return JSON.parseObject(body, parameterType);
    }

    /**
     * Parses a JSON array into an array whose runtime component type is {@code componentType},
     * so that the result is accepted by the reflective call for typed array parameters.
     */
    private Object parseTypedArray(String body, Class<?> componentType) {
        List<?> elements = JSON.parseArray(body, componentType);
        if (elements == null) {
            return null;
        }
        Object array = Array.newInstance(componentType, elements.size());
        for (int i = 0; i < elements.size(); i++) {
            Array.set(array, i, elements.get(i));
        }
        return array;
    }

    /** Returns whether {@code cls} is one of {@link #basicDataTypes}, a subtype of one, or an enum. */
    private boolean isBasicType(Class<?> cls) {
        for (Class<?> basicType : this.basicDataTypes) {
            if (basicType.isAssignableFrom(cls)) {
                return true;
            }
        }
        return cls.isEnum();
    }

    protected MQConsumer getConsumer() {
        return this.consumer;
    }

    protected FormattingConversionService getConversionService() {
        return this.conversionService;
    }

    protected StringRedisTemplate getStringRedisTemplate() {
        return this.stringRedisTemplate;
    }

    protected ConsumerInfo getInfo() {
        return this.info;
    }

    protected Class<?>[] getBasicDataTypes() {
        return this.basicDataTypes;
    }
}
