/** * This work was created by participants in the DataONE project, and is * jointly copyrighted by participating institutions in DataONE. For * more information on DataONE, see our web site at http://dataone.org. * * Copyright ${year} * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * $Id$ */ package org.dataone.cn.indexer.processor; import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.log4j.Logger; import org.dataone.client.v2.formats.ObjectFormatCache; import org.dataone.cn.hazelcast.HazelcastClientFactory; import org.dataone.cn.index.processor.IndexTaskProcessingStrategy; import org.dataone.cn.index.task.IndexTask; import org.dataone.cn.index.task.IndexTaskRepository; import org.dataone.cn.index.task.ResourceMapIndexTask; import org.dataone.cn.index.util.PerformanceLogger; import org.dataone.cn.indexer.XmlDocumentUtility; import org.dataone.cn.indexer.parser.utility.SeriesIdResolver; import org.dataone.cn.indexer.resourcemap.ForesiteResourceMap; import org.dataone.cn.indexer.resourcemap.ResourceMap; 
import org.dataone.cn.indexer.resourcemap.ResourceMapFactory; import org.dataone.cn.indexer.solrhttp.HTTPService; import org.dataone.cn.indexer.solrhttp.SolrDoc; import org.dataone.configuration.Settings; import org.dataone.service.exceptions.BaseException; import org.dataone.service.types.v1.Identifier; import org.dataone.service.types.v1.ObjectFormatIdentifier; import org.dataone.service.types.v1.TypeFactory; import org.dataone.service.types.v2.ObjectFormat; import org.dataone.service.types.v2.SystemMetadata; import org.dspace.foresite.OREParserException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.orm.hibernate3.HibernateOptimisticLockingFailureException; import org.w3c.dom.Document; /** * MockIndexTaskProcessor is the controller class for processing IndexTasks. These * tasks are generated by the IndexTaskGenerator class and associated * collaborators. MockIndexTaskProcessor uses the IndexTaskRepository to locate * IndexTasks for processing and delegates to IndexTaskProcessingStrategy * implementations for actual processing behavior. 
* * @author sroseboo * */ public class MockIndexTaskProcessor { private static Logger logger = Logger.getLogger(MockIndexTaskProcessor.class.getName()); private static final String FORMAT_TYPE_DATA = "DATA"; private static final String LOAD_LOGGER_NAME = "indexProcessorLoad"; private static int BATCH_UPDATE_SIZE = Settings.getConfiguration().getInt("dataone.indexing.batchUpdateSize", 1000); private static int NUMOFPROCESSOR = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.processThreadPoolSize", 10); private static int MAXATTEMPTS = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.resourceMapWait.maxAttempt", 10); private static int FUTUREQUEUESIZE = Settings.getConfiguration().getInt("dataone.indexing.multiThreads.futureQueueSize", 100); private static ExecutorService executor = Executors.newFixedThreadPool(NUMOFPROCESSOR); private static final ReentrantLock lock = new ReentrantLock(); private static CircularFifoQueue futureQueue = new CircularFifoQueue(FUTUREQUEUESIZE); //a concurrent map to main the information about current processing resource map objects and their referenced ids //the key is a referenced id and value is the id of resource map. private static ConcurrentHashMap referencedIdsMap = new ConcurrentHashMap(); private static ConcurrentSkipListSet seriesIdsSet = new ConcurrentSkipListSet(); @Autowired private IndexTaskRepository repo; @Autowired private IndexTaskProcessingStrategy deleteProcessor; @Autowired private IndexTaskProcessingStrategy updateProcessor; @Autowired private HTTPService httpService; @Autowired private String solrQueryUri; private PerformanceLogger perfLog = PerformanceLogger.getInstance(); public MockIndexTaskProcessor() { } /** * Processes the index task queue written by the IndexTaskGenerator, * but unlike {@link #processIndexTaskQueue()}, all IndexTasks that * add solr documents will be grouped into batches and done in one * command to solr. 
*/ /*public void batchProcessIndexTaskQueue() { logProcessorLoad(); List queue = getIndexTaskQueue(); List batchProcessList = new ArrayList(BATCH_UPDATE_SIZE); logger.info("batchProcessIndexTaskQueue, queue size: " + queue.size() + " tasks"); IndexTask nextTask = getNextIndexTask(queue); while (nextTask != null) { batchProcessList.add(nextTask); logger.info("added task: " + nextTask.getPid()); nextTask = getNextIndexTask(queue); if (nextTask != null) logger.info("next task: " + nextTask.getPid()); logger.info("queue size: " + queue.size()); if (batchProcessList.size() >= BATCH_UPDATE_SIZE) { batchProcessTasksOnThread(batchProcessList); batchProcessList = new ArrayList(BATCH_UPDATE_SIZE); } } batchProcessTasksOnThread(batchProcessList); List retryQueue = getIndexTaskRetryQueue(); List batchProcessRetryList = new ArrayList(BATCH_UPDATE_SIZE); logger.info("batchProcessIndexTaskQueue, retry queue size: " + queue.size() + " tasks"); nextTask = getNextIndexTask(retryQueue); while (nextTask != null) { batchProcessRetryList.add(nextTask); nextTask = getNextIndexTask(retryQueue); if (batchProcessRetryList.size() >= BATCH_UPDATE_SIZE) { batchProcessTasksOnThread(batchProcessRetryList); batchProcessRetryList = new ArrayList(BATCH_UPDATE_SIZE); } } batchProcessTasksOnThread(batchProcessRetryList); }*/ /** * Start a round of IndexTask processing. The IndexTask data store is * abstracted as a queue of tasks to process ordered by priority and * modification date. Typically invoked periodically by a quartz scheduled * job. 
*/ public void processIndexTaskQueue() { logProcessorLoad(); List queue = getIndexTaskQueue(); IndexTask task = getNextIndexTask(queue); while (task != null) { processTaskOnThread(task); task = getNextIndexTask(queue); } processFailedIndexTaskQueue(); /*List retryQueue = getIndexTaskRetryQueue(); task = getNextIndexTask(retryQueue); while (task != null) { processTaskOnThread(task); task = getNextIndexTask(retryQueue); }*/ } public void mockProcessIndexTaskQueue() { System.out.println(" entering mockProcessIndexTaskQueue..."); try { Thread.sleep(20000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(" ... exiting mockProcessIndexTaskQueue."); } /** * Index the given index queue * @param queue */ public void processIndexTaskQueue(List queue) { IndexTask task = null; if(queue != null) { int size = queue.size(); task = getNextIndexTask(queue); while (task != null) { processTaskOnThread(task); task = getNextIndexTask(queue); } logger.info("MockIndexTaskProcessor.processIndexTaskQueue - finish submitting the index task queue with the size "+size+ " and current queue size is down to "+queue.size()); } } /** * Index the index task which currently has the failed status on the index task repository */ public void processFailedIndexTaskQueue() { List retryQueue = getIndexTaskRetryQueue(); if(retryQueue != null) { IndexTask task = getNextIndexTask(retryQueue); logger.info("MockIndexTaskProcessor.processFailedIndexTaskQueue with size "+retryQueue.size()); while (task != null) { processTaskOnThread(task); task = getNextIndexTask(retryQueue); } } } /** * Logs the number of {@link IndexTask}s that need to be processed * and the number of tasks that have failed. 
*/ private void logProcessorLoad() { Logger loadLogger = Logger.getLogger(LOAD_LOGGER_NAME); Long newTasks = null; Long failedTasks = null; try { newTasks = repo.countByStatus(IndexTask.STATUS_NEW); failedTasks = repo.countByStatus(IndexTask.STATUS_FAILED); } catch (Exception e) { logger.error("Unable to count NEW or FAILED tasks in task index repository.", e); } loadLogger.info("new tasks:" + newTasks + ", tasks previously failed: " + failedTasks ); } /* * Use multiple threads to process the index task */ private void processTaskOnThread(final IndexTask task) { logger.info("using multiple threads to process index and the size of the thread pool is "+NUMOFPROCESSOR); Runnable newThreadTask = new Runnable() { public void run() { processTask(task); } }; Future future = executor.submit(newThreadTask); futureQueue.add(future); } private void processTask(IndexTask task) { long start = System.currentTimeMillis(); try { checkReadinessProcessResourceMap(task); if (task.isDeleteTask()) { logger.info("+++++++++++++start to process delete index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); //System.out.println("+++++++++++++start to process delete index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); deleteProcessor.process(task); //System.out.println("+++++++++++++end to process delete index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); logger.info("+++++++++++++end to process delete index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); } else { logger.info("*********************start to process update index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); //System.out.println("*********************start to process update index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); updateProcessor.process(task); //System.out.println("*********************end to process update index task for "+task.getPid()+" in thread 
"+Thread.currentThread().getId()); logger.info("*********************end to process update index task for "+task.getPid()+" in thread "+Thread.currentThread().getId()); } } catch (Exception e) { logger.error("Unable to process task for pid: " + task.getPid(), e); if(task != null) { repo.delete(task.getId()); } handleFailedTask(task); return; } finally { removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task); } if(task != null) { repo.delete(task.getId()); } /*if(task != null && task instanceof ResourceMapIndexTask) { repo.delete(task.getId());//the ReousrceMapIndexTask is not the original object. repo.delete(IndexTask) wouldn't work. } else { repo.delete(task); }*/ logger.info("Indexing complete for pid: " + task.getPid()); perfLog.log("MockIndexTaskProcessor.processTasks process pid "+task.getPid(), System.currentTimeMillis()-start); } /* * When a resource map object is indexed, it will change the solr index of its referenced ids. * It can be a race condition. https://redmine.dataone.org/issues/7771 * So we maintain a set containing the referenced ids which the resource map objects are currently being processed. * Before we start to process a new resource map object on a thread, we need to check the set. 
*/ private void checkReadinessProcessResourceMap(IndexTask task) throws Exception{ //only handle resourceMap index task if(task != null && task instanceof ResourceMapIndexTask ) { logger.debug("$$$$$$$$$$$$$$$$$ the index task "+task.getPid()+" is a resource map task in the the thread "+ Thread.currentThread().getId()); lock.lock(); try { ResourceMapIndexTask resourceMapTask = (ResourceMapIndexTask) task; List referencedIds = resourceMapTask.getReferencedIds(); if(referencedIds != null) { for (String id : referencedIds) { if(SeriesIdResolver.isSeriesId(TypeFactory.buildIdentifier(id))) { boolean isClear = false; for(int i=0; i referencedIds = resourceMapTask.getReferencedIds(); if(referencedIds != null) { for (String id : referencedIds) { if(id != null) { referencedIdsMap.remove(id); seriesIdsSet.remove(id); } } } } else { Identifier id = new Identifier(); id.setValue(task.getPid()); SystemMetadata smd = HazelcastClientFactory.getSystemMetadataMap().get(id); logger.debug("remove the series id (if it has) for +++++ "+task.getPid()); if(smd != null && smd.getSeriesId()!= null && smd.getSeriesId().getValue()!= null) { logger.debug("remove the series id "+smd.getSeriesId().getValue()+" for +++++ "+task.getPid()); seriesIdsSet.remove(smd.getSeriesId().getValue()); } } } /* * Use multiple threads to process the index task */ /*private void batchProcessTasksOnThread(final List taskList) { logger.info("using multiple threads to process BATCHED index tasks and the size of the pool is "+NUMOFPROCESSOR); Runnable newThreadTask = new Runnable() { public void run() { batchProcessTasks(taskList); } }; Future future = executor.submit(newThreadTask); futureQueue.add(future); }*/ /*private void batchProcessTasks(List taskList) { if(taskList == null) { return; } long startBatch = System.currentTimeMillis(); int size = taskList.size(); logger.info("batch processing: " + size + " tasks"); List updateTasks = new ArrayList<>(); List deleteTasks = new ArrayList<>(); for (IndexTask task : 
taskList) { if (task.isDeleteTask()) { logger.info("Adding delete task to be processed for pid: " + task.getPid()); deleteTasks.add(task); } else { logger.info("Adding update task to be processed for pid: " + task.getPid()); updateTasks.add(task); } } logger.info("update tasks: " + updateTasks.size()); logger.info("delete tasks: " + deleteTasks.size()); try { batchCheckReadinessProcessResourceMap(taskList); try { deleteProcessor.process(deleteTasks); for (IndexTask task : deleteTasks) { repo.delete(task); logger.info("Indexing complete for pid: " + task.getPid()); } } catch (Exception e) { StringBuilder failedPids = new StringBuilder(); for (IndexTask task : deleteTasks) failedPids.append(task.getPid()).append(", "); logger.error("Unable to process tasks for pids: " + failedPids.toString(), e); handleFailedTasks(deleteTasks); } try { updateProcessor.process(updateTasks); for (IndexTask task : updateTasks) { repo.delete(task); logger.info("Indexing complete for pid: " + task.getPid()); } } catch (Exception e) { StringBuilder failedPids = new StringBuilder(); for (IndexTask task : updateTasks) failedPids.append(task.getPid()).append(", "); logger.error("Unable to process tasks for pids: " + failedPids.toString(), e); handleFailedTasks(deleteTasks); } } catch(Exception e) { logger.error("Couldn't batch indexing the tasks since "+e.getMessage()); } finally { batchRemoveIdsFromResourceMapReferencedSet(taskList); } perfLog.log("MockIndexTaskProcessor.batchProcessTasks process "+size+" objects in ", System.currentTimeMillis()-startBatch); }*/ private void batchCheckReadinessProcessResourceMap(List tasks) throws Exception{ lock.lock(); try { if(tasks != null) { for (IndexTask task : tasks) { checkReadinessProcessResourceMap(task); } } } finally { lock.unlock(); } } private void batchRemoveIdsFromResourceMapReferencedSet(List tasks) { if(tasks != null) { for (IndexTask task : tasks) { removeIdsFromResourceMapReferencedSetAndSeriesIdsSet(task); } } } private void 
handleFailedTasks(List tasks) { for (IndexTask task : tasks) { task.markFailed(); saveTaskWithoutDuplication(task); } } private void handleFailedTask(IndexTask task) { if(task != null) { task.markFailed(); saveTaskWithoutDuplication(task); } } /** * returns the next ready task from the given queue. Readiness is determined * by availability of the object (unless it's a data object), and the prior * indexing of all Dataone objects referenced by resource maps. * * @param queue * @return the next ready task */ private IndexTask getNextIndexTask(List queue) { IndexTask task = null; while (task == null && queue.isEmpty() == false) { task = queue.remove(0); if (task == null) continue; task.markInProgress(); task = saveTask(task); if (task == null) continue; // saveTask can return null logger.info("Start of indexing pid: " + task.getPid()); if (task.isDeleteTask()) { return task; } if (!isObjectPathReady(task)) { task.markNew(); saveTaskWithoutDuplication(task); logger.info("Task for pid: " + task.getPid() + " not processed since the object path is not ready."); task = null; continue; } if (representsResourceMap(task)) { boolean ready = true; ResourceMap rm = null; List referencedIds = null; try { rm = ResourceMapFactory.buildResourceMap(task.getObjectPath()); referencedIds = rm.getAllDocumentIDs(); boolean found = referencedIds.remove(task.getPid()); // list.remove removes only the first occurrence found, so keep going until non left. while(found) { found = referencedIds.remove(task.getPid()); } if (areAllReferencedDocsIndexed(referencedIds) == false) { logger.info("****************Not all map resource references indexed for map: " + task.getPid() + ". Marking new and continuing..."); ready = false; } } catch (OREParserException oreException) { ready = false; logger.error("Unable to parse ORE doc: " + task.getPid() + ". 
Unrecoverable parse error: task will not be re-tried."); if (logger.isTraceEnabled()) { oreException.printStackTrace(); } } catch (Exception e) { ready = false; logger.error("unable to load resource for pid: " + task.getPid() + " at object path: " + task.getObjectPath() + ". Marking new and continuing..."); } if(!ready) { task.markNew(); saveTaskWithoutDuplication(task); logger.info("Task for resource map pid: " + task.getPid() + " not processed."); task = null; continue; } else { logger.info("the original index task - "+task.toString()); ResourceMapIndexTask resourceMapIndexTask = new ResourceMapIndexTask(); resourceMapIndexTask.copy(task); resourceMapIndexTask.setReferencedIds(referencedIds); task = resourceMapIndexTask; if(task instanceof ResourceMapIndexTask) { logger.info("the new index task is a ResourceMapIndexTask"); logger.info("the new index task - "+task.toString()); } else { logger.error("Something is wrong to change the IndexTask object to the ResourceMapIndexTask object "); } } } } return task; } /*private boolean isResourceMapReadyToIndex(IndexTask task, List queue) { boolean ready = true; if (representsResourceMap(task)) { ResourceMap rm = null; try { rm = ResourceMapFactory.buildResourceMap(task.getObjectPath()); List referencedIds = rm.getAllDocumentIDs(); referencedIds.remove(task.getPid()); if (areAllReferencedDocsIndexed(referencedIds) == false) { logger.info("Not all map resource references indexed for map: " + task.getPid() + ". Marking new and continuing..."); ready = false; } } catch (OREParserException oreException) { ready = false; logger.error("Unable to parse ORE doc: " + task.getPid() + ". Unrecoverable parse error: task will not be re-tried."); if (logger.isTraceEnabled()) { oreException.printStackTrace(); } } catch (Exception e) { ready = false; logger.error("unable to load resource for pid: " + task.getPid() + " at object path: " + task.getObjectPath() + ". 
Marking new and continuing..."); } } return ready; }*/ /** * Referenced documents are PIDs or SIDs that are either archived or not. * This routine checks the solr index first, then failing to find a record, * checks HZ system metadata map. * * @param referencedIds * @return */ private boolean areAllReferencedDocsIndexed(List referencedIds) { if (referencedIds == null || referencedIds.size() == 0) { return true; // empty reference map...ok/ready to index. } List updateDocuments = null; int numberOfIndexedOrRemovedReferences = 0; try { updateDocuments = httpService.getDocumentsById(this.solrQueryUri, referencedIds); numberOfIndexedOrRemovedReferences = 0; for (String id : referencedIds) { boolean foundId = false; for (SolrDoc solrDoc : updateDocuments) { if (solrDoc.getIdentifier().equals(id) || id.equals(solrDoc.getSeriesId())) { foundId = true; numberOfIndexedOrRemovedReferences++; break; } } if (foundId == false) { Identifier pid = new Identifier(); pid.setValue(id); logger.info("Identifier " + id + " was not found in the referenced id list in the Solr search index."); SystemMetadata smd = HazelcastClientFactory.getSystemMetadataMap().get(pid); if (smd != null && notVisibleInIndex(smd)) { numberOfIndexedOrRemovedReferences++; } } } } catch (Exception e) { logger.error(e.getMessage(), e); return false; } return referencedIds.size() == numberOfIndexedOrRemovedReferences; } private boolean notVisibleInIndex(SystemMetadata smd) { if (smd == null) return false; return ! SolrDoc.visibleInIndex(smd); } // return (!SolrDoc.visibleInIndex(smd) && smd != null); private boolean representsResourceMap(IndexTask task) { return ForesiteResourceMap.representsResourceMap(task.getFormatId()); } /** * returns true if the task is for a data object, otherwise, if the objectPath * in the task is not filled, attempt to fill it via hazelclient. * if not available, the task is not ready and false is returned. 
* if object path is available, update the task and return true */ private boolean isObjectPathReady(IndexTask task) { if (isDataObject(task)) return true; boolean ok = true; if (task.getObjectPath() == null) { String objectPath = retrieveHzObjectPath(task.getPid()); if (objectPath == null) { ok = false; evictHzObjectPathEntry(task.getPid()); logger.info("Object path for pid: " + task.getPid() + " is not available. Object path entry will be evicting from map. " + "Task will be retried."); } task.setObjectPath(objectPath); } // TODO: if the task has the wrong path to begin with, but the right path // exists in Hazelcast, we never get to for the correct information. // if the objectPath is invalid, should we not check Hazelcast to see if it // has a valid path? if (task.getObjectPath() != null) { File objectPathFile = new File(task.getObjectPath()); if (!objectPathFile.exists()) { // object path is present but doesn't correspond to a file // this task is not ready to index. ok = false; logger.info("Object path exists for pid: " + task.getPid() + " however the file location: " + task.getObjectPath() + " does not exist. 
" + "Marking not ready - task will be marked new and retried."); } } return ok; } private boolean isDataObject(IndexTask task) { ObjectFormat format = null; try { ObjectFormatIdentifier formatId = new ObjectFormatIdentifier(); formatId.setValue(task.getFormatId()); format = ObjectFormatCache.getInstance().getFormat(formatId); } catch (BaseException e) { logger.error(e.getMessage(), e); return false; } return FORMAT_TYPE_DATA.equals(format.getFormatType()); } private String retrieveHzObjectPath(String pid) { Identifier PID = new Identifier(); PID.setValue(pid); return HazelcastClientFactory.getObjectPathMap().get(PID); } private void evictHzObjectPathEntry(String pid) { Identifier PID = new Identifier(); PID.setValue(pid); HazelcastClientFactory.getObjectPathMap().evict(PID); } private List getIndexTaskQueue() { long getIndexTasksStart = System.currentTimeMillis(); List indexTasks = repo.findByStatusOrderByPriorityAscTaskModifiedDateAsc(IndexTask.STATUS_NEW); perfLog.log("MockIndexTaskProcessor.getIndexTaskQueue() fetching NEW IndexTasks from repo", System.currentTimeMillis() - getIndexTasksStart); return indexTasks; } private List getIndexTaskRetryQueue() { return repo.findByStatusAndNextExecutionLessThan(IndexTask.STATUS_FAILED, System.currentTimeMillis()); } /* * @return - Can return null upon error */ private IndexTask saveTask(IndexTask task) { try { task = repo.save(task); logger.info("IndexTaskProcess.saveTask save the index task "+task.getPid()); } catch (HibernateOptimisticLockingFailureException e) { logger.error("Unable to update index task for pid: " + task.getPid() + "."); task = null; } return task; } /* * Save the task only if no new or failed task already exists for the pid */ private void saveTaskWithoutDuplication(IndexTask task) { if(task != null) { if(!newOrFailedIndexTaskExists(task.getPid())) { saveTask(task); } } } /** * If an index task exists with the new or failed status for the given id * @param id * @return true if the index task with new 
or failed status exists; otherwise false. A null id yields false.
     */
    private boolean newOrFailedIndexTaskExists(String id) {
        logger.info("IndexTaskProcess.newOrFailedIndexTaskExists for id "+id);
        if (id == null) {
            return false;
        }
        List<IndexTask> newTasks = repo.findByPidAndStatus(id, IndexTask.STATUS_NEW);
        if (newTasks != null && !newTasks.isEmpty()) {
            logger.info("IndexTaskProcess.newOrFailedIndexTaskExists did find a new-status index task for id "+id);
            return true;
        }
        // Only consult the failed tasks when no new task was found.
        List<IndexTask> failedTasks = repo.findByPidAndStatus(id, IndexTask.STATUS_FAILED);
        if (failedTasks != null && !failedTasks.isEmpty()) {
            logger.info("IndexTaskProcess.newOrFailedIndexTaskExists did find a failed-status index task for id "+id);
            return true;
        }
        return false;
    }

    /**
     * Sets the Solr query URI that is passed to the HTTP service when
     * referenced documents are looked up.
     *
     * @param uri the Solr query URI
     */
    public void setSolrQueryUri(String uri) {
        this.solrQueryUri = uri;
    }

    /**
     * Get the ExecutorService to handle multiple thread
     *
     * @return the shared executor on which index tasks are submitted
     */
    public ExecutorService getExecutorService() {
        return executor;
    }

    /*
     * Get the last 100 futures of the index task threads scheduled by executor service.
     * (Accessor currently disabled.)
     */
//    public Queue getFutureQueue() {
//        return futureQueue;
//    }
}