/** * '$RCSfile$' * Purpose: Implements a service for managing a Hazelcast cluster member * Copyright: 2011 Regents of the University of California and the * National Center for Ecological Analysis and Synthesis * Authors: Christopher Jones * * '$Author$' * '$Date$' * '$Revision$' * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ package edu.ucsb.nceas.metacat.dataone.hazelcast; import java.io.FileNotFoundException; import java.sql.SQLException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.dataone.service.exceptions.InvalidSystemMetadata; import org.dataone.service.types.v1.Identifier; import org.dataone.service.types.v2.SystemMetadata; import com.hazelcast.config.Config; import com.hazelcast.config.FileSystemXmlConfig; import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryListener; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ILock; import com.hazelcast.core.IMap; import com.hazelcast.core.ISet; import com.hazelcast.core.ItemEvent; import com.hazelcast.core.ItemListener; import com.hazelcast.core.LifecycleEvent; import com.hazelcast.core.LifecycleListener; import com.hazelcast.core.Member; import com.hazelcast.core.MembershipEvent; import com.hazelcast.core.MembershipListener; import com.hazelcast.partition.Partition; import com.hazelcast.partition.PartitionService; import edu.ucsb.nceas.metacat.IdentifierManager; import edu.ucsb.nceas.metacat.McdbDocNotFoundException; import edu.ucsb.nceas.metacat.common.index.IndexTask; import edu.ucsb.nceas.metacat.common.index.event.IndexEvent; import edu.ucsb.nceas.metacat.properties.PropertyService; import edu.ucsb.nceas.metacat.shared.BaseService; import edu.ucsb.nceas.metacat.shared.ServiceException; import edu.ucsb.nceas.metacat.util.DocumentUtil; import edu.ucsb.nceas.utilities.FileUtil; import edu.ucsb.nceas.utilities.PropertyNotFoundException; /** * The Hazelcast service enables Metacat as a Hazelcast cluster member */ public class HazelcastService extends BaseService implements EntryListener, MembershipListener, LifecycleListener, ItemListener { private static final String MISSING_PID_PREFIX = "missing-"; /* The instance of the logging class */ private static Logger logMetacat = Logger.getLogger(HazelcastService.class); /* The singleton instance of the hazelcast service */ private static HazelcastService hzService = null; /* The Hazelcast configuration */ private Config hzConfig; /* The name of the system metadata map */ private String systemMetadataMap; /* The Hazelcast distributed system metadata map */ private IMap systemMetadata; /* The name of the identifiers set */ private String identifiersSet; /* The Hazelcast distributed identifiers set */ private ISet identifiers; /* The Hazelcast distributed missing identifiers set */ private ISet missingIdentifiers; /* The Hazelcast distributed index queue */ private String hzIndexQueue; private IMap indexQueue; /* The Hazelcast distributed index event map */ private String hzIndexEventMap; private IMap indexEventMap; private HazelcastInstance hzInstance; /* * Constructor: Creates an instance of the hazelcast service. Since * this uses a singleton pattern, use getInstance() to gain the instance. */ private HazelcastService() { super(); _serviceName="HazelcastService"; try { init(); } catch (ServiceException se) { logMetacat.error("There was a problem creating the HazelcastService. " + "The error message was: " + se.getMessage()); } } /** * Get the instance of the HazelcastService that has been instantiated, * or instantiate one if it has not been already. * * @return hazelcastService - The instance of the hazelcast service */ public static HazelcastService getInstance(){ if ( hzService == null ) { hzService = new HazelcastService(); } return hzService; } /** * Initializes the Hazelcast service */ public void init() throws ServiceException { logMetacat.debug("HazelcastService.init() called."); String configFileName = null; try { configFileName = PropertyService.getProperty("dataone.hazelcast.configFilePath"); hzConfig = new FileSystemXmlConfig(configFileName); } catch (Exception e) { configFileName = PropertyService.CONFIG_FILE_DIR + FileUtil.getFS() + "hazelcast.xml"; logMetacat.warn("Custom Hazelcast configuration not defined, using default: " + configFileName); // make sure we have the config try { hzConfig = new FileSystemXmlConfig(configFileName); } catch (FileNotFoundException e1) { String msg = e.getMessage(); logMetacat.error(msg); throw new ServiceException(msg); } } this.hzInstance = Hazelcast.newHazelcastInstance(hzConfig); logMetacat.debug("Initialized hzInstance"); // Get configuration properties on instantiation try { systemMetadataMap = PropertyService.getProperty("dataone.hazelcast.storageCluster.systemMetadataMap"); identifiersSet = PropertyService.getProperty("dataone.hazelcast.storageCluster.identifiersSet"); // Get a reference to the shared system metadata map as a cluster member // NOTE: this loads the map from the backing store and can take a long time for large collections systemMetadata = this.hzInstance.getMap(systemMetadataMap); logMetacat.debug("Initialized systemMetadata"); // Get a reference to the shared identifiers set as a cluster member // NOTE: this takes a long time to complete logMetacat.warn("Retrieving hzIdentifiers from Hazelcast"); identifiers = this.hzInstance.getSet(identifiersSet); logMetacat.warn("Retrieved hzIdentifiers from Hazelcast"); // for publishing the "PIDs Wanted" list missingIdentifiers = this.hzInstance.getSet("hzMissingIdentifiersSet"); missingIdentifiers.addItemListener(this, true); // for index tasks hzIndexQueue = PropertyService.getProperty("index.hazelcast.indexqueue"); indexQueue = this.hzInstance.getMap(hzIndexQueue); // for index events (failures) hzIndexEventMap = PropertyService.getProperty("index.hazelcast.indexeventmap"); indexEventMap = this.hzInstance.getMap(hzIndexEventMap); // Listen for changes to the system metadata map systemMetadata.addEntryListener(this, true); // Listen for members added/removed hzInstance.getCluster().addMembershipListener(this); // Listen for lifecycle state changes hzInstance.getLifecycleService().addLifecycleListener(this); } catch (PropertyNotFoundException e) { String msg = "Couldn't find Hazelcast properties for the DataONE clusters. " + "The error message was: " + e.getMessage(); logMetacat.error(msg); } // make sure we have all metadata locally try { // synch on restart resynchInThread(); } catch (Exception e) { String msg = "Problem resynchronizing system metadata. " + e.getMessage(); logMetacat.error(msg, e); } } /** * Get the system metadata map * * @return systemMetadata - the hazelcast map of system metadata * @param identifier - the identifier of the object as a string */ public IMap getSystemMetadataMap() { return systemMetadata; } /** * Get the identifiers set * @return identifiers - the set of unique DataONE identifiers in the cluster */ public ISet getIdentifiers() { return identifiers; } /** * Get the index queue * @return the set of SystemMetadata to be indexed */ public IMap getIndexQueue() { return indexQueue; } /** * Get the index event map * @return indexEventMap - the hazelcast map of index events */ public IMap getIndexEventMap() { return indexEventMap; } /** * When Metacat changes the underlying store, we need to refresh the * in-memory representation of it. * @param guid */ public void refreshSystemMetadataEntry(String guid) { Identifier identifier = new Identifier(); identifier.setValue(guid); // force hazelcast to update system metadata in memory from the store HazelcastService.getInstance().getSystemMetadataMap().evict(identifier); } public Lock getLock(String identifier) { Lock lock = null; try { lock = getInstance().getHazelcastInstance().getLock(identifier); } catch (RuntimeException e) { logMetacat.info("Couldn't get a lock for identifier " + identifier + " !!"); } return lock; } /** * Get the DataONE hazelcast node map * @return nodes - the hazelcast map of nodes */ // public IMap getNodesMap() { // return nodes; // } /** * Indicate whether or not this service is refreshable. * * @return refreshable - the boolean refreshable status */ public boolean refreshable() { // TODO: Determine the consequences of restarting the Hazelcast instance // Set this to true if it's okay to drop from the cluster, lose the maps, // and start back up again return false; } /** * Stop the HazelcastService. When stopped, the service will no longer * respond to requests. */ public void stop() throws ServiceException { this.hzInstance.getLifecycleService().shutdown(); } public HazelcastInstance getHazelcastInstance() { return this.hzInstance; } /** * Refresh the Hazelcast service by restarting it */ @Override protected void doRefresh() throws ServiceException { // TODO: verify that the correct config file is still used this.hzInstance.getLifecycleService().restart(); } /** * Implement the EntryListener interface for Hazelcast, reponding to entry * added events in the hzSystemMetadata map. Evaluate the entry and create * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE) * * @param event - The EntryEvent that occurred */ @Override public void entryAdded(EntryEvent event) { logMetacat.info("SystemMetadata entry added event on identifier " + event.getKey().getValue()); // handle as update - that method will create if necessary entryUpdated(event); } /** * Implement the EntryListener interface for Hazelcast, reponding to entry * evicted events in the hzSystemMetadata map. Evaluate the entry and create * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE) * * @param event - The EntryEvent that occurred */ @Override public void entryEvicted(EntryEvent event) { logMetacat.info("SystemMetadata entry evicted event on identifier " + event.getKey().getValue()); // ensure identifiers are listed in the hzIdentifiers set if ( !identifiers.contains(event.getKey()) ) { identifiers.add(event.getKey()); } } /** * Implement the EntryListener interface for Hazelcast, reponding to entry * removed events in the hzSystemMetadata map. Evaluate the entry and create * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE) * * @param event - The EntryEvent that occurred */ @Override public void entryRemoved(EntryEvent event) { logMetacat.info("SystemMetadata entry removed event on identifier " + event.getKey().getValue()); // we typically don't remove objects in Metacat, but can remove System Metadata IdentifierManager.getInstance().deleteSystemMetadata(event.getValue().getIdentifier().getValue()); // keep the hzIdentifiers set in sync with the systemmetadata table if ( identifiers.contains(event.getKey()) ) { identifiers.remove(event.getKey()); } } /** * Implement the EntryListener interface for Hazelcast, reponding to entry * updated events in the hzSystemMetadata map. Evaluate the entry and create * CNReplicationTasks as appropriate (for DATA, METADATA, RESOURCE) * * @param event - The EntryEvent that occurred */ @Override public void entryUpdated(EntryEvent event) { logMetacat.debug("Entry added/updated to System Metadata map: " + event.getKey().getValue()); PartitionService partitionService = this.hzInstance.getPartitionService(); Partition partition = partitionService.getPartition(event.getKey()); Member ownerMember = partition.getOwner(); SystemMetadata sysmeta = event.getValue(); if (!ownerMember.localMember()) { if (sysmeta == null) { logMetacat.warn("No SystemMetadata provided in the event, getting from shared map: " + event.getKey().getValue()); sysmeta = getSystemMetadataMap().get(event.getKey()); if (sysmeta == null) { // this is a problem logMetacat.error("Could not find SystemMetadata in shared map for: " + event.getKey().getValue()); // TODO: should probably return at this point since the save will fail } } // need to pull the entry into the local store saveLocally(event.getValue()); } // ensure identifiers are listed in the hzIdentifiers set if (!identifiers.contains(event.getKey())) { identifiers.add(event.getKey()); } } /** * Save SystemMetadata to local store if needed * @param sm */ private void saveLocally(SystemMetadata sm) { logMetacat.debug("Saving entry locally: " + sm.getIdentifier().getValue()); try { IdentifierManager.getInstance().insertOrUpdateSystemMetadata(sm); } catch (McdbDocNotFoundException e) { logMetacat.error("Could not save System Metadata to local store.", e); } catch (SQLException e) { logMetacat.error("Could not save System Metadata to local store.", e); } catch (InvalidSystemMetadata e) { logMetacat.error("Could not save System Metadata to local store.", e); } } /** * Checks the local backing store for missing SystemMetadata, * retrieves those entries from the shared map if they exist, * and saves them locally. */ private void synchronizeLocalStore() { List localIds = IdentifierManager.getInstance().getLocalIdsWithNoSystemMetadata(true, -1); if (localIds != null) { logMetacat.debug("Member missing SystemMetadata entries, count = " + localIds.size()); for (String localId: localIds) { logMetacat.debug("Processing system metadata for localId: " + localId); try { String docid = DocumentUtil.getSmartDocId(localId); int rev = DocumentUtil.getRevisionFromAccessionNumber(localId); String guid = IdentifierManager.getInstance().getGUID(docid, rev); logMetacat.debug("Found mapped guid: " + guid); Identifier pid = new Identifier(); pid.setValue(guid); SystemMetadata sm = systemMetadata.get(pid); logMetacat.debug("Found shared system metadata for guid: " + guid); saveLocally(sm); logMetacat.debug("Saved shared system metadata locally for guid: " + guid); } catch (Exception e) { logMetacat.error("Could not save shared SystemMetadata entry locally, localId: " + localId, e); } } } } /** * Make sure we have a copy of every entry in the shared map. * We use lazy loading and therefore the CNs may not all be in sync when one * comes back online after an extended period of being offline * This method loops through the entries that a FULLY UP-TO-DATE CN has * and makes sure each one is present on the shared map. * It is meant to overcome a HZ weakness wherein ownership of a key results in * null values where the owner does not have a complete backing store. * This will be an expensive routine and should be run in a background process so that * the server can continue to service other requests during the synch * @throws Exception */ private void resynchToRemote() { // the local identifiers not already present in the shared map Set localIdKeys = loadAllKeys(); // the PIDs missing locally Set missingIdKeys = new HashSet(); // only contribute PIDs that are not already shared Iterator idIter = identifiers.iterator(); int processedCount = 0; while (idIter.hasNext()) { Identifier pid = idIter.next(); if (localIdKeys.contains(pid)) { logMetacat.debug("Shared pid is already in local identifier set: " + pid.getValue()); localIdKeys.remove(pid); } else { // we don't have this locally, so we should try to get it missingIdKeys.add(pid); } processedCount++; } logMetacat.warn("processedCount (identifiers from iterator): " + processedCount); logMetacat.warn("local pid count not yet shared: " + localIdKeys.size() + ", shared pid count: " + identifiers.size()); //identifiers.addAll(idKeys); logMetacat.warn("Loading missing local keys into hzIdentifiers"); for (Identifier key: localIdKeys) { if (!identifiers.contains(key)) { logMetacat.debug("Adding missing hzIdentifiers key: " + key.getValue()); identifiers.add(key); } } logMetacat.warn("Initialized identifiers with missing local keys"); logMetacat.warn("Processing missing SystemMetadata for missing pid count: " + missingIdKeys.size()); // loop through all the missing PIDs to find any null (missing) SM that needs to be resynched Iterator missingPids = missingIdKeys.iterator(); while (missingPids.hasNext()) { Identifier pid = missingPids.next(); // publish that we need this SM entry logMetacat.debug("Publishing missing pid to wanted list: " + pid.getValue()); missingIdentifiers.add(pid); } } public void resynchInThread() { logMetacat.debug("launching system metadata resynch in a thread"); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(new Runnable() { @Override public void run() { try { // this is a push mechanism DateFormat dateFormat = new SimpleDateFormat("MMM dd, yyyy HH:mm:ss aaa"); System.out.println(dateFormat.format(Calendar.getInstance().getTime())+" Start the hazelcast synchronization"); logMetacat.warn("Start the hazelcast synchronization"); resynchToRemote(); System.out.println(dateFormat.format(Calendar.getInstance().getTime())+" End the hazelcast synchronization"); logMetacat.warn("End the hazelcast synchronization"); } catch (Exception e) { logMetacat.error("Error in resynchInThread: " + e.getMessage(), e); } } }); executor.shutdown(); } /** * When there is missing SystemMetadata on the local member, * we retrieve it from the shared map and add it to the local * backing store for safe keeping. */ @Override public void memberAdded(MembershipEvent event) { Member member = event.getMember(); logMetacat.debug("Member added to cluster: " + member.getInetSocketAddress()); boolean isLocal = member.localMember(); if (isLocal) { logMetacat.debug("Member islocal: " + member.getInetSocketAddress()); synchronizeLocalStore(); } } @Override public void memberRemoved(MembershipEvent event) { // TODO Auto-generated method stub } /** * In cases where this cluster is paused, we want to * check that the local store accurately reflects the shared * SystemMetadata map * @param event */ @Override public void stateChanged(LifecycleEvent event) { logMetacat.debug("HZ LifecycleEvent.state: " + event.getState()); if (event.getState().equals(LifecycleEvent.LifecycleState.RESUMED)) { logMetacat.debug("HZ LifecycleEvent.state is RESUMED, calling synchronizeLocalStore()"); synchronizeLocalStore(); } } /** * Load all System Metadata keys from the backing store * @return set of pids */ private Set loadAllKeys() { Set pids = new HashSet(); try { // ALTERNATIVE 1: this has more overhead than just looking at the GUIDs // ObjectList ol = IdentifierManager.getInstance().querySystemMetadata( // null, //startTime, // null, //endTime, // null, //objectFormatId, // false, //replicaStatus, // 0, //start, // -1 //count // ); // for (ObjectInfo o: ol.getObjectInfoList()) { // Identifier pid = o.getIdentifier(); // if ( !pids.contains(pid) ) { // pids.add(pid); // } // } // ALTERNATIVE method: look up all the Identifiers from the table List guids = IdentifierManager.getInstance().getAllSystemMetadataGUIDs(); logMetacat.warn("Local SystemMetadata pid count: " + guids.size()); for (String guid: guids){ Identifier pid = new Identifier(); pid.setValue(guid); pids.add(pid); } } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } return pids; } /** * Respond to itemAdded events on the hzMissingIdentifiers Set. Uses a * distributed ILock to try to prevent multiple put calls on hzSystemMetadata * * @param pid the identifier of the event */ @Override public void itemAdded(ItemEvent event) { Identifier pid = (Identifier) event.getItem(); // publish the SM for the pid if we have it locally logMetacat.debug("Responding to itemAdded for pid: " + pid.getValue()); // lock this event, only if we have a local copy to contribute ILock lock = null; try { // look up the local copy of the SM SystemMetadata sm = IdentifierManager.getInstance().getSystemMetadata(pid.getValue()); if (sm != null) { lock = hzInstance.getLock(MISSING_PID_PREFIX + pid.getValue()); if ( lock.tryLock() ) { // "publish" the system metadata to the shared map since it showed up on the missing queue logMetacat.debug("Adding SystemMetadata to shared map for pid: " + pid.getValue()); systemMetadata.put(pid, sm); // remove the entry since we processed it missingIdentifiers.remove(pid); } else { logMetacat.debug(MISSING_PID_PREFIX + pid.getValue() + " was already locked. Skipping."); } } else { // can't help here logMetacat.warn("Local system metadata not found for pid: " + pid.getValue()); } } catch (Exception e) { logMetacat.error("Error looking up missing system metadata for pid: " + pid.getValue()); } finally { if ( lock != null ) { lock.unlock(); } } } /** * Respond to itemRemoved events on the hzMissingIdentifiers Set * * @param pid the identifier of the event */ @Override public void itemRemoved(ItemEvent event) { // do nothing since someone probably handled the wanted PID } }