public abstract class ReplicationDomain extends Object
It is intended that developer in need of a replication mechanism subclass this class with their own implementation.
The startup phase of the ReplicationDomain subclass, should read the list of replication servers from the
configuration, instantiate a ServerState
then start the publish service by calling
startPublishService()
. At this point it can start calling the #publish(UpdateMsg)
method if needed.
When the startup phase reach the point when the subclass is ready to handle updates the Replication Domain
implementation should call the startListenService()
method. At this point a Listener thread is created on
the Replication Service and which can start receiving updates.
When updates are received the Replication Service calls the dispatchUpdateForReplay(UpdateMsg)
method.
ReplicationDomain implementation should implement the appropriate code for replaying the update on the local
repository. When fully done the subclass must call the processUpdateAfterReplay(UpdateMsg)
method.
This allows to process the update asynchronously if necessary.
To propagate changes to other replica, a ReplicationDomain implementation must use the #publish(UpdateMsg)
method.
If the Full Initialization process is needed then implementation for importBackend(InputStream)
and
exportBackend(OutputStream)
must be provided.
Full Initialization of a replica can be triggered by LDAP clients by creating InitializeTasks or
InitializeTargetTask. Full initialization can also be triggered from the ReplicationDomain implementation using
methods #initializeRemote(int, Task)
or #initializeFromRemote(int, Task)
.
At shutdown time, the disableService()
method should be called to cleanly stop the replication service.
Modifier and Type | Class and Description |
---|---|
protected static class |
ReplicationDomain.ImportExportContext
This class contains the context related to an import or export launched on the domain.
|
Modifier and Type | Field and Description |
---|---|
protected ReplicationBroker |
broker
The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService.
|
protected ReplicationDomainCfg |
config
The configuration of the replication domain.
|
protected long |
generationId
The generationId for this replication domain.
|
Constructor and Description |
---|
ReplicationDomain(ReplicationContext replicationContext,
ReplicationDomainCfg config,
long generationId)
Creates a ReplicationDomain with the provided parameters.
|
ReplicationDomain(ReplicationContext replicationContext,
ReplicationDomainCfg config,
long generationId,
ServerState serverState)
Creates a ReplicationDomain with the provided parameters.
|
Modifier and Type | Method and Description |
---|---|
void |
addAdditionalMonitoring(MeterRegistryHolder registry)
Subclasses should use this method to add additional monitoring information in the ReplicationDomain.
|
protected void |
changeConfig(ReplicationDomainCfg config)
Change some ReplicationDomain parameters.
|
void |
changeConfig(Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
Applies a configuration change to the attributes which should be included in the ECL.
|
abstract long |
countEntries()
This method should return the total number of objects in the replicated domain.
|
ReplicaId |
decodeTarget(String targetString)
Verifies that the given string represents a valid source from which this server can be initialized.
|
void |
disableService()
Temporarily disable the Replication Service.
|
abstract void |
dispatchUpdateForReplay(UpdateMsg updateMsg)
This method ensures this
UpdateMsg received from remote replication entities will be replayed,
by dispatching it to the replay threads. |
void |
enableService()
Restart the Replication service after a
disableService() . |
protected abstract void |
exportBackend(OutputStream output)
This method should trigger an export of the replicated data.
|
Dn |
getBaseDN()
Returns the base DN of this ReplicationDomain.
|
CSNGenerator |
getCsnGenerator()
Returns the
CSNGenerator that will be used to generate CSN for this domain. |
Set<String> |
getEclIncludes()
Get the attributes to include in each change for the ECL.
|
Set<String> |
getEclIncludesForDeletes()
Get the attributes to include in each delete change for the ECL.
|
long |
getGenerationID()
This method should return the generationID to use for this ReplicationDomain.
|
GroupId |
getGroupId()
Gets the group id for this domain.
|
HealthStatus |
getHealthStatus(long delayThreshold)
Returns the health status based on the current replication delay.
|
protected ReplicationDomain.ImportExportContext |
getImportExportContext()
Returns the Import/Export context associated to this ReplicationDomain.
|
CSN |
getLastLocalChange()
Returns the CSN of the last Change that was fully processed by this ReplicationDomain.
|
Entry |
getMonitorEntry()
Returns the monitor entry for this replication domain.
|
Set<String> |
getRefUrls()
Gets the referrals URLs this domain publishes.
|
ReplicaId |
getReplicaId()
Get the replica ID which identifies this Replication Domain inside the Replication Service.
|
Map<ReplicaId,DSInfo> |
getReplicaInfos()
Gets the info for Replicas in the topology (except us).
|
Map<ReplicaId,ServerState> |
getReplicaStates()
Gets the States of all the Replicas currently in the Topology.
|
ReplicationContext |
getReplicationContext()
Returns the replication context.
|
ReplicationServerId |
getReplicationServerId()
Gets the server ID of the Replication Server to which the domain is currently connected.
|
int |
getReplicationServerPort()
Get the port of the replicationServer to which this domain is currently connected.
|
List<RSInfo> |
getRsInfos()
Gets the info for RSs in the topology (except the one we are connected to).
|
ServerState |
getServerState()
Get the ServerState maintained by the Concrete class.
|
ServerStatus |
getStatus()
Gets the status for this domain.
|
boolean |
hasConnectionError()
Check if the domain has a connection error.
|
boolean |
ieRunning()
Returns a boolean indicating if an import or export is currently processed.
|
protected abstract void |
importBackend(InputStream input)
This method should trigger an import of the replicated data.
|
void |
initializeFromRemote(ReplicaId source,
Task initTask)
Initializes asynchronously this domain from a remote source server.
|
protected void |
initializeRemote(ReplicaId replicaToInitialize,
ReplicaId replicaRunningTheTask,
Task initTask,
int initWindow)
Process the initialization of some other server or servers in the topology specified by the target argument when
this initialization specifying the server that requests the initialization.
|
void |
initializeRemote(ReplicaId target,
Task initTask)
Initializes a remote server from this server.
|
boolean |
isConnected()
Check if the domain is connected to a ReplicationServer.
|
protected boolean |
isListenerShuttingDown()
Returns
true if the listener thread is shutting down or has shutdown. |
protected void |
processUpdateAfterReplay(UpdateMsg msg)
This method must be called after each call to
dispatchUpdateForReplay(UpdateMsg) when the processing of
the update is completed. |
void |
publish(ReplicationMsg msg)
Publish an
UpdateMsg to the Replication Service. |
void |
publishHeartbeatMsg()
Publishes a heartbeat message if all pending changes for current replica have been sent out.
|
void |
publishReplicaOfflineMsg()
Publishes a replica offline message if all pending changes for current replica have been sent out.
|
protected byte[] |
receiveEntryBytes()
Receives bytes related to an entry in the context of an import to initialize the domain (called by
ReplLDIFInputStream).
|
protected void |
recreateRemoteReplicasFromState()
Rebuilds Remote DS lists after a reset of the domain state, either at startup or domain re-initialization.
|
void |
resetGenerationId(Long generationIdNewValue)
Reset the generationId of this domain in the whole topology.
|
void |
sessionInitiated(ServerStatus initStatus,
ServerState rsState)
Set the initial status of the domain and perform necessary initializations.
|
boolean |
setEclIncludes(ReplicaId replicaId,
Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
Set the attributes configured on a server to be included in the ECL.
|
void |
setGenerationID(long generationId)
Sets the generationId for this replication domain.
|
protected void |
signalNewStatus(StatusMachineEvent event)
Sets the status to a new value depending of the passed status machine event.
|
void |
startListenService()
Starts the receiver side of the Replication Service.
|
void |
startPublishService()
Start the publish mechanism of the Replication Service.
|
String |
toString() |
protected void |
updateState()
Update the server state if necessary.
|
protected volatile ReplicationDomainCfg config
protected ReplicationBroker broker
protected volatile long generationId
public ReplicationDomain(ReplicationContext replicationContext, ReplicationDomainCfg config, long generationId)
replicationContext
- The replication contextconfig
- The configuration object for this ReplicationDomaingenerationId
- the generation of this ReplicationDomainpublic ReplicationDomain(ReplicationContext replicationContext, ReplicationDomainCfg config, long generationId, ServerState serverState)
replicationContext
- The replication contextconfig
- The configuration object for this ReplicationDomaingenerationId
- the generation of this ReplicationDomainserverState
- The serverState to usepublic HealthStatus getHealthStatus(long delayThreshold)
delayThreshold
- The maximum replication delay in milliseconds for considering this replication domain healthy.public CSNGenerator getCsnGenerator()
CSNGenerator
that will be used to generate CSN
for this domain.CSNGenerator
that will be used to generate CSN
for this domain.public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
initStatus
- The status to enter the state machine with.rsState
- The ServerState of the ReplicationServer with which the session was established.public ServerStatus getStatus()
public Dn getBaseDN()
public ReplicaId getReplicaId()
public ReplicationContext getReplicationContext()
public GroupId getGroupId()
public Set<String> getRefUrls()
TODO: fill that with all currently opened urls if no urls configured
public Map<ReplicaId,DSInfo> getReplicaInfos()
public Map<ReplicaId,ServerState> getReplicaStates()
When this method is called, a Monitoring message will be sent to the Replication Server to which this domain is currently connected so that it computes a table containing information about all Directory Servers in the topology. This Computation involves communications will all the servers currently connected.
public List<RSInfo> getRsInfos()
public ReplicationServerId getReplicationServerId()
protected void recreateRemoteReplicasFromState()
protected void updateState()
public ReplicaId decodeTarget(String targetString) throws LdapException
targetString
- The string representing the sourceLdapException
- if the string is not validpublic void initializeRemote(ReplicaId target, Task initTask) throws LdapException
The exportBackend(OutputStream)
will therefore be called on this server, and the
importBackend(InputStream)
will be called on the remote server.
The InputStream and OutputStream given as a parameter to those methods will be connected through the replication protocol.
target
- The server-id of the server that should be initialized. The target can be discovered using the
getReplicaInfos()
method.initTask
- The task that triggers this initialization and that should be updated with its progress.LdapException
- If it was not possible to publish the Initialization message to the Topology.protected void initializeRemote(ReplicaId replicaToInitialize, ReplicaId replicaRunningTheTask, Task initTask, int initWindow) throws LdapException
replicaToInitialize
- The target replica that should be initialized.replicaRunningTheTask
- The replica that initiated the export. It can be the replica id of this server,
or the replica id of a remote replica.initTask
- The task in this server that triggers this initialization and that should be updated with its
progress. Null when the export is done following a request coming from a remote server (task is
remote).initWindow
- The value of the initialization window for flow control between the importer and the exporter.LdapException
- When an error occurs. No exception raised means success.public ServerState getServerState()
protected byte[] receiveEntryBytes()
public void initializeFromRemote(ReplicaId source, Task initTask) throws LdapException
When this method is called, a request for initialization is sent to the remote source server requesting initialization.
source
- The server-id of the source from which to initialize. The source can be discovered using the
getReplicaInfos()
method.initTask
- The task that launched the initialization and should be updated of its progress.LdapException
- If it was not possible to publish the Initialization message to the Topology. The task state is
updated.protected void signalNewStatus(StatusMachineEvent event)
event
- The event that may make the status be changedpublic boolean ieRunning()
public void resetGenerationId(Long generationIdNewValue) throws LdapException
generationIdNewValue
- The new value of the generation Id.LdapException
- When an error occurspublic boolean isConnected()
public boolean hasConnectionError()
public int getReplicationServerPort()
public Entry getMonitorEntry()
public void startPublishService()
#publish(UpdateMsg)
method.public void startListenService()
After this method has been called, the Replication Service will start calling the
dispatchUpdateForReplay(UpdateMsg)
.
This method must be called once and must be called after the startPublishService()
.
public void disableService()
enableService()
.
It can be useful to disable the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable and replicated updates can therefore not be replayed during a while. This method is not MT safe.
protected final boolean isListenerShuttingDown()
true
if the listener thread is shutting down or has shutdown.true
if the listener thread is shutting down or has shutdown.public void enableService()
disableService()
.
The Replication Service will restart from the point indicated by the ServerState
that was given as a
parameter to the startPublishService()
at startup time.
If some data have changed in the repository during the period of time when the Replication Service was disabled,
this ServerState
should therefore be updated by the Replication Domain subclass before calling this
method. This method is not MT safe.
protected void changeConfig(ReplicationDomainCfg config)
config
- The new configuration that this domain should now use.public void changeConfig(Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
includeAttributes
- attributes to be included with all change records.includeAttributesForDeletes
- additional attributes to be included with delete change records.protected abstract void exportBackend(OutputStream output) throws LdapException
output
- The OutputStream where the export should be produced.LdapException
- When needed.protected abstract void importBackend(InputStream input) throws LdapException
input
- The InputStream from which the import should be reading entries.LdapException
- When needed.public abstract long countEntries() throws LdapException
LdapException
- when needed.public abstract void dispatchUpdateForReplay(UpdateMsg updateMsg)
UpdateMsg
received from remote replication entities will be replayed,
by dispatching it to the replay threads.updateMsg
- The UpdateMsg
to replay.protected final void processUpdateAfterReplay(UpdateMsg msg)
dispatchUpdateForReplay(UpdateMsg)
when the processing of
the update is completed.
It is useful for implementation needing to process the update in an asynchronous way or using several threads, but must be called even by implementation doing it in a synchronous, single-threaded way.
msg
- The UpdateMsg whose processing was completed.public void publish(ReplicationMsg msg)
UpdateMsg
to the Replication Service.
The Replication Service will handle the delivery of this UpdateMsg
to all the participants of this
Replication Domain. These members will be receive this UpdateMsg
through a call of the
dispatchUpdateForReplay(UpdateMsg)
message.
msg
- The UpdateMsg that should be published.public void publishHeartbeatMsg()
public void publishReplicaOfflineMsg()
public long getGenerationID()
public void setGenerationID(long generationId)
generationId
- the generationId to setpublic void addAdditionalMonitoring(MeterRegistryHolder registry)
registry
- where to additional monitoring attributesprotected ReplicationDomain.ImportExportContext getImportExportContext()
public boolean setEclIncludes(ReplicaId replicaId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
replicaId
- Server where these attributes are configured.includeAttributes
- Attributes to be included with all change records, may include wild-cards.includeAttributesForDeletes
- Additional attributes to be included with delete change records, may include wild-cards.true
if the set of attributes was modified.public Set<String> getEclIncludes()
public Set<String> getEclIncludesForDeletes()
public CSN getLastLocalChange()
Copyright 2010-2022 ForgeRock AS.