public class DataServerHandler extends MonitorProvider
Modifier and Type | Field and Description |
---|---|
protected long |
generationId
Remote generation id.
|
protected byte |
groupId
Group id of this remote server.
|
protected long |
heartbeatInterval
The time in milliseconds between heartbeats from the replication server.
|
protected HostPort |
hostPort
The host port of the remote server.
|
protected ReplicationServer |
replicationServer
Local hosting RS.
|
protected ReplicationServerDomain |
replicationServerDomain
Specifies the related replication server domain based on baseDN.
|
protected int |
serverId
The serverId of the remote server.
|
protected Session |
session
The session opened with the remote server.
|
protected boolean |
sslEncryption
The SSL encryption after the negotiation with the peer.
|
protected int |
weight
Weight of this remote server.
|
registry
Constructor and Description |
---|
DataServerHandler(Session session,
ReplicationServer replicationServer)
Creates a new data server handler.
|
Modifier and Type | Method and Description |
---|---|
protected void |
abortStart(LocalizableMessage reason)
Abort a start procedure currently establishing.
|
protected void |
addMonitorObjectClassNames(Attribute objectClassAttribute)
Adds to the provided attribute the objectclass names that should be included in the monitor entry created from
this monitor provider.
|
void |
bindTo(MeterRegistry parent) |
ServerStatus |
changeStatus(StatusMachineEvent event)
Change the status according to the event.
|
void |
changeStatusForResetGenId(long newGenId)
Order the peer DS server to change his status or close the connection according to the requested new generation
id.
|
void |
checkWindow()
Check the protocol window and send WindowMsg if necessary.
|
protected Dn |
computeMonitorInstanceDn()
Returns the monitor instance DN for this monitor provider.
|
protected ReplServerStartMsg |
createReplServerStartMsg()
Creates a ReplServerStartMsg for the current ServerHandler.
|
void |
doStop()
Stop this handler.
|
int |
estimateNumberOfUnsentMessages()
Returns an estimation of the number of messages not yet sent.
|
protected void |
finalizeStart()
Finalize the initialization, create reader, writer, heartbeat system and monitoring system.
|
long |
getApproxFirstMissingDate()
Get the age of the older change that has not yet been replicated to the server handled by this ServerHandler.
|
LocalizableMessage |
getBadlyDisconnectedErrorMessage()
Returns a "badly disconnected" error message for this server handler.
|
protected Dn |
getBaseDN()
Get the baseDN for this handler.
|
ReplicationServerDomain |
getDomain()
Returns the Replication Server Domain to which belongs this server handler.
|
long |
getGenerationId()
Returns the value of generationId for that handler.
|
byte |
getGroupId()
Gets the group id of the server represented by this object.
|
long |
getHeartbeatInterval()
Get our heartbeat interval.
|
byte |
getLocalGroupId()
Get the groupId of the hosting RS.
|
protected UpdateMsg |
getNextMessage()
Get the next update that must be sent to the consumer from the changelog database.
|
CSN |
getOlderUpdateCSN()
Get the older CSN for that server.
|
short |
getProtocolVersion()
Gets the protocol version used with this remote server.
|
long |
getReferenceGenId()
Get the reference generation id (associated with the changes in the db).
|
HostPort |
getReplicationServerHostPort()
Get the host port of the hosting replication server.
|
int |
getReplicationServerId()
Get the serverId of the hosting replication server.
|
int |
getServerId()
Get the Server Id.
|
ServerState |
getServerState()
Get the state of this server.
|
ServerStatus |
getStatus()
Gets the status of the connected DS.
|
void |
incrementAssuredSdReceivedUpdates()
Increment the number of updates received from the server in assured safe data mode.
|
void |
incrementAssuredSdReceivedUpdatesTimeout()
Increment the number of updates received from the server in assured safe data mode that timed out.
|
void |
incrementAssuredSdSentUpdatesTimeout()
Increment the number of updates sent to the server in assured safe data mode that timed out.
|
void |
incrementAssuredSrReceivedUpdates()
Increment the number of updates received from the server in assured safe read mode.
|
void |
incrementAssuredSrReceivedUpdatesTimeout()
Increment the number of updates received from the server in assured safe read mode that timed out.
|
void |
incrementAssuredSrSentUpdatesTimeout()
Increment the number of updates sent to the server in assured safe read mode that timed out.
|
boolean |
isDataServer()
Check if the server associated to this ServerHandler is a data server in the topology.
|
boolean |
isReplicationServer()
Check if the server associated to this ServerHandler is a replication server.
|
void |
lockDomainNoTimeout()
Lock the domain without a timeout.
|
void |
lockDomainWithTimeout()
Lock the domain with a timeout.
|
protected void |
logStartHandshakeRCVandSND(StartMsg inStartMsg,
StartMsg outStartMsg)
Log the messages involved in the start handshake.
|
protected void |
logStartHandshakeSNDandRCV(StartMsg outStartMsg,
StartMsg inStartMsg)
Log the messages involved in the start handshake.
|
protected void |
logStartSessionHandshake(StartSessionMsg inStartSessionMsg,
TopologyMsg outTopoMsg)
Log the messages involved in the Topology/StartSession handshake.
|
protected void |
logStopReceived()
Log stop message has been received.
|
protected void |
logTopoHandshakeRCVandSND(TopologyMsg inTopoMsg,
TopologyMsg outTopoMsg)
Log the messages involved in the Topology handshake.
|
protected void |
logTopoHandshakeSNDandRCV(TopologyMsg outTopoMsg,
TopologyMsg inTopoMsg)
Log the messages involved in the Topology handshake.
|
ServerStatus |
processNewStatus(ChangeStatusMsg csMsg)
Process message of a remote server changing his status.
|
boolean |
processStartFromRemote(ServerStartMsg serverStartMsg)
Processes a start message received from a remote data server.
|
void |
put(UpdateMsg update)
Put a new update message received.
|
void |
receiveNewStatus(ChangeStatusMsg csMsg)
Process message of a remote server changing his status.
|
protected void |
releaseDomainLock()
Releases the lock on the replication server domain if it was held.
|
void |
replyToWindowProbe()
Process the reception of a WindowProbeMsg message.
|
void |
send(ReplicationMsg msg)
Sends a message.
|
void |
sendTopoInfo(TopologyMsg topoMsg)
Sends the provided TopologyMsg to the peer server.
|
protected void |
setBaseDNAndDomain(Dn baseDN,
boolean isDataServer)
Set the baseDN for this handler.
|
void |
setConsumerActive(boolean active)
Set that the consumer is now becoming inactive and thus getNextMessage should not return any UpdateMsg any more.
|
void |
setGenerationId(long generationId)
Set a new generation ID.
|
void |
setInitialServerState(ServerState serverState)
Set the initial value of the serverState for this handler.
|
void |
shutdown()
Shutdown This ServerHandler.
|
void |
startFromRemoteDS(ServerStartMsg inServerStartMsg)
Starts the handler from a remote ServerStart message received from the remote data server.
|
UpdateMsg |
take()
Select the next update that must be sent to the server managed by this ServerHandler.
|
DSInfo |
toDSInfo()
Creates a DSInfo structure representing this remote DS.
|
RSInfo |
toRSInfo()
Creates a RSInfo structure representing this remote RS.
|
String |
toString() |
configurationDnToMonitorDn, deregisterAll, getMonitorEntry, getMonitorInstanceDn
protected int serverId
protected final Session session
protected HostPort hostPort
protected long generationId
protected byte groupId
protected boolean sslEncryption
protected long heartbeatInterval
protected int weight
protected final ReplicationServer replicationServer
protected ReplicationServerDomain replicationServerDomain
public DataServerHandler(Session session, ReplicationServer replicationServer)
session
- The session opened with the remote data server.replicationServer
- The hosting RS.public void changeStatusForResetGenId(long newGenId) throws IOException
newGenId
- The new generation id to take into accountIOException
- If IO error occurred.public ServerStatus changeStatus(StatusMachineEvent event) throws IOException
event
- The event to be used for new status computationIOException
- When raised by the underlying sessionprotected Dn computeMonitorInstanceDn()
MonitorProvider
computeMonitorInstanceDn
in class MonitorProvider
public void bindTo(MeterRegistry parent)
bindTo
in interface MeterBinder
protected void addMonitorObjectClassNames(Attribute objectClassAttribute)
MonitorProvider
addMonitorObjectClassNames
in class MonitorProvider
objectClassAttribute
- the objectClass attribute where to add the object class namespublic ServerStatus getStatus()
public boolean isDataServer()
public ServerStatus processNewStatus(ChangeStatusMsg csMsg)
csMsg
- The message containing the new statuspublic boolean processStartFromRemote(ServerStartMsg serverStartMsg) throws LdapException
serverStartMsg
- The provided start message received.LdapException
- raised when an error occurs.public void startFromRemoteDS(ServerStartMsg inServerStartMsg)
inServerStartMsg
- The provided ServerStart message received.public DSInfo toDSInfo()
public String toString()
toString
in class MonitorProvider
public void receiveNewStatus(ChangeStatusMsg csMsg)
csMsg
- The message containing the new statusprotected void abortStart(LocalizableMessage reason)
reason
- The provided reason.protected void releaseDomainLock()
public void checkWindow() throws IOException
IOException
- when the session becomes unavailable.protected void finalizeStart() throws LdapException
LdapException
- When an exception is raised.public void send(ReplicationMsg msg) throws IOException
msg
- The message to be sent.IOException
- When it occurs while sending the message,public long getApproxFirstMissingDate()
public ReplicationServerDomain getDomain()
public long getGenerationId()
public byte getGroupId()
public long getHeartbeatInterval()
public short getProtocolVersion()
public int getServerId()
public void incrementAssuredSdReceivedUpdates()
public void incrementAssuredSdReceivedUpdatesTimeout()
public void incrementAssuredSdSentUpdatesTimeout()
public void incrementAssuredSrReceivedUpdates()
public void incrementAssuredSrReceivedUpdatesTimeout()
public void incrementAssuredSrSentUpdatesTimeout()
public boolean isReplicationServer()
public void lockDomainNoTimeout() throws InterruptedException
If domain already exists, lock it until handshake is finished otherwise it will be created and locked later in the method
InterruptedException
- If the current thread was interrupted while waiting for the lock.public void lockDomainWithTimeout() throws LdapException, InterruptedException
Take the lock on the domain. WARNING: Here we try to acquire the lock with a timeout. This is for preventing a deadlock that may happen if there are cross connection attempts (for same domain) from this replication server and from a peer one.
Here is the scenario:
To prevent threads locking in such situation, the listen threads here will both timeout trying to acquire the lock. The random time for the timeout should allow on connection attempt to be aborted whereas the other one should have time to finish in the same time.
Warning: the minimum time (3s) should be big enough to allow normal situation connections to terminate. The added random time should represent a big enough range so that the chance to have one listen thread timing out a lot before the peer one is great. When the first listen thread times out, the remote connect thread should release the lock and allow the peer listen thread to take the lock it was waiting for and process the connection attempt.
LdapException
- When an exception occurs.InterruptedException
- If the current thread was interrupted while waiting for the lock.public void replyToWindowProbe() throws IOException
IOException
- When the session becomes unavailable.public void sendTopoInfo(TopologyMsg topoMsg) throws IOException
topoMsg
- The TopologyMsg message to be sent.IOException
- When it occurs while sending the message,public void setGenerationId(long generationId)
generationId
- The new generation IDpublic void shutdown()
public UpdateMsg take() throws ChangelogException
ChangelogException
- If a problem occurs when reading the changelogpublic RSInfo toRSInfo()
protected void logStartHandshakeRCVandSND(StartMsg inStartMsg, StartMsg outStartMsg)
inStartMsg
- The message received first.outStartMsg
- The message sent in response.protected void logStartHandshakeSNDandRCV(StartMsg outStartMsg, StartMsg inStartMsg)
outStartMsg
- The message sent first.inStartMsg
- The message received in response.protected void logTopoHandshakeRCVandSND(TopologyMsg inTopoMsg, TopologyMsg outTopoMsg)
inTopoMsg
- The message received first.outTopoMsg
- The message sent in response.protected void logTopoHandshakeSNDandRCV(TopologyMsg outTopoMsg, TopologyMsg inTopoMsg)
outTopoMsg
- The message sent first.inTopoMsg
- The message received in response.protected void logStartSessionHandshake(StartSessionMsg inStartSessionMsg, TopologyMsg outTopoMsg)
inStartSessionMsg
- The message received first.outTopoMsg
- The message sent in response.protected void logStopReceived()
public long getReferenceGenId()
public void put(UpdateMsg update) throws IOException
update
- the update message received.IOException
- when it occurs.public void doStop()
protected ReplServerStartMsg createReplServerStartMsg()
public LocalizableMessage getBadlyDisconnectedErrorMessage()
protected UpdateMsg getNextMessage() throws ChangelogException
null
when there is no updateChangelogException
- If a problem occurs when reading the changelogpublic CSN getOlderUpdateCSN()
public int estimateNumberOfUnsentMessages()
public ServerState getServerState()
protected Dn getBaseDN()
public void setConsumerActive(boolean active)
active
- the provided state of the consumer.public void setInitialServerState(ServerState serverState) throws LdapException
serverState
- the provided serverState.LdapException
- raised when a problem occurs.protected void setBaseDNAndDomain(Dn baseDN, boolean isDataServer) throws LdapException
baseDN
- The provided baseDN.isDataServer
- The handler is a dataServerLdapException
- raised when a problem occurs.public byte getLocalGroupId()
public int getReplicationServerId()
public HostPort getReplicationServerHostPort()
Copyright 2010-2018 ForgeRock AS.