/**
 * This work was created by participants in the DataONE project, and is
 * jointly copyrighted by participating institutions in DataONE. For 
 * more information on DataONE, see our web site at http://dataone.org.
 *
 *   Copyright ${year}
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and 
 * limitations under the License.
 * 
 * $Id$
 */

package org.dataone.cn.indexer.processor;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.log4j.Logger;
import org.dataone.client.v2.formats.ObjectFormatCache;
import org.dataone.cn.hazelcast.HazelcastClientFactory;
import org.dataone.cn.index.processor.IndexTaskProcessingStrategy;
import org.dataone.cn.index.task.IndexTask;
import org.dataone.cn.index.task.IndexTaskRepository;
import org.dataone.cn.index.task.ResourceMapIndexTask;
import org.dataone.cn.index.util.PerformanceLogger;
import org.dataone.cn.indexer.XmlDocumentUtility;
import org.dataone.cn.indexer.parser.utility.SeriesIdResolver;
import org.dataone.cn.indexer.resourcemap.ForesiteResourceMap;
import org.dataone.cn.indexer.resourcemap.ResourceMap;
import org.dataone.cn.indexer.resourcemap.ResourceMapFactory;
import org.dataone.cn.indexer.solrhttp.HTTPService;
import org.dataone.cn.indexer.solrhttp.SolrDoc;
import org.dataone.configuration.Settings;
import org.dataone.service.exceptions.BaseException;
import org.dataone.service.types.v1.Identifier;
import org.dataone.service.types.v1.ObjectFormatIdentifier;
import org.dataone.service.types.v1.TypeFactory;
import org.dataone.service.types.v2.ObjectFormat;
import org.dataone.service.types.v2.SystemMetadata;
import org.dspace.foresite.OREParserException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.hibernate3.HibernateOptimisticLockingFailureException;
import org.w3c.dom.Document;

/**
 * MockIndexTaskProcessor is the controller class for processing IndexTasks. These
 * tasks are generated by the IndexTaskGenerator class and associated
 * collaborators. MockIndexTaskProcessor uses the IndexTaskRepository to locate
 * IndexTasks for processing and delegates to IndexTaskProcessingStrategy
 * implementations for actual processing behavior.
 * 
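 * <p>
 * A minimal usage sketch (a hypothetical caller; in deployment this class is
 * wired up by Spring and typically driven by a quartz-scheduled job):
 * </p>
 * <pre>{@code
 * // "context" is assumed to be a Spring ApplicationContext
 * MockIndexTaskProcessor processor = context.getBean(MockIndexTaskProcessor.class);
 * processor.processIndexTaskQueue(); // drains NEW tasks, then retries FAILED ones
 * }</pre>
 * 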
 * @author sroseboo
 * 
 */
public class MockIndexTaskProcessor {

    private static Logger logger = Logger.getLogger(MockIndexTaskProcessor.class.getName());
    private static final String FORMAT_TYPE_DATA = "DATA";
    private static final String LOAD_LOGGER_NAME = "indexProcessorLoad";
    private static final int BATCH_UPDATE_SIZE = Settings.getConfiguration().getInt("dataone.indexing.batchUpdateSize", 1000);
    private static final int NUMOFPROCESSOR = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.processThreadPoolSize", 10);
    private static final int MAXATTEMPTS = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.resourceMapWait.maxAttempt", 10);
    private static final int FUTUREQUEUESIZE = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.futureQueueSize", 100);
    private static ExecutorService executor = Executors.newFixedThreadPool(NUMOFPROCESSOR);
    private static final ReentrantLock lock = new ReentrantLock();
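    //keeps only the most recent futures for monitoring; when the queue is full the oldest future is evicted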
    private static CircularFifoQueue<Future<?>> futureQueue = new CircularFifoQueue<Future<?>>(FUTUREQUEUESIZE);
    //a concurrent map maintaining information about the resource map objects currently being processed
    //and their referenced ids: the key is a referenced id and the value is the id of the resource map.
    private static ConcurrentHashMap <String, String> referencedIdsMap = new ConcurrentHashMap<String, String>(); 
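    //a concurrent set maintaining the series ids currently being processed, used to prevent
    //two tasks from indexing objects that share a series id at the same time.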
    private static ConcurrentSkipListSet<String> seriesIdsSet = new ConcurrentSkipListSet<String>();
    
    @Autowired
    private IndexTaskRepository repo;

    @Autowired
    private IndexTaskProcessingStrategy deleteProcessor;

    @Autowired
    private IndexTaskProcessingStrategy updateProcessor;

    @Autowired
    private HTTPService httpService;

    @Autowired
    private String solrQueryUri;

    private PerformanceLogger perfLog = PerformanceLogger.getInstance();
    
    public MockIndexTaskProcessor() {
    }

    /**
     * Processes the index task queue written by the IndexTaskGenerator, 
     * but unlike {@link #processIndexTaskQueue()}, all IndexTasks that 
     * add solr documents are grouped into batches and sent to solr in a
     * single command.
     */
    /*public void batchProcessIndexTaskQueue() {
        logProcessorLoad();
        
        List<IndexTask> queue = getIndexTaskQueue();
        List<IndexTask> batchProcessList = new ArrayList<IndexTask>(BATCH_UPDATE_SIZE);

        logger.info("batchProcessIndexTaskQueue, queue size: " + queue.size() + " tasks");
        
        IndexTask nextTask = getNextIndexTask(queue);
        while (nextTask != null) {
            batchProcessList.add(nextTask);
            logger.info("added task: " + nextTask.getPid());
            nextTask = getNextIndexTask(queue);
            if (nextTask != null)
                logger.info("next task: " + nextTask.getPid());
            logger.info("queue size: " + queue.size());
            
            if (batchProcessList.size() >= BATCH_UPDATE_SIZE) {
                batchProcessTasksOnThread(batchProcessList);
                batchProcessList = new ArrayList<IndexTask>(BATCH_UPDATE_SIZE);
            }
        }
        batchProcessTasksOnThread(batchProcessList);
        
        List<IndexTask> retryQueue = getIndexTaskRetryQueue();
        List<IndexTask> batchProcessRetryList = new ArrayList<IndexTask>(BATCH_UPDATE_SIZE);
        
        logger.info("batchProcessIndexTaskQueue, retry queue size: " + queue.size() + " tasks");
        
        nextTask = getNextIndexTask(retryQueue);
        while (nextTask != null) {
            batchProcessRetryList.add(nextTask);
            nextTask = getNextIndexTask(retryQueue);
            
            if (batchProcessRetryList.size() >= BATCH_UPDATE_SIZE) {
                batchProcessTasksOnThread(batchProcessRetryList);
                batchProcessRetryList = new ArrayList<IndexTask>(BATCH_UPDATE_SIZE);
            }
        }
        batchProcessTasksOnThread(batchProcessRetryList);
    }*/
    
    /**
     * Start a round of IndexTask processing. The IndexTask data store is
     * abstracted as a queue of tasks to process ordered by priority and
     * modification date. Typically invoked periodically by a quartz scheduled
     * job.
     */
    public void processIndexTaskQueue() {
        logProcessorLoad();
        
        List<IndexTask> queue = getIndexTaskQueue();
        IndexTask task = getNextIndexTask(queue);
        while (task != null) {
            processTaskOnThread(task);
            task = getNextIndexTask(queue);
        }

        processFailedIndexTaskQueue();
        /*List<IndexTask> retryQueue = getIndexTaskRetryQueue();
        task = getNextIndexTask(retryQueue);
        while (task != null) {
            processTaskOnThread(task);
            task = getNextIndexTask(retryQueue);
        }*/
        
    }
    
    public void mockProcessIndexTaskQueue() {
        
        System.out.println("     entering mockProcessIndexTaskQueue...");
        
        try {
            // simulate a long-running processing round
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // restore the interrupt status so callers can observe it
            Thread.currentThread().interrupt();
            logger.warn("mockProcessIndexTaskQueue was interrupted while sleeping.", e);
        }

        System.out.println("     ... exiting mockProcessIndexTaskQueue.");

    }
    
    /**
     * Process the given queue of index tasks.
     * @param queue the queue of index tasks to process
     */
    public void processIndexTaskQueue(List<IndexTask> queue) {
        IndexTask task = null;
        if(queue != null) {
            int size = queue.size();
            task = getNextIndexTask(queue);
            while (task != null) {
                processTaskOnThread(task);
                task = getNextIndexTask(queue);
            }
            logger.info("MockIndexTaskProcessor.processIndexTaskQueue - finish submitting the index task queue with the size "+size+ " and current queue size is down to "+queue.size());
        }
        
    }
    
    /**
     * Process the index tasks that currently have the failed status in the index task repository.
     */
    public void processFailedIndexTaskQueue() {
        List<IndexTask> retryQueue = getIndexTaskRetryQueue();
        if(retryQueue != null) {
            logger.info("MockIndexTaskProcessor.processFailedIndexTaskQueue with size " + retryQueue.size());
            IndexTask task = getNextIndexTask(retryQueue);
            while (task != null) {
                processTaskOnThread(task);
                task = getNextIndexTask(retryQueue);
            }
        }
    }

    /**
     * Logs the number of {@link IndexTask}s that need to be processed
     * and the number of tasks that have failed.
     */
    private void logProcessorLoad() {
        
        Logger loadLogger = Logger.getLogger(LOAD_LOGGER_NAME);
        
        Long newTasks = null;
        Long failedTasks = null;
        try {
            newTasks = repo.countByStatus(IndexTask.STATUS_NEW);
            failedTasks = repo.countByStatus(IndexTask.STATUS_FAILED);
        } catch (Exception e) {
            logger.error("Unable to count NEW or FAILED tasks in task index repository.", e);
        }
        
        loadLogger.info("new tasks:" + newTasks + ", tasks previously failed: " + failedTasks );
    }
    
    

    /*
     * Submit the index task to the thread pool so it is processed asynchronously.
     */
    private void processTaskOnThread(final IndexTask task) {
        logger.info("processing the index task on a thread pool of size " + NUMOFPROCESSOR);
        Runnable newThreadTask = new Runnable() {
            public void run() {
                processTask(task);
            }
        };
        Future<?> future = executor.submit(newThreadTask);
        futureQueue.add(future);
    }
    
    private void processTask(IndexTask task) {
        long start = System.currentTimeMillis();
        try {
            checkReadinessProcessResourceMap(task);
            if (task.isDeleteTask()) {
                logger.info("+++++++++++++starting to process delete index task for " + task.getPid()
                        + " in thread " + Thread.currentThread().getId());
                deleteProcessor.process(task);
                logger.info("+++++++++++++finished processing delete index task for " + task.getPid()
                        + " in thread " + Thread.currentThread().getId());
            } else {
                logger.info("*********************starting to process update index task for " + task.getPid()
                        + " in thread " + Thread.currentThread().getId());
                updateProcessor.process(task);
                logger.info("*********************finished processing update index task for " + task.getPid()
                        + " in thread " + Thread.currentThread().getId());
            }
        } catch (Exception e) {
            logger.error("Unable to process task for pid: " + task.getPid(), e);
            repo.delete(task.getId());
            handleFailedTask(task);
            return;
        } finally {
            removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task);
        }
        // delete by id: a ResourceMapIndexTask is a copy of the original entity,
        // so repo.delete(task) would not match the persisted row.
        repo.delete(task.getId());
        
        logger.info("Indexing complete for pid: " + task.getPid());
        perfLog.log("MockIndexTaskProcessor.processTasks process pid " + task.getPid(), System.currentTimeMillis() - start);
    }
    
    /*
     * When a resource map object is indexed, it also changes the solr index of its referenced ids,
     * which can cause a race condition. https://redmine.dataone.org/issues/7771
     * So we maintain a map containing the referenced ids of the resource map objects currently being processed.
     * Before we start to process a new resource map object on a thread, we check that map and wait
     * (up to MAXATTEMPTS * 0.5 seconds) until every referenced id is free.
     */
    private void checkReadinessProcessResourceMap(IndexTask task) throws Exception{
        //only handle resourceMap index tasks (instanceof is null-safe)
        if(task instanceof ResourceMapIndexTask) {
            logger.debug("$$$$$$$$$$$$$$$$$ the index task " + task.getPid()
                    + " is a resource map task in the thread " + Thread.currentThread().getId());
            lock.lock();
            try {
                ResourceMapIndexTask resourceMapTask = (ResourceMapIndexTask) task;
                List<String> referencedIds = resourceMapTask.getReferencedIds();
                if(referencedIds != null) {
                    for (String id : referencedIds) {
                        if(SeriesIdResolver.isSeriesId(TypeFactory.buildIdentifier(id))) {
                            boolean isClear = false;
                            for(int i=0; i<MAXATTEMPTS; i++) {
                                if(seriesIdsSet.contains(id)) {
                                    logger.info("###################Another index task is processing the object with series id "
                                            + id + " as well. So the thread to process id "
                                            + task.getPid() + " has to wait 0.5 seconds.");
                                    Thread.sleep(500);
                                } else {
                                    isClear = true;
                                    seriesIdsSet.add(id);
                                    break;
                                }
                            }
                            if(!isClear) {
                                removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task);
                                String message = "We waited for another thread to finish indexing a pid with series id "
                                        + id + " for a while. Giving up; cannot index id " + task.getPid();
                                logger.error(message);
                                throw new Exception(message);
                            }
                            
                        }
                        
                        boolean clear = false;
                        for(int i=0; i<MAXATTEMPTS; i++) {
                            if(id != null && !id.trim().equals("") && referencedIdsMap.containsKey(id)) {
                                //another resource map is processing the referenced id as well
                                if(resourceMapTask.getPid().equals(referencedIdsMap.get(id))) {
                                    // this referenced id was claimed by the same resource map object, so we don't need to wait
                                    clear = true;
                                    break;
                                } else {
                                    // this referenced id was claimed by another resource map object; wait 0.5 seconds
                                    logger.info("###################Another resource map is processing the referenced id " + id
                                            + " as well. So the thread to process id "
                                            + resourceMapTask.getPid() + " has to wait 0.5 seconds.");
                                    Thread.sleep(500);
                                }
                            } else if (id != null && !id.trim().equals("")) {
                                //no resource map is processing the referenced id; claim it by adding it to the map
                                referencedIdsMap.put(id, resourceMapTask.getPid());
                                clear = true;
                                break;
                            }
                        }
                        if(!clear) {
                            removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(resourceMapTask);
                            String message = "We waited for another thread to finish indexing a resource map which has the referenced id " + id
                                    + " for a while. Giving up; cannot index id " + resourceMapTask.getPid();
                            logger.error(message);
                            throw new Exception(message);
                        }
                        
                    }
                }
                
            } finally {
                lock.unlock();
            }
        } else {
            if (logger.isDebugEnabled())
                logger.debug("xxxxxxxxxxxxxxxxxxxx the index task " + task.getPid()
                        + " is NOT a resource map task in the thread " + Thread.currentThread().getId());
            
            Identifier pid = new Identifier();
            pid.setValue(task.getPid());
            SystemMetadata smd = HazelcastClientFactory.getSystemMetadataMap().get(pid);
            if(smd != null) {
                Identifier sid = smd.getSeriesId();
                if(sid != null && sid.getValue() != null && !sid.getValue().trim().equals("")) {
                    lock.lock();
                    try {
                        if (logger.isDebugEnabled())
                            logger.debug("xxxxxxxxxxxxxxxxxxxx the index task " + task.getPid()
                                    + " has a sid " + sid.getValue() + " in the thread " + Thread.currentThread().getId());
                        boolean clear = false;
                        for(int i=0; i<MAXATTEMPTS; i++) {
                            if(seriesIdsSet.contains(sid.getValue())) {
                                if (logger.isDebugEnabled())
                                    logger.debug("###################Another index task is processing the object with series id "
                                            + sid.getValue() + " as well. So the thread to process id "
                                            + task.getPid() + " has to wait 0.5 seconds.");
                                Thread.sleep(500);
                            } else {
                                clear = true;
                                seriesIdsSet.add(sid.getValue());
                                break;
                            }
                        }
                        if(!clear) {
                            removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task);
                            String message = "We waited for another thread to finish indexing a pid with series id " + sid.getValue()
                                    + " for a while. Giving up; cannot index id " + task.getPid();
                            logger.error(message);
                            throw new Exception(message);
                        }
                    } finally {
                        lock.unlock();
                    }
                }
                
            }
            
        }
    }
    
    
    /*
     * Remove the task's referenced ids and/or series id from the tracking structures.
     */
    private void removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(IndexTask task) {
        if(task instanceof ResourceMapIndexTask) {
            ResourceMapIndexTask resourceMapTask = (ResourceMapIndexTask) task;
            List<String> referencedIds = resourceMapTask.getReferencedIds();
            if(referencedIds != null) {
                for (String id : referencedIds) {
                    if(id != null) {
                        referencedIdsMap.remove(id);
                        seriesIdsSet.remove(id);
                    }
                }
            }
        } else {
            Identifier id = new Identifier();
            id.setValue(task.getPid());
            SystemMetadata smd = HazelcastClientFactory.getSystemMetadataMap().get(id);
            logger.debug("remove the series id (if it has) for +++++ "+task.getPid());
            if(smd != null && smd.getSeriesId()!= null && smd.getSeriesId().getValue()!= null) {
                logger.debug("remove the series id "+smd.getSeriesId().getValue()+" for +++++ "+task.getPid());
                seriesIdsSet.remove(smd.getSeriesId().getValue());
            }
        }
    }
    
    /*
     * Use multiple threads to process the index task
     */
    /*private void batchProcessTasksOnThread(final List<IndexTask> taskList) {
        logger.info("using multiple threads to process BATCHED index tasks and the size of the pool is "+NUMOFPROCESSOR);
        Runnable newThreadTask = new Runnable() {
            public void run() {
                batchProcessTasks(taskList);
            }
        };
        Future future = executor.submit(newThreadTask);
        futureQueue.add(future);
    }*/

    /*private void batchProcessTasks(List<IndexTask> taskList) {
        if(taskList == null) {
            return;
        }
        long startBatch = System.currentTimeMillis();
        int size = taskList.size();
        logger.info("batch processing: " + size + " tasks");
        
        List<IndexTask> updateTasks = new ArrayList<>();
        List<IndexTask> deleteTasks = new ArrayList<>();
        
        for (IndexTask task : taskList) {
            if (task.isDeleteTask()) {
                logger.info("Adding delete task to be processed for pid: " + task.getPid());
                deleteTasks.add(task);
            } else {
                logger.info("Adding update task to be processed for pid: " + task.getPid());
                updateTasks.add(task);
            }
        }    
        
        logger.info("update tasks: " + updateTasks.size());
        logger.info("delete tasks: " + deleteTasks.size());
        try {
            batchCheckReadinessProcessResourceMap(taskList);
            try {
                deleteProcessor.process(deleteTasks);
                
                for (IndexTask task : deleteTasks) {
                    repo.delete(task);
                    logger.info("Indexing complete for pid: " + task.getPid());
                }
                
            } catch (Exception e) {
                StringBuilder failedPids = new StringBuilder(); 
                for (IndexTask task : deleteTasks)
                    failedPids.append(task.getPid()).append(", ");
                logger.error("Unable to process tasks for pids: " + failedPids.toString(), e);
                handleFailedTasks(deleteTasks);
            }
            
            try {
                updateProcessor.process(updateTasks);
                
                for (IndexTask task : updateTasks) {
                    repo.delete(task);
                    logger.info("Indexing complete for pid: " + task.getPid());
                }
                
            } catch (Exception e) {
                StringBuilder failedPids = new StringBuilder(); 
                for (IndexTask task : updateTasks)
                    failedPids.append(task.getPid()).append(", ");
                logger.error("Unable to process tasks for pids: " + failedPids.toString(), e);
                handleFailedTasks(deleteTasks);
            }
        } catch(Exception e) {
            logger.error("Couldn't batch indexing the tasks since "+e.getMessage());
        } finally {
            batchRemoveIdsFromResourceMapReferencedSet(taskList);
        }
        
        perfLog.log("MockIndexTaskProcessor.batchProcessTasks process "+size+" objects in ", System.currentTimeMillis()-startBatch);
    }*/
    
    private void batchCheckReadinessProcessResourceMap(List<IndexTask> tasks) throws Exception{
        lock.lock();
        try {
            if(tasks != null) {
                for (IndexTask task : tasks) {
                    checkReadinessProcessResourceMap(task);
                }
            }
            
        } finally {
            lock.unlock();
        }
    }
    
    private void batchRemoveIdsFromResourceMapReferencedSet(List<IndexTask> tasks) {
        if(tasks != null) {
            for (IndexTask task : tasks) {
                removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task);
            }
        }
    }
    
    private void handleFailedTasks(List<IndexTask> tasks) {
        for (IndexTask task : tasks) {
            task.markFailed();
            saveTaskWithoutDuplication(task);
        }
    }
    
    private void handleFailedTask(IndexTask task) {
        if(task != null) {
            task.markFailed();
            saveTaskWithoutDuplication(task);
        }
    }

    /**
     * Returns the next ready task from the given queue. Readiness is determined
     * by the availability of the object (unless it's a data object) and by the
     * prior indexing of all DataONE objects referenced by resource maps.
     * 
     * @param queue
     * @return the next ready task
     */
    private IndexTask getNextIndexTask(List<IndexTask> queue) {
        IndexTask task = null;
        while (task == null && !queue.isEmpty()) {
            task = queue.remove(0);

            if (task == null) continue;
            
            task.markInProgress();
            task = saveTask(task);
            
            if (task == null) continue;  // saveTask can return null

            logger.info("Start of indexing pid: " + task.getPid());

            if (task.isDeleteTask()) {
                return task;
            }

            if (!isObjectPathReady(task)) {
                task.markNew();
                saveTaskWithoutDuplication(task);
                logger.info("Task for pid: " + task.getPid() + " not processed since the object path is not ready.");
                task = null;
                continue;
            }

            // resource maps need an extra readiness check: every object the map references
            // must already be indexed (or otherwise accounted for) before the map itself is processed
            if (representsResourceMap(task)) {
                boolean ready = true;
                ResourceMap rm = null;
                List<String> referencedIds = null;
                try {
                    rm = ResourceMapFactory.buildResourceMap(task.getObjectPath());
                    referencedIds = rm.getAllDocumentIDs();

                    boolean found = referencedIds.remove(task.getPid());
                    // List.remove removes only the first occurrence found, so keep going until none are left.
                    while(found) {
                        found = referencedIds.remove(task.getPid());
                    }

                    if (!areAllReferencedDocsIndexed(referencedIds)) {
                        logger.info("****************Not all resource map references are indexed for map: " + task.getPid()
                                + ".  Marking new and continuing...");
                        ready = false;
                    }
                } catch (OREParserException oreException) {
                    ready = false;
                    logger.error("Unable to parse ORE doc: " + task.getPid()
                            + ".  Unrecoverable parse error: task will not be re-tried.");
                    if (logger.isTraceEnabled()) {
                        oreException.printStackTrace();
                    }
                } catch (Exception e) {
                    ready = false;
                    logger.error("Unable to load resource for pid: " + task.getPid()
                            + " at object path: " + task.getObjectPath()
                            + ".  Marking new and continuing...", e);
                }
                if(!ready) {
                    task.markNew();
                    saveTaskWithoutDuplication(task);
                    logger.info("Task for resource map pid: " + task.getPid() + " not processed.");
                    task = null;
                    continue;
                } else {
                    logger.info("the original index task - " + task.toString());
                    // carry the referenced ids forward on a ResourceMapIndexTask so that
                    // checkReadinessProcessResourceMap can guard against concurrent updates
                    ResourceMapIndexTask resourceMapIndexTask = new ResourceMapIndexTask();
                    resourceMapIndexTask.copy(task);
                    resourceMapIndexTask.setReferencedIds(referencedIds);
                    task = resourceMapIndexTask;
                    logger.info("the new index task is a ResourceMapIndexTask - " + task.toString());
                }
                
            }
        }
        return task;
    }

    /*private boolean isResourceMapReadyToIndex(IndexTask task, List<IndexTask> queue) {
        boolean ready = true;

        if (representsResourceMap(task)) {
            ResourceMap rm = null;
            try {
                rm = ResourceMapFactory.buildResourceMap(task.getObjectPath());
                List<String> referencedIds = rm.getAllDocumentIDs();
                referencedIds.remove(task.getPid());

                if (areAllReferencedDocsIndexed(referencedIds) == false) {
                    logger.info("Not all map resource references indexed for map: " + task.getPid()
                            + ".  Marking new and continuing...");
                    ready = false;
                }
            } catch (OREParserException oreException) {
                ready = false;
                logger.error("Unable to parse ORE doc: " + task.getPid()
                        + ".  Unrecoverable parse error: task will not be re-tried.");
                if (logger.isTraceEnabled()) {
                    oreException.printStackTrace();

                }
            } catch (Exception e) {
                ready = false;
                logger.error("unable to load resource for pid: " + task.getPid()
                        + " at object path: " + task.getObjectPath()
                        + ".  Marking new and continuing...");
            }
        }

        return ready;
    }*/

    /**
     * Referenced documents are identified by PIDs or SIDs, and the objects may or may not be archived.
     * This routine checks the solr index first and, failing to find a record there,
     * checks the HZ system metadata map.  
     * 
     * @param referencedIds
     * @return true if every referenced id is either indexed or otherwise accounted for
     */
    private boolean areAllReferencedDocsIndexed(List<String> referencedIds) {
        if (referencedIds == null || referencedIds.size() == 0) {
            return true; // empty reference map...ok/ready to index.
        }
        List<SolrDoc> updateDocuments = null;
        int numberOfIndexedOrRemovedReferences = 0;
        try {
            updateDocuments = httpService.getDocumentsById(this.solrQueryUri, referencedIds);
            numberOfIndexedOrRemovedReferences = 0;
            for (String id : referencedIds) {
                boolean foundId = false;
                for (SolrDoc solrDoc : updateDocuments) {
                    if (solrDoc.getIdentifier().equals(id) || id.equals(solrDoc.getSeriesId())) {
                        foundId = true;
                        numberOfIndexedOrRemovedReferences++;
                        break;
                    }
                }
                if (!foundId) {
                    Identifier pid = new Identifier();
                    pid.setValue(id);
                    logger.info("Referenced identifier " + id
                            + " was not found in the Solr search index.");
                    SystemMetadata smd = HazelcastClientFactory.getSystemMetadataMap().get(pid);
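                    // an object that has system metadata but is not visible in the index
                    // (e.g. archived) counts as handled for readiness purposes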
                    if (smd != null && notVisibleInIndex(smd)) {
                        numberOfIndexedOrRemovedReferences++;
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
        return referencedIds.size() == numberOfIndexedOrRemovedReferences;
    }

    private boolean notVisibleInIndex(SystemMetadata smd) {
        
        if (smd == null) 
            return false;
            
        return ! SolrDoc.visibleInIndex(smd);
    }


    private boolean representsResourceMap(IndexTask task) {
        return ForesiteResourceMap.representsResourceMap(task.getFormatId());
    }

    /**
     * Returns true if the task is for a data object. Otherwise, if the objectPath 
     * in the task is not filled, attempts to fill it via the hazelcast client.
     * If the path is not available, the task is not ready and false is returned;
     * if the object path is available, the task is updated and true is returned.
     */
    private boolean isObjectPathReady(IndexTask task) {
        
        if (isDataObject(task)) 
            return true;

        boolean ok = true;

        if (task.getObjectPath() == null) {
            String objectPath = retrieveHzObjectPath(task.getPid());
            if (objectPath == null) {
                ok = false;
                evictHzObjectPathEntry(task.getPid());
                logger.info("Object path for pid: " + task.getPid()
                        + " is not available.  Object path entry will be evicting from map.  "
                        + "Task will be retried.");
            }
            task.setObjectPath(objectPath);
        }

        // TODO: if the task has the wrong path to begin with, but the right path 
        // exists in Hazelcast, we never go back for the correct information.
        // If the objectPath is invalid, should we not check Hazelcast to see if it 
        // has a valid path?

        if (task.getObjectPath() != null) {
            File objectPathFile = new File(task.getObjectPath());
            if (!objectPathFile.exists()) {
                // object path is present but doesn't correspond to a file
                // this task is not ready to index.
                ok = false;
                logger.info("Object path exists for pid: " + task.getPid()
                        + " however the file location: " + task.getObjectPath()
                        + " does not exist.  "
                        + "Marking not ready - task will be marked new and retried.");
            }
        }
        return ok;
    }

    private boolean isDataObject(IndexTask task) {
        ObjectFormat format = null;
        try {
            ObjectFormatIdentifier formatId = new ObjectFormatIdentifier();
            formatId.setValue(task.getFormatId());
            format = ObjectFormatCache.getInstance().getFormat(formatId);
        } catch (BaseException e) {
            logger.error(e.getMessage(), e);
            return false;
        }
        return FORMAT_TYPE_DATA.equals(format.getFormatType());
    }

    private String retrieveHzObjectPath(String pid) {
        Identifier identifier = new Identifier();
        identifier.setValue(pid);
        return HazelcastClientFactory.getObjectPathMap().get(identifier);
    }

    private void evictHzObjectPathEntry(String pid) {
        Identifier identifier = new Identifier();
        identifier.setValue(pid);
        HazelcastClientFactory.getObjectPathMap().evict(identifier);
    }

    private List<IndexTask> getIndexTaskQueue() {
        long getIndexTasksStart = System.currentTimeMillis();
        List<IndexTask> indexTasks = repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW);
        perfLog.log("MockIndexTaskProcessor.getIndexTaskQueue() fetching NEW IndexTasks from repo", System.currentTimeMillis() - getIndexTasksStart);
        return indexTasks;
    }

    /*
     * FAILED tasks become eligible for retry once their next-execution time has passed.
     */
    private List<IndexTask> getIndexTaskRetryQueue() {
        return repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED,
                System.currentTimeMillis());
    }

    /*
     * @return - Can return null upon error
     */
    private IndexTask saveTask(IndexTask task) {
        try {
            task = repo.save(task);
            logger.info("IndexTaskProcess.saveTask save the index task "+task.getPid());
        } catch (HibernateOptimisticLockingFailureException e) {
            logger.error("Unable to update index task for pid: " + task.getPid() + ".");
            task = null;
        }
        return task;
    }
    
    /*
     * Save the task only if no new or failed task already exists for the pid
     */
    private void saveTaskWithoutDuplication(IndexTask task) {
        if(task != null && !newOrFailedIndexTaskExists(task.getPid())) {
            saveTask(task);
        }
    }
    
    /**
     * Determine whether an index task with the new or failed status exists for the given id.
     * @param id
     * @return true if an index task with the new or failed status exists; otherwise false.
     */
    private boolean newOrFailedIndexTaskExists(String id) {
        logger.info("IndexTaskProcess.newOrFailedIndexTaskExists for id "+id);
        boolean exist=false;
        if(id != null ) {
            List<IndexTask> itList = repo.findByPidAndStatus(id, IndexTask.STATUS_NEW);
            if(itList != null && !itList.isEmpty()) {
                logger.info("IndexTaskProcess.newOrFailedIndexTaskExists did find a new-status index task for id "+id);
                exist = true;
            }
            if(!exist) {
                itList = repo.findByPidAndStatus(id, IndexTask.STATUS_FAILED);
                if(itList != null && !itList.isEmpty()) {
                    logger.info("IndexTaskProcess.newOrFailedIndexTaskExists did find a failed-status index task for id "+id);
                    exist = true;
                }
            }
        }
        
        return exist;
    }

//    private Document loadDocument(IndexTask task) {
//        Document docObject = null;
//        try {
//            docObject = XmlDocumentUtility.loadDocument(task.getObjectPath());
//        } catch (Exception e) {
//            logger.error(e.getMessage(), e);
//        }
//        if (docObject == null) {
//            logger.error("Could not load OBJECT file for ID,Path=" + task.getPid() + ", "
//                    + task.getObjectPath());
//        }
//        return docObject;
//    }

    public void setSolrQueryUri(String uri) {
        this.solrQueryUri = uri;
    }
    
    /**
     * Get the ExecutorService that handles the processing threads.
     * @return the executor service
     */
    public ExecutorService getExecutorService() {
        return executor;
    }
    
//    /**
//     * Get the last FUTUREQUEUESIZE futures of the index task threads scheduled by the executor service.
//     * @return the queue of futures
//     */
//    public Queue<Future> getFutureQueue() {
//        return futureQueue;
//    }
}