package com.izhaowo.support.quartz;

import java.lang.Class;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
/* loaded from: input_file:com/izhaowo/support/quartz/AbstractConcurrentJob.class */
public abstract class AbstractConcurrentJob<T, K extends Class<T>, M> extends AbstractJob<T, K> {
    private static final Logger log = LoggerFactory.getLogger(AbstractConcurrentJob.class);
    private final ThreadLocal<LinkedBlockingQueue<M>> queueLocal;

    /* loaded from: input_file:com/izhaowo/support/quartz/AbstractConcurrentJob$SubTask.class */
    protected final class SubTask implements Callable<TaskResult<M>> {
        final TaskResult<M> result = new TaskResult<>();
        final LinkedBlockingQueue<M> queue;

        public SubTask(LinkedBlockingQueue<M> linkedBlockingQueue) {
            this.queue = linkedBlockingQueue;
        }

        @Override // java.util.concurrent.Callable
        public final TaskResult<M> call() throws Exception {
            while (true) {
                M poll = this.queue.poll();
                if (ObjectUtils.isEmpty(poll)) {
                    AbstractConcurrentJob.log.info("执行完毕 {}", Long.valueOf(Thread.currentThread().getId()));
                    return this.result;
                }
                try {
                    AbstractConcurrentJob.this.doSubTask(poll);
                    this.result.success(poll);
                } catch (Throwable th) {
                    AbstractConcurrentJob.log.error("处理失败", th);
                    this.result.fail(poll);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/izhaowo/support/quartz/AbstractConcurrentJob$TaskResult.class */
    public static final class TaskResult<Y> {
        final Collection<Y> fail = new ArrayList();
        final Collection<Y> success = new ArrayList();

        protected TaskResult() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void success(Y y) {
            this.success.add(y);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fail(Y y) {
            this.fail.add(y);
        }

        public Collection<Y> getFail() {
            return this.fail;
        }

        public Collection<Y> getSuccess() {
            return this.success;
        }
    }

    public AbstractConcurrentJob(Scheduler scheduler, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
        super(scheduler, scheduledThreadPoolExecutor);
        this.queueLocal = new ThreadLocal<>();
    }

    protected TaskResult<M> startConcurrentTask(Collection<M> collection) throws InterruptedException {
        return startConcurrentTask(collection, Double.valueOf(Math.ceil((collection.size() * 1.0d) / getNumJobOfSubTask())).intValue());
    }

    protected TaskResult<M> startConcurrentTask(Collection<M> collection, int i) throws InterruptedException {
        if (!ObjectUtils.isEmpty(this.queueLocal.get())) {
            this.queueLocal.get().clear();
        }
        this.queueLocal.set(new LinkedBlockingQueue<>(collection));
        List list = (List) Stream.iterate(0, num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).limit(i).map(num2 -> {
            return this.executor.submit(new SubTask(this.queueLocal.get()));
        }).collect(Collectors.toList());
        while (!this.queueLocal.get().isEmpty()) {
            Thread.sleep(100L);
        }
        log.info("所有任务分完 {}", getJobName());
        while (!((Boolean) list.stream().map((v0) -> {
            return v0.isDone();
        }).filter(bool -> {
            return !bool.booleanValue();
        }).findFirst().orElseGet(() -> {
            return true;
        })).booleanValue()) {
            Thread.sleep(100L);
        }
        log.info("所有任务完成 {}", getJobName());
        return (TaskResult) list.stream().map(future -> {
            try {
                return (TaskResult) future.get();
            } catch (Exception e) {
                return null;
            }
        }).filter(taskResult -> {
            return taskResult != null;
        }).reduce(new TaskResult(), (taskResult2, taskResult3) -> {
            taskResult2.success.addAll(taskResult3.success);
            taskResult2.fail.addAll(taskResult3.fail);
            return taskResult2;
        });
    }

    protected abstract void doSubTask(M m);

    protected int getNumJobOfSubTask() {
        return 10;
    }

    private ThreadLocal<LinkedBlockingQueue<M>> getQueueLocal() {
        return this.queueLocal;
    }
}
