public abstract class ReplicationDomain extends Object
Developers who need a replication mechanism are expected to subclass this class with their own implementation.
The startup phase of the ReplicationDomain subclass should read the list of replication servers from the configuration, instantiate a ServerState, then start the publish service by calling startPublishService(). At this point it can start calling the publish(UpdateMsg) method if needed.
When the startup phase reaches the point where the subclass is ready to handle updates, the ReplicationDomain implementation should call the startListenService() method. At this point a Listener thread is created on the Replication Service and can start receiving updates.
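The startup order described above can be sketched as follows. This is only an illustration: SketchDomain and StartupOrderSketch are hypothetical stand-ins that record and check the call order, they are not the real ReplicationDomain, and updates are represented by plain strings rather than UpdateMsg objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ReplicationDomain: it only records and checks call order. */
class SketchDomain {
    final List<String> calls = new ArrayList<>();
    private boolean publishStarted;
    private boolean listenStarted;

    void startPublishService() {
        publishStarted = true;
        calls.add("startPublishService");
    }

    void publish(String update) {
        if (!publishStarted) {
            throw new IllegalStateException("startPublishService() has not been called");
        }
        calls.add("publish:" + update);
    }

    void startListenService() {
        // Mirrors the documented contract: called once, after startPublishService().
        if (!publishStarted || listenStarted) {
            throw new IllegalStateException("call once, after startPublishService()");
        }
        listenStarted = true;
        calls.add("startListenService");
    }
}

class StartupOrderSketch {
    static List<String> runStartup() {
        SketchDomain domain = new SketchDomain();
        // Assumed to have happened already: the replication servers were read from
        // the configuration and a ServerState was instantiated.
        domain.startPublishService();
        domain.publish("local-change-1"); // publishing is allowed from this point on
        // The subclass is now ready to replay incoming updates.
        domain.startListenService();
        return domain.calls;
    }

    public static void main(String[] args) {
        System.out.println(runStartup());
    }
}
```

The stand-in rejects startListenService() when the publish service has not been started, which is one way a subclass could guard against an incorrect startup order during development.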
When updates are received, the Replication Service calls the dispatchUpdateForReplay(UpdateMsg) method. The ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done, the subclass must call the processUpdateAfterReplay(UpdateMsg, String) method. This allows the update to be processed asynchronously if necessary.
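A minimal sketch of asynchronous replay, under stated assumptions: AsyncReplaySketch is a hypothetical stand-in, updates are plain strings instead of UpdateMsg objects, and applyToLocalRepository is an invented helper representing the repository write. It only illustrates that the dispatch method can return immediately while the completion callback is invoked later, with a null error message on success.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in showing replay handed off to worker threads. */
class AsyncReplaySketch {
    private final ExecutorService replayPool = Executors.newFixedThreadPool(2);
    final Queue<String> completed = new ConcurrentLinkedQueue<>();

    /** Stand-in for dispatchUpdateForReplay(UpdateMsg): returns without waiting. */
    void dispatchUpdateForReplay(String update) {
        replayPool.submit(() -> {
            String replayErrorMsg = null;
            try {
                applyToLocalRepository(update);
            } catch (RuntimeException e) {
                replayErrorMsg = String.valueOf(e.getMessage());
            }
            // Always called, whether the replay succeeded or failed.
            processUpdateAfterReplay(update, replayErrorMsg);
        });
    }

    /** Invented helper: pretends to write the update to the local repository. */
    void applyToLocalRepository(String update) {
        if (update.startsWith("bad")) {
            throw new IllegalArgumentException("cannot replay " + update);
        }
    }

    /** Stand-in for processUpdateAfterReplay(UpdateMsg, String). */
    void processUpdateAfterReplay(String update, String replayErrorMsg) {
        completed.add(update + (replayErrorMsg == null ? ":ok" : ":error"));
    }

    /** Stops the pool and waits for pending replays; returns false on timeout. */
    boolean shutdownAndWait() {
        replayPool.shutdown();
        try {
            return replayPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A synchronous, single-threaded implementation would follow the same shape, simply calling the completion method at the end of the dispatch method instead of from a worker thread.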
To propagate changes to other replicas, a ReplicationDomain implementation must use the publish(UpdateMsg) method.
If the Full Initialization process is needed, then implementations of importBackend(InputStream) and exportBackend(OutputStream) must be provided.
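As a rough illustration of how an export and an import can be connected through a pair of streams, the sketch below uses the JDK's piped streams in place of the replication protocol. FullInitSketch, the one-line-per-entry format, and the use of a local pipe are all assumptions made for the example; the real methods are instance methods that throw LdapException.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in: export on one side, import on the other, joined by a pipe. */
class FullInitSketch {
    /** Stand-in for exportBackend(OutputStream): one line per entry (assumed format). */
    static void exportBackend(List<String> entries, OutputStream output) throws IOException {
        for (String entry : entries) {
            output.write((entry + "\n").getBytes(StandardCharsets.UTF_8));
        }
        output.flush();
    }

    /** Stand-in for importBackend(InputStream): reads entries until end of stream. */
    static List<String> importBackend(InputStream input) throws IOException {
        List<String> imported = new ArrayList<>();
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            imported.add(line);
        }
        return imported;
    }

    /** Runs the export in its own thread while the caller imports from the pipe. */
    static List<String> initialize(List<String> sourceEntries) {
        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {
            Thread exporter = new Thread(() -> {
                try {
                    exportBackend(sourceEntries, out);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } finally {
                    // Closing the write side signals end of stream to the importer.
                    try { out.close(); } catch (IOException ignored) { }
                }
            });
            exporter.start();
            List<String> imported = importBackend(in);
            exporter.join();
            return imported;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The exporter runs in a separate thread because a piped stream pair is meant to be written and read by different threads; reading and writing from the same thread can deadlock once the pipe buffer fills.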
Full Initialization of a replica can be triggered by LDAP clients by creating an InitializeTask or an InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using the initializeRemote(int, Task) or initializeFromRemote(int, Task) methods.
At shutdown time, the disableService() method should be called to cleanly stop the replication service.
Modifier and Type | Class and Description |
---|---|
protected static class | ReplicationDomain.ImportExportContext - This class contains the context related to an import or export launched on the domain. |
Modifier and Type | Field and Description |
---|---|
protected ReplicationBroker | broker - The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService. |
protected ReplicationDomainCfg | config - The configuration of the replication domain. |
protected long | generationId - The generationId for this replication domain. |
Constructor and Description |
---|
ReplicationDomain(ReplicationDomainCfg config, long generationId) - Creates a ReplicationDomain with the provided parameters. |
ReplicationDomain(ReplicationDomainCfg config, long generationId, ServerState serverState) - Creates a ReplicationDomain with the provided parameters. |
Modifier and Type | Method and Description |
---|---|
void | addAdditionalMonitoring(MeterRegistryHolder registry) - Subclasses should use this method to add additional monitoring information in the ReplicationDomain. |
protected void | changeConfig(ReplicationDomainCfg config) - Change some ReplicationDomain parameters. |
void | changeConfig(Set<String> includeAttributes, Set<String> includeAttributesForDeletes) - Applies a configuration change to the attributes which should be included in the ECL. |
abstract long | countEntries() - This method should return the total number of objects in the replicated domain. |
int | decodeTarget(String targetString) - Verifies that the given string represents a valid source from which this server can be initialized. |
void | disableService() - Temporarily disable the Replication Service. |
abstract void | dispatchUpdateForReplay(UpdateMsg updateMsg) - This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads. |
void | enableService() - Restart the Replication Service after a disableService(). |
protected abstract void | exportBackend(OutputStream output) - This method should trigger an export of the replicated data. |
AssuredMode | getAssuredMode() - Gives the mode for the assured replication of the domain. |
long | getAssuredSdAcknowledgedUpdates() - Gets the number of updates sent in assured safe data mode that have been acknowledged without errors. |
byte | getAssuredSdLevel() - Gives the assured Safe Data level of the replication of the domain. |
long | getAssuredSdSentUpdates() - Gets the number of updates sent in assured safe data mode. |
Map<Integer,Integer> | getAssuredSdServerTimeoutUpdates() - Gets the number of updates sent in assured safe data mode that have not been acknowledged due to timeout error per server. |
long | getAssuredSdTimeoutUpdates() - Gets the number of updates sent in assured safe data mode that have not been acknowledged due to timeout error. |
Map<Integer,Integer> | getAssuredSrServerNotAcknowledgedUpdates() - Gets the number of updates sent in assured safe read mode that have not been acknowledged per server. |
long | getAssuredTimeout() - Gives the assured timeout of the replication of the domain (in ms). |
Dn | getBaseDN() - Returns the base DN of this ReplicationDomain. |
CSNGenerator | getCsnGenerator() - Returns the CSNGenerator that will be used to generate CSN for this domain. |
Set<String> | getEclIncludes() - Get the attributes to include in each change for the ECL. |
Set<String> | getEclIncludesForDeletes() - Get the attributes to include in each delete change for the ECL. |
long | getGenerationID() - This method should return the generationID to use for this ReplicationDomain. |
byte | getGroupId() - Gets the group id for this domain. |
protected ReplicationDomain.ImportExportContext | getImportExportContext() - Returns the Import/Export context associated to this ReplicationDomain. |
CSN | getLastLocalChange() - Returns the CSN of the last Change that was fully processed by this ReplicationDomain. |
Entry | getMonitorEntry() - Returns the monitor entry for this replication domain. |
Set<String> | getRefUrls() - Gets the referral URLs this domain publishes. |
Map<Integer,DSInfo> | getReplicaInfos() - Gets the info for Replicas in the topology (except us). |
Map<Integer,ServerState> | getReplicaStates() - Gets the States of all the Replicas currently in the Topology. |
int | getReplicationServerPort() - Get the port of the replicationServer to which this domain is currently connected. |
List<RSInfo> | getRsInfos() - Gets the info for RSs in the topology (except the one we are connected to). |
int | getRsServerId() - Gets the server ID of the Replication Server to which the domain is currently connected. |
int | getServerId() - Get the server ID. |
ServerState | getServerState() - Get the ServerState maintained by the Concrete class. |
ServerStatus | getStatus() - Gets the status for this domain. |
boolean | hasConnectionError() - Check if the domain has a connection error. |
boolean | ieRunning() - Returns a boolean indicating if an import or export is currently in progress. |
protected abstract void | importBackend(InputStream input) - This method should trigger an import of the replicated data. |
void | initializeFromRemote(int source, Task initTask) - Initializes this domain asynchronously from a remote source server. |
protected void | initializeRemote(int serverToInitialize, int serverRunningTheTask, Task initTask, int initWindow) - Processes the initialization of another server, or servers, in the topology specified by the target argument, also specifying the server that requested the initialization. |
void | initializeRemote(int target, Task initTask) - Initializes a remote server from this server. |
boolean | isAssured() - Tells if assured replication is enabled for this domain. |
boolean | isConnected() - Check if the domain is connected to a ReplicationServer. |
protected boolean | isListenerShuttingDown() - Returns true if the listener thread is shutting down or has shut down. |
protected void | prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) - Prepare a message if it is to be sent in assured mode. |
protected void | processUpdateAfterReplay(UpdateMsg msg, String replayErrorMsg) - This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed. |
void | publish(UpdateMsg msg) - Publish an UpdateMsg to the Replication Service. |
void | publishReplicaOfflineMsg() - Publishes a replica offline message if all pending changes for current replica have been sent out. |
protected void | readAssuredConfig(ReplicationDomainCfg config, boolean allowReconnection) - Gets and stores the assured replication configuration parameters. |
protected byte[] | receiveEntryBytes() - Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream). |
protected void | recreateRemoteReplicasFromState() - Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization. |
void | resetGenerationId(Long generationIdNewValue) - Reset the generationId of this domain in the whole topology. |
void | sessionInitiated(ServerStatus initStatus, ServerState rsState) - Set the initial status of the domain and perform necessary initializations. |
boolean | setEclIncludes(int serverId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes) - Set the attributes configured on a server to be included in the ECL. |
void | setGenerationID(long generationId) - Sets the generationId for this replication domain. |
protected void | signalNewStatus(StatusMachineEvent event) - Sets the status to a new value depending on the passed status machine event. |
void | startListenService() - Starts the receiver side of the Replication Service. |
void | startPublishService() - Start the publish mechanism of the Replication Service. |
String | toString() |
protected void | updateState() - Update the server state if necessary. |
protected void | waitForAckIfAssuredEnabled(UpdateMsg msg) - Wait for the processing of an assured message after it has been sent, if assured replication is configured, otherwise, do nothing. |
protected volatile ReplicationDomainCfg config
protected ReplicationBroker broker
protected volatile long generationId
public ReplicationDomain(ReplicationDomainCfg config, long generationId)
config - The configuration object for this ReplicationDomain
generationId - the generation of this ReplicationDomain
public ReplicationDomain(ReplicationDomainCfg config, long generationId, ServerState serverState)
config - The configuration object for this ReplicationDomain
generationId - the generation of this ReplicationDomain
serverState - The serverState to use
public CSNGenerator getCsnGenerator()
Returns the CSNGenerator that will be used to generate CSN for this domain.
public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
initStatus - The status to enter the state machine with.
rsState - The ServerState of the ReplicationServer with which the session was established.
public ServerStatus getStatus()
public Dn getBaseDN()
public int getServerId()
public boolean isAssured()
public AssuredMode getAssuredMode()
public byte getAssuredSdLevel()
public long getAssuredTimeout()
public byte getGroupId()
public Set<String> getRefUrls()
TODO: fill that with all currently opened urls if no urls configured
public Map<Integer,DSInfo> getReplicaInfos()
public Map<Integer,ServerState> getReplicaStates()
When this method is called, a Monitoring message will be sent to the Replication Server to which this domain is currently connected so that it computes a table containing information about all Directory Servers in the topology. This computation involves communication with all the servers currently connected.
public List<RSInfo> getRsInfos()
public int getRsServerId()
protected void recreateRemoteReplicasFromState()
protected void updateState()
public int decodeTarget(String targetString) throws LdapException
targetString - The string representing the source
LdapException - if the string is not valid
public void initializeRemote(int target, Task initTask) throws LdapException
Initializes a remote server from this server. The exportBackend(OutputStream) method will therefore be called on this server, and the importBackend(InputStream) method will be called on the remote server.
The InputStream and OutputStream given as parameters to those methods will be connected through the replication protocol.
target - The server-id of the server that should be initialized. The target can be discovered using the getReplicaInfos() method.
initTask - The task that triggers this initialization and that should be updated with its progress.
LdapException - If it was not possible to publish the Initialization message to the Topology.
protected void initializeRemote(int serverToInitialize, int serverRunningTheTask, Task initTask, int initWindow) throws LdapException
serverToInitialize - The target server that should be initialized.
serverRunningTheTask - The server that initiated the export. It can be the serverID of this server, or the serverID of a remote server.
initTask - The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).
initWindow - The value of the initialization window for flow control between the importer and the exporter.
LdapException - When an error occurs. No exception raised means success.
public ServerState getServerState()
protected byte[] receiveEntryBytes()
public void initializeFromRemote(int source, Task initTask) throws LdapException
When this method is called, a request for initialization is sent to the remote source server.
source - The server-id of the source from which to initialize. The source can be discovered using the getReplicaInfos() method.
initTask - The task that launched the initialization and should be updated with its progress.
LdapException - If it was not possible to publish the Initialization message to the Topology. The task state is updated.
protected void signalNewStatus(StatusMachineEvent event)
event - The event that may cause the status to change
public boolean ieRunning()
public void resetGenerationId(Long generationIdNewValue) throws LdapException
generationIdNewValue - The new value of the generation Id.
LdapException - When an error occurs
public boolean isConnected()
public boolean hasConnectionError()
public int getReplicationServerPort()
public Entry getMonitorEntry()
public Map<Integer,Integer> getAssuredSrServerNotAcknowledgedUpdates()
public long getAssuredSdSentUpdates()
public long getAssuredSdAcknowledgedUpdates()
public long getAssuredSdTimeoutUpdates()
public Map<Integer,Integer> getAssuredSdServerTimeoutUpdates()
public void startPublishService()
Start the publish mechanism of the Replication Service. Once it has started, updates can be sent with the publish(UpdateMsg) method.
public void startListenService()
After this method has been called, the Replication Service will start calling the dispatchUpdateForReplay(UpdateMsg) method.
This method must be called once and must be called after startPublishService().
public void disableService()
Temporarily disable the Replication Service. It can be restarted with enableService().
It can be useful to disable the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates therefore cannot be replayed for a while. This method is not MT safe.
protected final boolean isListenerShuttingDown()
Returns true if the listener thread is shutting down or has shut down.
public void enableService()
Restart the Replication Service after a disableService().
The Replication Service will restart from the point indicated by the ServerState that was provided at startup time, before startPublishService() was called. If some data has changed in the repository while the Replication Service was disabled, this ServerState should therefore be updated by the ReplicationDomain subclass before calling this method. This method is not MT safe.
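The disable/enable cycle described above could look roughly like the sketch below. Everything in it is an assumption made for illustration: DisableEnableSketch is a hypothetical stand-in, a single long replaces the ServerState, and the lock shows one way a caller might serialize the two calls, since neither is documented as MT safe.

```java
/** Hypothetical stand-in for the disable / update state / enable sequence. */
class DisableEnableSketch {
    // Callers serialize disable and enable themselves; a lock is one option.
    private final Object serviceLock = new Object();
    private boolean enabled = true;
    private long lastChangeInState;    // stand-in for the ServerState
    private long restartPoint = -1;    // where the stand-in service last resumed

    /** Stand-in for disableService(): the repository is temporarily unavailable. */
    void disableService() {
        synchronized (serviceLock) {
            enabled = false;
        }
    }

    /** Records a repository change so the state is current before re-enabling. */
    void recordRepositoryChange(long changeNumber) {
        synchronized (serviceLock) {
            lastChangeInState = Math.max(lastChangeInState, changeNumber);
        }
    }

    /** Stand-in for enableService(): resumes from the point the state indicates. */
    void enableService() {
        synchronized (serviceLock) {
            restartPoint = lastChangeInState;
            enabled = true;
        }
    }

    boolean isEnabled() {
        synchronized (serviceLock) {
            return enabled;
        }
    }

    long getRestartPoint() {
        synchronized (serviceLock) {
            return restartPoint;
        }
    }
}
```

In this sketch, a caller that changes data while the service is disabled calls recordRepositoryChange before enableService, so the restart point reflects those changes.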
protected void changeConfig(ReplicationDomainCfg config)
config - The new configuration that this domain should now use.
public void changeConfig(Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
includeAttributes - attributes to be included with all change records.
includeAttributesForDeletes - additional attributes to be included with delete change records.
protected abstract void exportBackend(OutputStream output) throws LdapException
output - The OutputStream where the export should be produced.
LdapException - When needed.
protected abstract void importBackend(InputStream input) throws LdapException
input - The InputStream from which the import should be reading entries.
LdapException - When needed.
public abstract long countEntries() throws LdapException
LdapException - when needed.
public abstract void dispatchUpdateForReplay(UpdateMsg updateMsg)
This method ensures this UpdateMsg received from remote replication entities will be replayed, by dispatching it to the replay threads.
updateMsg - The UpdateMsg to replay.
protected void processUpdateAfterReplay(UpdateMsg msg, String replayErrorMsg)
This method must be called after each call to dispatchUpdateForReplay(UpdateMsg) when the processing of the update is completed.
It is useful for implementations that need to process the update asynchronously or using several threads, but it must also be called by implementations that process updates in a synchronous, single-threaded way.
msg - The UpdateMsg whose processing was completed.
replayErrorMsg - if not null, this means an error occurred during the replay of this update, and this is the matching
protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
If the assured mode is enabled, this method should be called before the publish(UpdateMsg) method. This will configure the update accordingly before it is sent and will prepare the mechanism that will block until the matching ack is received. To wait for the ack after the publish call, use the waitForAckIfAssuredEnabled(UpdateMsg) method.
The expected typical usage in a service inheriting from this class is the following sequence:
UpdateMsg msg = xxx;
prepareWaitForAckIfAssuredEnabled(msg);
publish(msg);
waitForAckIfAssuredEnabled(msg);
Note: prepareWaitForAckIfAssuredEnabled(UpdateMsg) and waitForAckIfAssuredEnabled(UpdateMsg) have no effect if assured replication is disabled.
Note: this mechanism should not be used if using publish(UpdateMsg), as the use of these methods is already hidden inside it.
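To make the prepare / publish / wait sequence more concrete, here is a minimal sketch of one way such a blocking mechanism could be built, using a latch per pending message. It is not the actual implementation: AssuredAckSketch, its string message ids, the ackReceived callback, and the outcome helper are all hypothetical names introduced for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for an ack-waiting mechanism with a configured timeout. */
class AssuredAckSketch {
    private final boolean assuredEnabled;
    private final long timeoutMs;
    private final Map<String, CountDownLatch> pendingAcks = new ConcurrentHashMap<>();

    AssuredAckSketch(boolean assuredEnabled, long timeoutMs) {
        this.assuredEnabled = assuredEnabled;
        this.timeoutMs = timeoutMs;
    }

    /** Registers the message before it is sent; no effect when assured mode is off. */
    void prepareWaitForAck(String msgId) {
        if (assuredEnabled) {
            pendingAcks.put(msgId, new CountDownLatch(1));
        }
    }

    /** Invented callback: would be invoked when the matching ack arrives. */
    void ackReceived(String msgId) {
        CountDownLatch latch = pendingAcks.get(msgId);
        if (latch != null) {
            latch.countDown();
        }
    }

    /** Blocks until the ack arrives or the timeout expires; no-op if not prepared. */
    void waitForAck(String msgId) throws TimeoutException {
        CountDownLatch latch = pendingAcks.get(msgId);
        if (latch == null) {
            return;
        }
        try {
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("no ack received for " + msgId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ack", e);
        } finally {
            pendingAcks.remove(msgId);
        }
    }

    /** Runs the three-step sequence and reports how the wait ended. */
    static String outcome(AssuredAckSketch domain, String msgId, boolean ackArrives) {
        domain.prepareWaitForAck(msgId);
        // The real sequence would publish the message at this point.
        if (ackArrives) {
            domain.ackReceived(msgId);
        }
        try {
            domain.waitForAck(msgId);
            return "returned";
        } catch (TimeoutException e) {
            return "timeout";
        }
    }
}
```

Registering the latch before the message is sent matters in this design: if the ack could arrive before the waiter is registered, it would be missed and the wait would time out even though the update was acknowledged.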
msg - The update message to be sent soon.
protected void waitForAckIfAssuredEnabled(UpdateMsg msg) throws TimeoutException
msg - The UpdateMsg for which we are waiting for an ack.
TimeoutException - When the configured timeout occurs waiting for the ack.
public void publish(UpdateMsg msg)
Publish an UpdateMsg to the Replication Service.
The Replication Service will handle the delivery of this UpdateMsg to all the participants of this Replication Domain. These members will receive this UpdateMsg through a call to the dispatchUpdateForReplay(UpdateMsg) method.
msg - The UpdateMsg that should be published.
public void publishReplicaOfflineMsg()
public long getGenerationID()
public void setGenerationID(long generationId)
generationId - the generationId to set
public void addAdditionalMonitoring(MeterRegistryHolder registry)
registry - where to add additional monitoring attributes
protected ReplicationDomain.ImportExportContext getImportExportContext()
public boolean setEclIncludes(int serverId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
serverId - Server where these attributes are configured.
includeAttributes - Attributes to be included with all change records, may include wild-cards.
includeAttributesForDeletes - Additional attributes to be included with delete change records, may include wild-cards.
Returns true if the set of attributes was modified.
public Set<String> getEclIncludes()
public Set<String> getEclIncludesForDeletes()
public CSN getLastLocalChange()
protected void readAssuredConfig(ReplicationDomainCfg config, boolean allowReconnection)
config - The configuration object
allowReconnection - Tells if one must reconnect if significant changes occurred
Copyright 2010-2018 ForgeRock AS.