package com.izhaow.distributed.event;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.izhaow.distributed.event.bean.EventPackage;
import com.izhaow.distributed.event.bind.EventMethodCache;
import com.izhaow.distributed.event.bind.OnEvent;
import com.izhaow.distributed.event.config.CommonEventConfigBean;
import com.izhaow.distributed.util.SpringTargetObjUtil;
import com.izhaowo.code.base.event.AbstractEvent;
import com.izhaowo.code.base.event.EventRegisterAble;
import com.izhaowo.code.base.utils.AssertUtil;
import com.izhaowo.code.base.utils.StringUtil;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/* loaded from: input_file:com/izhaow/distributed/event/RemoteCommonEventManager.class */
public class RemoteCommonEventManager implements ApplicationContextAware, ApplicationRunner, EventRegisterAble {
    private static final String GROUP_CONSUMER_HEADER = "GROUP_CONSUMER_";
    private static final String GROUP_PRODUCER_HEADER = "GROUP_PRODUCER_";
    private static final String TOPIC = "common_event_topic";
    public static final String DEFAULT_CHARSET = "UTF-8";

    @Value("${spring.application.name}")
    private String serviceName;
    private CommonEventConfigBean commonEventConfigBean;
    private ApplicationContext context;
    private final Multimap<String, EventMethodCache> call = ArrayListMultimap.create();
    private DefaultMQProducer producer;
    private static Logger logger = Logger.getLogger(RemoteCommonEventManager.class);
    private static final Set<String> TAG_SET = new HashSet();

    public RemoteCommonEventManager(CommonEventConfigBean commonEventConfigBean) {
        this.commonEventConfigBean = commonEventConfigBean;
    }

    public void registerEvent(AbstractEvent abstractEvent) {
        EventPackage eventPackage = new EventPackage();
        eventPackage.setAttach(abstractEvent.getTargetId());
        eventPackage.setName(abstractEvent.getClass().getName());
        try {
            logger.info("#event send:" + this.producer.send(new Message(TOPIC, abstractEvent.getClass().getSimpleName(), StringUtil.stringToBytes(JSON.toJSONString(eventPackage)))).toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    public void run(ApplicationArguments applicationArguments) throws Exception {
        registerSelfListen();
        startEventConsumer();
        startEventProducer();
    }

    private boolean check(Method method) {
        if (method.getReturnType().equals(Void.TYPE) && method.getParameterCount() == 1) {
            return true;
        }
        logger.info("[注册事件方法只能有一个参数返回值为void]-[" + method.getName() + " 不满足]");
        return false;
    }

    private void registerSelfListen() {
        for (String str : this.context.getBeanDefinitionNames()) {
            try {
                Object bean = this.context.getBean(str);
                for (Method method : SpringTargetObjUtil.getTargetObj(bean).getClass().getMethods()) {
                    OnEvent onEvent = (OnEvent) method.getAnnotation(OnEvent.class);
                    if (onEvent != null && check(method)) {
                        TAG_SET.add(onEvent.event().getSimpleName());
                        this.call.put(onEvent.event().getName(), new EventMethodCache(onEvent.event(), bean, bean.getClass().getDeclaredMethod(method.getName(), method.getParameterTypes()), onEvent.transactionEvent()));
                        logger.info("[find register event method :" + method.getName() + "]");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void startEventProducer() {
        new Thread(() -> {
            try {
                this.producer = new DefaultMQProducer(GROUP_PRODUCER_HEADER + this.serviceName.toUpperCase());
                this.producer.setNamesrvAddr(this.commonEventConfigBean.getHost());
                this.producer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void startEventConsumer() {
        new Thread(() -> {
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(GROUP_CONSUMER_HEADER + this.serviceName.toUpperCase());
            defaultMQPushConsumer.setNamesrvAddr(this.commonEventConfigBean.getHost());
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            try {
                defaultMQPushConsumer.subscribe(TOPIC, MessageSelector.byTag(Joiner.on("||").join(TAG_SET)));
                defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // from class: com.izhaow.distributed.event.RemoteCommonEventManager.1
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        if (list != null) {
                            Iterator<MessageExt> it = list.iterator();
                            while (it.hasNext()) {
                                EventPackage eventPackage = (EventPackage) JSON.parseObject(StringUtil.byteToString(it.next().getBody()), EventPackage.class);
                                List list2 = (List) RemoteCommonEventManager.this.call.get(eventPackage.getName());
                                if (!AssertUtil.isNull(list2)) {
                                    Iterator it2 = list2.iterator();
                                    while (it2.hasNext()) {
                                        try {
                                            ((EventMethodCache) it2.next()).run(Class.forName(eventPackage.getName()).getConstructor(String.class).newInstance(eventPackage.getAttach()));
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                        }
                                    }
                                }
                            }
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                defaultMQPushConsumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
