/** * This work was created by participants in the DataONE project, and is * jointly copyrighted by participating institutions in DataONE. For * more information on DataONE, see our web site at http://dataone.org. * * Copyright 2012. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package org.dataone.service.cn.replication; import java.net.URI; import java.util.List; import org.apache.log4j.Logger; import org.dataone.client.D1NodeFactory; import org.dataone.client.rest.HttpMultipartRestClient; import org.dataone.client.v2.CNode; import org.dataone.client.v2.itk.D1Client; import org.dataone.configuration.Settings; import org.dataone.service.exceptions.BaseException; import org.dataone.service.exceptions.InvalidRequest; import org.dataone.service.exceptions.InvalidToken; import org.dataone.service.exceptions.NotAuthorized; import org.dataone.service.exceptions.NotFound; import org.dataone.service.exceptions.NotImplemented; import org.dataone.service.exceptions.ServiceFailure; import org.dataone.service.types.v1.Identifier; import org.dataone.service.types.v1.NodeReference; import org.dataone.service.types.v1.Replica; import org.dataone.service.types.v1.ReplicationStatus; import org.dataone.service.types.v2.SystemMetadata; /** * * Encapsulation of common operations involving replica system metadata objects. * * Implementations make use of the CN REST api to perform updates/creates on * system metadata replica objects. * * @author sroseboo * */ public class ReplicationService { private CNode cn; public static Logger log = Logger.getLogger(ReplicationService.class); public ReplicationService() { initializeCN(); } /** * Handles moving a 'queued' replication object into 'requested' state. * Replica is able to move to 'requested' if the replica request is * successfully made to the targetNode. If the target node cannot be * acquired, the replica object is removed. If the replica request fails at * the target member node, the replica is updated to 'failed' status. * * @param identifier * @param targetNode */ public void requestQueuedReplication(Identifier identifier, NodeReference targetNode) { if (identifier == null || targetNode == null) { return; } if (cn == null) { log.error("Unable to request replicas - CN is null."); return; } SystemMetadata sysmeta = null; try { sysmeta = getSystemMetadata(identifier); } catch (NotFound e) { } if (sysmeta == null) { log.error("Unable to get system metadata for: " + identifier.getValue() + ". exiting..."); return; } if (alreadyReplicated(sysmeta, targetNode)) { log.debug("Replica is already handled for " + targetNode.getValue() + ", identifier " + identifier.getValue() + ". exiting..."); return; } boolean updated = setReplicaToRequested(identifier, targetNode); if (updated == false) { log.error("Unable to set replication status to 'requested' for: " + identifier.getValue() + " for node: " + targetNode.getValue() + ". exiting..."); return; } // request replication be queued ReplicationCommunication rc = ReplicationCommunication.getInstance(targetNode); boolean success = false; try { success = rc.requestReplication(targetNode, sysmeta); } catch (BaseException e) { log.warn(e.getMessage(), e); } if (!success) { log.error("Unable to request replica from target mn: " + targetNode.getValue() + " for: " + identifier.getValue() + ". setting status to failed."); setReplicationStatus(identifier, targetNode, ReplicationStatus.FAILED); } } /** * Delete the replica entry for the target node using the CN router URL * rather than the local CN via D1Client. This may help with local CN * communication trouble. * * @param pid * - the identifier of the object system metadata being modified * @param targetNode * - the node id of the replica target being deleted * @param serialVersion * - the serialVersion of the system metadata being operated on **/ public boolean deleteReplicationMetadata(Identifier pid, NodeReference targetNode) { if (this.cn == null) { log.error("cannot set replication status, no CN object"); return false; } SystemMetadata sysmeta; boolean deleted = false; // try multiple times since at this point we may be dealing with a lame // CN in the cluster and the RR may still point us to it for (int i = 0; i < 5; i++) { try { // refresh the system metadata in case it changed sysmeta = getSystemMetadata(pid); deleted = cn.deleteReplicationMetadata(null, pid, targetNode, sysmeta .getSerialVersion().longValue()); if (deleted) { break; } } catch (BaseException be) { log.error( "BaseException error in calling deleteReplicationMetadata() for identifier " + pid.getValue() + " and target node " + targetNode.getValue() + ": " + be.getMessage(), be); } catch (RuntimeException re) { log.error( "Runtime exception calling delete replica metadata for: " + pid.getValue() + " for node: " + targetNode.getValue(), re); } } if (!deleted) { log.error("Ultimately unable to delete replica metadata for: " + pid.getValue() + " on node: " + targetNode.getValue()); } return deleted; } public boolean setReplicaToRequested(Identifier identifier, NodeReference targetNode) { return setReplicationStatus(identifier, targetNode, ReplicationStatus.REQUESTED); } public boolean setReplicaToCompleted(Identifier identifier, NodeReference targetNode) { return setReplicationStatus(identifier, targetNode, ReplicationStatus.COMPLETED); } /** * Set the replication status against the router CN address instead of the * local CN via D1Client. This may help with local CN communication trouble. **/ private boolean setReplicationStatus(Identifier pid, NodeReference targetNode, ReplicationStatus status) { if (this.cn == null) { log.error("cannot set replication status, no CN object"); return false; } boolean updated = false; for (int i = 0; i < 5; i++) { try { updated = cn.setReplicationStatus(null, pid, targetNode, status, null); if (updated) { break; } } catch (InvalidRequest ire) { log.warn( "Couldn't set the replication status to " + status.toString() + ", it may have possibly " + "already been set to completed for identifier " + pid.getValue() + " and target node " + targetNode.getValue() + ". The error was: " + ire.getMessage(), ire); } catch (BaseException be) { log.error( "Error in calling setReplicationStatus() for identifier " + pid.getValue() + ", target node " + targetNode.getValue() + " and status of " + status.toString() + ": " + be.getMessage() + ", detail code " + be.getDetail_code() + ", description: " + be.getDescription(), be); } } if (!updated) { log.error("Ultimately unable to update status: " + status + " for: " + pid.getValue() + " on node: " + targetNode.getValue()); } return updated; } private boolean alreadyReplicated(SystemMetadata sysmeta, NodeReference targetNode) { boolean handled = false; List replicaList = sysmeta.getReplicaList(); for (Replica replica : replicaList) { NodeReference listedNode = replica.getReplicaMemberNode(); if (listedNode.getValue().equals(targetNode.getValue())) { ReplicationStatus currentStatus = replica.getReplicationStatus(); if (currentStatus == ReplicationStatus.REQUESTED || currentStatus == ReplicationStatus.COMPLETED) { handled = true; break; } } } if (handled) { log.debug("Replica is already handled for: " + sysmeta.getIdentifier().getValue() + " at node: " + targetNode.getValue()); } return handled; } private boolean hasQueuedReplica(SystemMetadata sysmeta, NodeReference targetNode) { boolean queued = false; List replicaList = sysmeta.getReplicaList(); for (Replica replica : replicaList) { NodeReference listedNode = replica.getReplicaMemberNode(); if (listedNode.getValue().equals(targetNode.getValue())) { ReplicationStatus currentStatus = replica.getReplicationStatus(); if (currentStatus == ReplicationStatus.QUEUED) { queued = true; break; } } } if (!queued) { log.debug("Replica is not queued for: " + sysmeta.getIdentifier().getValue() + " at node: " + targetNode.getValue()); } return queued; } public SystemMetadata getSystemMetadata(Identifier identifier) throws NotFound { SystemMetadata sysmeta = null; if (identifier != null && identifier.getValue() != null) { try { sysmeta = cn.getSystemMetadata(null, identifier); } catch (InvalidToken e) { log.error("Cannot get system metedata for id: " + identifier.getValue(), e); } catch (ServiceFailure e) { log.error("Cannot get system metedata for id: " + identifier.getValue(), e); } catch (NotAuthorized e) { log.error("Cannot get system metedata for id: " + identifier.getValue(), e); } catch (NotImplemented e) { log.error("Cannot get system metedata for id: " + identifier.getValue(), e); } } return sysmeta; } // public InputStream getObjectFromCN(Identifier identifier) throws NotFound { // InputStream is = null; // if (identifier != null && identifier.getValue() != null) { // try { // is = cn.get(null, identifier); // } catch (InvalidToken e) { // log.error("Unable to get object from CN for pid: " + identifier.getValue(), e); // } catch (ServiceFailure e) { // log.error("Unable to get object from CN for pid: " + identifier.getValue(), e); // } catch (NotAuthorized e) { // log.error("Unable to get object from CN for pid: " + identifier.getValue(), e); // } catch (NotImplemented e) { // log.error("Unable to get object from CN for pid: " + identifier.getValue(), e); // } // } // return is; // } public NodeReference determineReplicationSourceNode(SystemMetadata sysMeta) { NodeReference source = null; NodeReference authNode = sysMeta.getAuthoritativeMemberNode(); for (Replica replica : sysMeta.getReplicaList()) { if (replica.getReplicaMemberNode().equals(authNode) && replica.getReplicationStatus().equals(ReplicationStatus.COMPLETED)) { source = authNode; break; } else if (source == null && replica.getReplicationStatus().equals(ReplicationStatus.COMPLETED)) { // set the source to the first completed replica but keep iterating to find // the authoritative MN and give preference to its 'replica' as source. source = replica.getReplicaMemberNode(); } } return source; } /** * Update the replica metadata against the CN router address rather than the * local CN address. This only gets called if normal updates fail due to * local CN communication errors * * @return true if the replica metadata are updated * @param session * @param pid * @param replicaMetadata **/ public boolean updateReplicationMetadata(Identifier pid, Replica replicaMetadata) { SystemMetadata sysmeta = null; boolean updated = false; for (int i = 0; i < 5; i++) { try { // refresh the system metadata in case it changed sysmeta = getSystemMetadata(pid); updated = cn.updateReplicationMetadata(null, pid, replicaMetadata, sysmeta .getSerialVersion().longValue()); if (updated) { break; } } catch (BaseException be) { // the replica has already completed from a different task if (be instanceof InvalidRequest) { log.warn( "Couldn't update replication metadata to " + replicaMetadata.getReplicationStatus().toString() + ", it may have possibly already been updated for identifier " + pid.getValue() + " and target node " + replicaMetadata.getReplicaMemberNode().getValue() + ". The error was: " + be.getMessage(), be); return false; } if (log.isDebugEnabled()) { log.debug(be); } log.error("Error in calling updateReplicationMetadata(): " + be.getMessage()); continue; } catch (RuntimeException re) { if (log.isDebugEnabled()) { log.debug(re); } log.error("Error in getting sysyem metadata from the map: " + re.getMessage()); continue; } } return updated; } // TODO: consolidate with ReplicationManager CN - if falling back to Factory.buildCNode, // you end up with 2 MultipartRestClients (2 HttpClients, 2 ConnectionManagers...) private void initializeCN() { try { this.cn = D1Client.getCN(); } catch (BaseException e) { log.warn("Caught a ServiceFailure while getting a reference to the CN ", e); // try again, then fail try { try { Thread.sleep(5000L); } catch (InterruptedException e1) { log.error("There was a problem getting a Coordinating Node reference.", e1); } this.cn = D1Client.getCN(); } catch (BaseException e1) { log.warn("Second ServiceFailure while getting a reference to the CN", e1); try { log.warn("...Building CNode without baseURL check."); this.cn = D1NodeFactory.buildNode(CNode.class, new HttpMultipartRestClient(), URI.create(Settings.getConfiguration().getString("D1Client.CN_URL"))); } catch (Exception e2) { log.error("ClientSideException trying to build a CNode.", e2); this.cn = null; } } } } }