package org.dataone.service.cn.replication.auditor.v1.controller;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.dataone.cn.dao.DaoFactory;
import org.dataone.cn.dao.ReplicationDao;
import org.dataone.cn.dao.exceptions.DataAccessException;
import org.dataone.cn.hazelcast.HazelcastClientFactory;
import org.dataone.service.types.v1.Identifier;

/* loaded from: input_file:org/dataone/service/cn/replication/auditor/v1/controller/AbstractReplicationAuditor.class */
/**
 * Template for a replication audit run.
 *
 * <p>A run ({@link #auditReplication()}) is gated by {@link #shouldRunAudit()} and by a lock
 * obtained from {@link #getProcessingLock()}. While the lock is held, pids are fetched page by
 * page through {@link #getPidsToAudit(Date, int, int)}, grouped into batches of
 * {@link #getPidsPerTaskSize()}, wrapped into tasks by {@link #newAuditTask(List, Date)} and
 * executed on a fixed-size thread pool.
 */
public abstract class AbstractReplicationAuditor implements Runnable {
    private static final Logger log = Logger.getLogger(AbstractReplicationAuditor.class.getName());
    protected ReplicationDao replicationDao = DaoFactory.getReplicationDao();
    // NOTE: getTaskPoolSize() is invoked here while the base class is still being constructed,
    // so implementations must not rely on subclass instance fields being initialized.
    private final ExecutorService executorService = Executors.newFixedThreadPool(getTaskPoolSize());

    /**
     * @return the name used to look up the lock in {@link #getProcessingLock()}
     */
    protected abstract String getLockName();

    /**
     * @return the date handed to {@link #getPidsToAudit(Date, int, int)} and
     *         {@link #newAuditTask(List, Date)} for this run
     */
    protected abstract Date calculateAuditDate();

    /**
     * Fetches one page of pids to audit.
     *
     * @param date the audit date produced by {@link #calculateAuditDate()}
     * @param i    the page number; the run starts at page 1
     * @param i2   the page size, taken from {@link #getPageSize()}
     * @return the pids of the requested page; an empty list ends the run
     * @throws DataAccessException if the pids cannot be retrieved
     */
    protected abstract List<Identifier> getPidsToAudit(Date date, int i, int i2) throws DataAccessException;

    /**
     * Creates the task that audits one batch of pids.
     *
     * @param list the batch of pids; each call receives its own list instance
     * @param date the audit date produced by {@link #calculateAuditDate()}
     * @return the task to submit to the executor
     */
    protected abstract Callable<String> newAuditTask(List<Identifier> list, Date date);

    /**
     * @return the exclusive upper bound of the page loop, which starts at page 1
     */
    protected abstract int getMaxPages();

    /**
     * @return the thread pool size, also used as the number of tasks collected before they are
     *         submitted together
     */
    protected abstract int getTaskPoolSize();

    /**
     * @return the page size passed to {@link #getPidsToAudit(Date, int, int)}
     */
    protected abstract int getPageSize();

    /**
     * @return the number of pids grouped into a single audit task
     */
    protected abstract int getPidsPerTaskSize();

    /**
     * @return {@code true} if {@link #auditReplication()} should do any work
     */
    protected abstract boolean shouldRunAudit();

    /**
     * @return how long, in seconds, a single wait on a task result may take before it counts as
     *         a timeout
     */
    protected abstract long getFutureExecutionWaitTimeSeconds();

    /** Runs one audit pass; see {@link #auditReplication()}. */
    @Override
    public void run() {
        auditReplication();
    }

    /**
     * Performs one audit pass if {@link #shouldRunAudit()} allows it and the processing lock can
     * be acquired. The lock is released when the pass ends, normally or exceptionally.
     */
    public void auditReplication() {
        if (!shouldRunAudit()) {
            return;
        }
        Lock processingLock = getProcessingLock();
        // Only release what was actually acquired: unlocking a lock this thread does not hold
        // is an error for typical Lock implementations.
        if (!tryLock(processingLock)) {
            return;
        }
        try {
            Date auditDate = calculateAuditDate();
            for (int page = 1; page < getMaxPages(); page++) {
                List<Identifier> pids;
                try {
                    pids = getPidsToAudit(auditDate, page, getPageSize());
                } catch (DataAccessException e) {
                    log.error("Unable to retrieve replicas by date using replication dao for audit date: " + auditDate + ".", e);
                    // Stop this pass instead of dereferencing a null list (first page) or
                    // re-auditing the previous page's pids (later pages).
                    break;
                }
                if (pids == null || pids.isEmpty()) {
                    break;
                }
                auditPids(pids, auditDate);
            }
        } finally {
            releaseLock(processingLock);
        }
    }

    /**
     * Attempts to acquire the given lock without blocking.
     *
     * @param lock the lock to acquire; may be {@code null}
     * @return {@code true} if the lock was acquired, {@code false} if it was {@code null} or
     *         could not be acquired
     */
    protected boolean tryLock(Lock lock) {
        if (lock == null) {
            return false;
        }
        return lock.tryLock();
    }

    /**
     * Unlocks the given lock; does nothing for {@code null}.
     *
     * @param lock the lock to release; may be {@code null}
     */
    protected void releaseLock(Lock lock) {
        if (lock != null) {
            lock.unlock();
        }
    }

    /**
     * @return the lock named {@link #getLockName()} from the Hazelcast processing client
     */
    protected Lock getProcessingLock() {
        return HazelcastClientFactory.getProcessingClient().getLock(getLockName());
    }

    /**
     * Splits the pids into batches, submits one task per batch and waits for every task.
     * Tasks are submitted in groups of {@link #getTaskPoolSize()}; while one group runs, the
     * group submitted before it is awaited.
     *
     * @param list the pids of one page
     * @param date the audit date for this run
     */
    private void auditPids(List<Identifier> list, Date date) {
        List<Identifier> pidBatch = new ArrayList<Identifier>();
        List<Callable<String>> taskBatch = new ArrayList<Callable<String>>();
        List<Future<String>> currentFutures = new ArrayList<Future<String>>();
        List<Future<String>> previousFutures = new ArrayList<Future<String>>();

        for (Identifier pid : list) {
            pidBatch.add(pid);
            if (pidBatch.size() >= getPidsPerTaskSize()) {
                taskBatch.add(newAuditTask(pidBatch, date));
                // Start a fresh list rather than clearing the one just handed to the task,
                // in case the task keeps a reference to it.
                pidBatch = new ArrayList<Identifier>();
            }
            if (taskBatch.size() >= getTaskPoolSize()) {
                submitTasks(taskBatch, currentFutures, previousFutures);
                handleFutures(previousFutures);
                // Already awaited; no need to look at these again on later iterations.
                previousFutures.clear();
            }
        }

        // Pids left over after the last full batch still need a task.
        if (!pidBatch.isEmpty()) {
            taskBatch.add(newAuditTask(pidBatch, date));
        }
        if (!taskBatch.isEmpty()) {
            submitTasks(taskBatch, currentFutures, previousFutures);
        }
        // The final submit may have shifted un-awaited futures into previousFutures.
        handleFutures(previousFutures);
        handleFutures(currentFutures);
    }

    /**
     * Moves the current futures into {@code list3}, submits every task in {@code list} and
     * records the new futures in {@code list2}. {@code list} is emptied afterwards.
     *
     * @param list  tasks to submit
     * @param list2 receives the futures of the tasks submitted by this call
     * @param list3 receives the futures that were in {@code list2} before this call
     */
    private void submitTasks(List<Callable<String>> list, List<Future<String>> list2, List<Future<String>> list3) {
        list3.clear();
        list3.addAll(list2);
        list2.clear();
        for (Callable<String> task : list) {
            submitTask(list2, task);
        }
        list.clear();
    }

    /**
     * Submits a single task, retrying once after ten seconds if the executor rejects it. A task
     * rejected twice is logged and dropped.
     *
     * @param list     receives the future if the task was accepted
     * @param callable the task to submit
     */
    private void submitTask(List<Future<String>> list, Callable<String> callable) {
        Future<String> future = null;
        try {
            future = this.executorService.submit(callable);
        } catch (RejectedExecutionException e) {
            log.error("Unable to submit tasks to executor service. ", e);
            log.error("Sleeping for 10 seconds, trying again");
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e2) {
                log.error("sleep interrupted.", e2);
                // Keep the interrupt visible to the code that owns this thread.
                Thread.currentThread().interrupt();
            }
            try {
                future = this.executorService.submit(callable);
            } catch (RejectedExecutionException e3) {
                log.error("Still unable to submit tasks to executor service, failing. ", e3);
            }
        }
        if (future != null) {
            list.add(future);
        }
    }

    /**
     * Waits for every future in the list, in order.
     *
     * @param list the futures to wait for
     */
    private void handleFutures(List<Future<String>> list) {
        for (Future<String> future : list) {
            handleFuture(future);
        }
    }

    /**
     * Waits until the future is done. A first timeout is tolerated; a second one cancels the
     * task. Interruption of the waiting thread also cancels the task.
     *
     * @param future the future to wait for
     */
    private void handleFuture(Future<String> future) {
        boolean done = false;
        boolean timedOutOnce = false;
        while (!done) {
            try {
                String result = future.get(getFutureExecutionWaitTimeSeconds(), TimeUnit.SECONDS);
                if (result != null) {
                    log.debug("Replica audit task completed with result: " + result);
                }
            } catch (InterruptedException e) {
                log.error("Replica audit task interrupted, cancelling.", e);
                future.cancel(true);
                // Keep the interrupt visible to the code that owns this thread.
                Thread.currentThread().interrupt();
            } catch (CancellationException e2) {
                log.error("Replica audit task cancelled.", e2);
            } catch (ExecutionException e3) {
                log.error("Replica audit task threw exception during execution. ", e3);
            } catch (TimeoutException e4) {
                if (timedOutOnce) {
                    log.error("Replica audit task timed out twice, cancelling.");
                    future.cancel(true);
                } else {
                    log.debug("Replica audit task timed out.  waiting another " + getFutureExecutionWaitTimeSeconds() + " seconds.");
                    timedOutOnce = true;
                }
            }
            // A cancelled future reports done, so every cancel path above ends the loop.
            done = future.isDone();
        }
    }
}
