001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2009 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2015 ForgeRock AS. 016 */ 017package org.opends.server.replication.server; 018 019import java.net.SocketException; 020 021import org.forgerock.i18n.LocalizableMessage; 022import org.opends.server.api.DirectoryThread; 023import org.forgerock.i18n.slf4j.LocalizedLogger; 024import org.opends.server.replication.common.ServerStatus; 025import org.opends.server.replication.protocol.ReplicaOfflineMsg; 026import org.opends.server.replication.protocol.Session; 027import org.opends.server.replication.protocol.UpdateMsg; 028import org.opends.server.replication.service.DSRSShutdownSync; 029 030import static org.opends.messages.ReplicationMessages.*; 031import static org.opends.server.replication.common.ServerStatus.*; 032import static org.opends.server.util.StaticUtils.*; 033 034/** 035 * This class defines a server writer, which is used to send changes to a 036 * directory server. 037 */ 038public class ServerWriter extends DirectoryThread 039{ 040 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 041 042 private final Session session; 043 private final ServerHandler handler; 044 private final ReplicationServerDomain replicationServerDomain; 045 private final DSRSShutdownSync dsrsShutdownSync; 046 047 /** 048 * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler 049 * for new updates and forward them to the server 050 * 051 * @param session 052 * the Session that will be used to send updates. 053 * @param handler 054 * handler for which the ServerWriter is created. 055 * @param replicationServerDomain 056 * The ReplicationServerDomain of this ServerWriter. 057 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 058 */ 059 public ServerWriter(Session session, ServerHandler handler, 060 ReplicationServerDomain replicationServerDomain, 061 DSRSShutdownSync dsrsShutdownSync) 062 { 063 // Session may be null for ECLServerWriter. 064 super("Replication server RS(" + handler.getReplicationServerId() 065 + ") writing to " + handler + " at " 066 + (session != null ? session.getReadableRemoteAddress() : "unknown")); 067 068 this.session = session; 069 this.handler = handler; 070 this.replicationServerDomain = replicationServerDomain; 071 this.dsrsShutdownSync = dsrsShutdownSync; 072 } 073 074 /** 075 * Run method for the ServerWriter. 076 * Loops waiting for changes from the ReplicationServerDomain and forward them 077 * to the other servers 078 */ 079 @Override 080 public void run() 081 { 082 if (logger.isTraceEnabled()) 083 { 084 logger.trace(getName() + " starting"); 085 } 086 087 LocalizableMessage errMessage = null; 088 try 089 { 090 boolean shutdown = false; 091 while (!shutdown 092 || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN())) 093 { 094 final UpdateMsg updateMsg = this.handler.take(); 095 if (updateMsg == null) 096 { 097 // this connection is closing 098 errMessage = LocalizableMessage.raw( 099 "Connection closure: null update returned by domain."); 100 shutdown = true; 101 } 102 else if (!isUpdateMsgFiltered(updateMsg)) 103 { 104 // Publish the update to the remote server using a protocol version it supports 105 session.publish(updateMsg); 106 if (updateMsg instanceof ReplicaOfflineMsg) 107 { 108 dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN()); 109 } 110 } 111 } 112 } 113 catch (SocketException e) 114 { 115 /* 116 * The remote host has disconnected and this particular Tree is going to 117 * be removed, just ignore the exception and let the thread die as well 118 */ 119 errMessage = handler.getBadlyDisconnectedErrorMessage(); 120 logger.error(errMessage); 121 } 122 catch (Exception e) 123 { 124 /* 125 * An unexpected error happened. 126 * Log an error and close the connection. 127 */ 128 errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(handler + 129 " " + stackTraceToSingleLineString(e)); 130 logger.error(errMessage); 131 } 132 finally { 133 session.close(); 134 replicationServerDomain.stopServer(handler, false); 135 if (logger.isTraceEnabled()) 136 { 137 logger.trace(getName() + " stopped " + errMessage); 138 } 139 } 140 } 141 142 private boolean isUpdateMsgFiltered(UpdateMsg updateMsg) 143 { 144 if (handler.isDataServer()) 145 { 146 /** 147 * Ignore updates to DS in bad BAD_GENID_STATUS or FULL_UPDATE_STATUS 148 * 149 * The RSD lock should not be taken here as it is acceptable to have a delay 150 * between the time the server has a wrong status and the fact we detect it: 151 * the updates that succeed to pass during this time will have no impact on remote server. 152 * But it is interesting to not saturate uselessly the network 153 * if the updates are not necessary so this check to stop sending updates is interesting anyway. 154 * Not taking the RSD lock allows to have better performances in normal mode (most of the time). 155 */ 156 final ServerStatus dsStatus = handler.getStatus(); 157 if (dsStatus == BAD_GEN_ID_STATUS) 158 { 159 logger.warn(WARN_IGNORING_UPDATE_TO_DS_BADGENID, handler.getReplicationServerId(), 160 updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(), 161 session.getReadableRemoteAddress(), 162 handler.getGenerationId(), 163 replicationServerDomain.getGenerationId()); 164 return true; 165 } 166 else if (dsStatus == FULL_UPDATE_STATUS) 167 { 168 logger.warn(WARN_IGNORING_UPDATE_TO_DS_FULLUP, handler.getReplicationServerId(), 169 updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(), 170 session.getReadableRemoteAddress()); 171 return true; 172 } 173 } 174 else 175 { 176 /** 177 * Ignore updates to RS with bad gen id 178 * (no system managed status for a RS) 179 */ 180 final long referenceGenerationId = replicationServerDomain.getGenerationId(); 181 if (referenceGenerationId != handler.getGenerationId() 182 || referenceGenerationId == -1 || handler.getGenerationId() == -1) 183 { 184 logger.error(WARN_IGNORING_UPDATE_TO_RS, 185 handler.getReplicationServerId(), 186 updateMsg.getCSN(), handler.getBaseDN(), handler.getServerId(), 187 session.getReadableRemoteAddress(), 188 handler.getGenerationId(), 189 referenceGenerationId); 190 return true; 191 } 192 } 193 return false; 194 } 195}