package org.dataone.cn.batch.synchronization.type;

import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ISet;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.dataone.cn.batch.synchronization.type.DistributedDataClient;
import org.dataone.cn.hazelcast.HazelcastClientFactory;
import org.dataone.cn.synchronization.types.SyncObject;
import org.dataone.configuration.Settings;

/* loaded from: input_file:org/dataone/cn/batch/synchronization/type/SyncQueueFacade.class */
public class SyncQueueFacade implements EntryListener<String, String> {
    static final Logger __logger = Logger.getLogger(SyncQueueFacade.class);
    protected Deque<String> nodeIdRoundRobin;
    protected String synchronizationObjectQueue;
    DistributedDataClient processingClient;
    Map<String, String> queueMap;
    Map<String, String> priorityQueueMap;

    public SyncQueueFacade() {
        this(new DistributedDataClient() { // from class: org.dataone.cn.batch.synchronization.type.SyncQueueFacade.1
            @Override // org.dataone.cn.batch.synchronization.type.DistributedDataClient
            public <K, V> Map<K, V> getMap(String str) {
                return new AbstractListenableMapAdapter<K, V>(HazelcastClientFactory.getProcessingClient().getMap(str)) { // from class: org.dataone.cn.batch.synchronization.type.SyncQueueFacade.1.1
                    @Override // org.dataone.cn.batch.synchronization.type.AbstractListenableMapAdapter, org.dataone.cn.batch.synchronization.type.DistributedDataClient.ListenableMap
                    public void notifyEntryListeners(String str2, Object obj, Object obj2) {
                    }

                    @Override // org.dataone.cn.batch.synchronization.type.AbstractListenableMapAdapter, org.dataone.cn.batch.synchronization.type.DistributedDataClient.ListenableMap
                    public void addEntryListener(EntryListener<K, V> entryListener, boolean z) {
                        ((IMap) this.map).addEntryListener(entryListener, z);
                    }
                };
            }

            @Override // org.dataone.cn.batch.synchronization.type.DistributedDataClient
            public <E> IQueue<E> getQueue(String str) {
                return HazelcastClientFactory.getProcessingClient().getQueue(str);
            }

            @Override // org.dataone.cn.batch.synchronization.type.DistributedDataClient
            public <E> ISet<E> getSet(String str) {
                return HazelcastClientFactory.getProcessingClient().getSet(str);
            }

            @Override // org.dataone.cn.batch.synchronization.type.DistributedDataClient
            public ILock getLock(String str) {
                return HazelcastClientFactory.getProcessingClient().getLock(str);
            }
        });
    }

    public SyncQueueFacade(DistributedDataClient distributedDataClient) {
        this.nodeIdRoundRobin = new ConcurrentLinkedDeque();
        this.synchronizationObjectQueue = Settings.getConfiguration().getString("dataone.hazelcast.synchronizationObjectQueue", "default");
        this.processingClient = null;
        this.queueMap = null;
        this.priorityQueueMap = null;
        this.processingClient = distributedDataClient;
        this.queueMap = this.processingClient.getMap("dataone.synchronization.queueMap");
        if (this.queueMap instanceof DistributedDataClient.ListenableMap) {
            ((DistributedDataClient.ListenableMap) this.queueMap).addEntryListener(this, false);
            __logger.info(this + " Added listener to 'dataone.synchronization.queueMap'");
        }
        this.priorityQueueMap = this.processingClient.getMap("dataone.synchronization.priority.queueMap");
        if (this.priorityQueueMap instanceof DistributedDataClient.ListenableMap) {
            ((DistributedDataClient.ListenableMap) this.priorityQueueMap).addEntryListener(this, false);
            __logger.info(this + " Added listener to 'dataone.synchronization.queueMap'");
        }
        Iterator<String> it2 = getQueueNames().iterator();
        while (it2.hasNext()) {
            String next = it2.next();
            this.nodeIdRoundRobin.add(next);
            __logger.info(this + " added '" + next + "' to its queue round-robin. size: " + size(next));
        }
        if (this.queueMap.containsKey("legacy")) {
            return;
        }
        this.queueMap.put("legacy", this.synchronizationObjectQueue);
        __logger.info(this + " added 'legacy' queue to its queue round-robin. size: " + size("legacy"));
    }

    public void add(SyncObject syncObject) {
        String nodeId = syncObject.getNodeId() == null ? "generic" : syncObject.getNodeId();
        if (!this.queueMap.containsKey(nodeId)) {
            this.queueMap.put(nodeId, "dataone.synchronization.queue." + nodeId);
        }
        this.processingClient.getQueue(this.queueMap.get(nodeId)).add(syncObject);
    }

    public void addWithPriority(SyncObject syncObject) {
        String nodeId = syncObject.getNodeId() == null ? "generic" : syncObject.getNodeId();
        if (!this.priorityQueueMap.containsKey(nodeId)) {
            this.priorityQueueMap.put(nodeId, "dataone.synchronization.priority.queue." + nodeId);
        }
        this.processingClient.getQueue(this.priorityQueueMap.get(nodeId)).add(syncObject);
    }

    public SyncObject poll(long j, TimeUnit timeUnit) throws InterruptedException {
        SyncObject syncObject = null;
        if (__logger.isTraceEnabled()) {
            __logger.trace(String.format("poll timeout = %d %s. nodeId RR size %d", Long.valueOf(j), timeUnit, Integer.valueOf(this.nodeIdRoundRobin.size())));
        }
        long currentTimeMillis = System.currentTimeMillis();
        long convert = currentTimeMillis + TimeUnit.MILLISECONDS.convert(j, timeUnit);
        while (currentTimeMillis < convert) {
            String nextNodeId = getNextNodeId();
            if (this.priorityQueueMap.containsKey(nextNodeId)) {
                if (__logger.isTraceEnabled()) {
                    __logger.trace("...polling priority queue: " + nextNodeId);
                }
                syncObject = (SyncObject) this.processingClient.getQueue(this.priorityQueueMap.get(nextNodeId)).poll(100L, TimeUnit.MICROSECONDS);
            }
            if (syncObject == null && this.queueMap.containsKey(nextNodeId)) {
                if (__logger.isTraceEnabled()) {
                    __logger.trace("...polling queue: " + nextNodeId);
                }
                syncObject = (SyncObject) this.processingClient.getQueue(this.queueMap.get(nextNodeId)).poll(10L, TimeUnit.MILLISECONDS);
            }
            if (syncObject != null) {
                break;
            }
            currentTimeMillis = System.currentTimeMillis();
        }
        return syncObject;
    }

    protected String getNextNodeId() {
        if (this.nodeIdRoundRobin.size() == 1) {
            return this.nodeIdRoundRobin.getLast();
        }
        if (this.nodeIdRoundRobin.size() == 0) {
            return null;
        }
        String removeFirst = this.nodeIdRoundRobin.removeFirst();
        this.nodeIdRoundRobin.addLast(removeFirst);
        return removeFirst;
    }

    public int size() {
        int i = 0;
        Iterator<String> it2 = getQueueNames().iterator();
        while (it2.hasNext()) {
            i += size(it2.next());
        }
        return i;
    }

    public int size(String str) {
        int i = 0;
        if (this.priorityQueueMap.containsKey(str)) {
            i = 0 + this.processingClient.getQueue(this.priorityQueueMap.get(str)).size();
        }
        if (this.queueMap.containsKey(str)) {
            i += this.processingClient.getQueue(this.queueMap.get(str)).size();
        }
        return i;
    }

    public BlockingQueue<Object> getLegacyQueue() {
        return this.processingClient.getQueue(this.queueMap.get("legacy"));
    }

    public TreeSet<String> getQueueNames() {
        if (__logger.isTraceEnabled()) {
            __logger.trace("...queueMap keyset: " + StringUtils.join((Collection) this.queueMap.keySet(), ','));
            __logger.trace("...priorityQueueMap keyset: " + StringUtils.join((Collection) this.priorityQueueMap.keySet(), ','));
        }
        TreeSet<String> treeSet = new TreeSet<>();
        treeSet.addAll(this.queueMap.keySet());
        if (__logger.isTraceEnabled()) {
            __logger.trace("...   size of queueNames set (1 of 2 addAlls)" + treeSet.size());
        }
        treeSet.addAll(this.priorityQueueMap.keySet());
        if (__logger.isTraceEnabled()) {
            __logger.trace("...   size of queueNames set (2 of 2 addAlls)" + treeSet.size());
        }
        return treeSet;
    }

    @Override // com.hazelcast.core.EntryListener
    public synchronized void entryAdded(EntryEvent<String, String> entryEvent) {
        if (this.nodeIdRoundRobin.contains(entryEvent.getKey())) {
            __logger.info(this + " the queue named '" + entryEvent.getKey() + "' is already in the queue round robin");
        } else {
            this.nodeIdRoundRobin.add(entryEvent.getKey());
            __logger.info(this + " added queue named '" + entryEvent.getKey() + "' to the queue round robin");
        }
    }

    @Override // com.hazelcast.core.EntryListener
    public void entryRemoved(EntryEvent<String, String> entryEvent) {
        if (__logger.isDebugEnabled()) {
            __logger.debug(this + " received entryRemoved event for key '" + entryEvent.getKey() + "'.  (no-op)");
        }
    }

    @Override // com.hazelcast.core.EntryListener
    public void entryUpdated(EntryEvent<String, String> entryEvent) {
        if (__logger.isDebugEnabled()) {
            __logger.debug(this + " received entryUpdated event for key '" + entryEvent.getKey() + "'.  (no-op)");
        }
    }

    @Override // com.hazelcast.core.EntryListener
    public void entryEvicted(EntryEvent<String, String> entryEvent) {
        if (__logger.isDebugEnabled()) {
            __logger.debug(this + " received entryEvicted event for key '" + entryEvent.getKey() + "'.  (no-op)");
        }
    }
}
