001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions Copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2006-2008 Sun Microsystems, Inc. 015 * Portions Copyright 2011-2016 ForgeRock AS. 016 */ 017package org.opends.server.replication.plugin; 018 019import static org.opends.messages.ReplicationMessages.*; 020import static org.opends.server.util.StaticUtils.*; 021 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.locks.ReentrantLock; 026 027import org.opends.server.api.DirectoryThread; 028import org.forgerock.i18n.slf4j.LocalizedLogger; 029import org.opends.server.replication.protocol.LDAPUpdateMsg; 030 031/** 032 * Thread that is used to get message from the replication servers (stored 033 * in the updates queue) and replay them in the current server. A configurable 034 * number of this thread is created for the whole MultimasterReplication object 035 * (i.e: these threads are shared across the ReplicationDomain objects for 036 * replaying the updates they receive) 037 */ 038public class ReplayThread extends DirectoryThread 039{ 040 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 041 042 private final BlockingQueue<UpdateToReplay> updateToReplayQueue; 043 private final ReentrantLock switchQueueLock; 044 private AtomicBoolean shutdown = new AtomicBoolean(false); 045 private static int count; 046 047 /** 048 * Constructor for the ReplayThread. 049 * 050 * @param updateToReplayQueue The queue of update messages we have to replay 051 * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic 052 */ 053 public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock) 054 { 055 super("Replica replay thread " + count++); 056 this.updateToReplayQueue = updateToReplayQueue; 057 this.switchQueueLock = switchQueueLock; 058 } 059 060 /** 061 * Shutdown this replay thread. 062 */ 063 public void shutdown() 064 { 065 shutdown.set(true); 066 } 067 068 /** 069 * Run method for this class. 070 */ 071 @Override 072 public void run() 073 { 074 if (logger.isTraceEnabled()) 075 { 076 logger.trace("Replication Replay thread starting."); 077 } 078 079 while (!shutdown.get()) 080 { 081 try 082 { 083 if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS)) 084 { 085 LDAPReplicationDomain domain; 086 LDAPUpdateMsg updateMsg; 087 try 088 { 089 if (shutdown.get()) 090 { 091 break; 092 } 093 UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS); 094 if (updateToreplay == null) 095 { 096 continue; 097 } 098 // Find replication domain for that update message and mark it as "in progress" 099 updateMsg = updateToreplay.getUpdateMessage(); 100 domain = updateToreplay.getReplicationDomain(); 101 domain.markInProgress(updateMsg); 102 } 103 finally 104 { 105 switchQueueLock.unlock(); 106 } 107 domain.replay(updateMsg, shutdown); 108 } 109 } 110 catch (Exception e) 111 { 112 /* 113 * catch all exceptions happening so that the thread never dies even 114 * in case of problems. 115 */ 116 logger.error(ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE, stackTraceToSingleLineString(e)); 117 } 118 } 119 if (logger.isTraceEnabled()) 120 { 121 logger.trace("Replication Replay thread stopping."); 122 } 123 } 124}