public abstract class ReplicationDomain extends Object
Developers who need a replication mechanism are expected to subclass this class with their own implementation.
During its startup phase, the ReplicationDomain subclass should read the list of replication servers from the configuration, instantiate a ServerState, then start the publish service by calling startPublishService(). At this point it can start calling the publish(UpdateMsg) method if needed.
When the startup phase reaches the point where the subclass is ready to handle updates, the ReplicationDomain implementation should call the startListenService() method. At this point a listener thread is created on the Replication Service and can start receiving updates.
When updates are received, the Replication Service calls the processUpdate(UpdateMsg) method. The ReplicationDomain implementation should implement the appropriate code for replaying the update on the local repository. When fully done, the subclass must call the processUpdateDone(UpdateMsg, String) method. This allows the update to be processed asynchronously if necessary.
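The asynchronous pattern described above can be sketched as follows. This is a minimal illustration, not OpenDJ code: `SketchUpdate` and `SketchDomainBase` are hypothetical stand-ins for UpdateMsg and ReplicationDomain, and they mirror only the two callbacks. Returning false from processUpdate follows the contract documented for that method, under which the subclass then calls processUpdateDone itself.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for UpdateMsg; the real class carries much more.
final class SketchUpdate {
    final String payload;

    SketchUpdate(String payload) {
        this.payload = payload;
    }
}

// Hypothetical stand-in for ReplicationDomain, reduced to the two callbacks.
abstract class SketchDomainBase {
    final List<String> completed = new CopyOnWriteArrayList<>();

    abstract boolean processUpdate(SketchUpdate msg);

    // Records the completion; a null error message means the replay succeeded.
    protected void processUpdateDone(SketchUpdate msg, String replayErrorMsg) {
        completed.add(msg.payload + (replayErrorMsg == null ? ":ok" : ":error"));
    }
}

final class AsyncReplayDomain extends SketchDomainBase {
    private final ExecutorService replayPool = Executors.newSingleThreadExecutor();
    final List<String> repository = new CopyOnWriteArrayList<>();

    @Override
    boolean processUpdate(SketchUpdate msg) {
        // Hand the replay to a worker so the calling thread is not blocked.
        replayPool.execute(() -> {
            String error = null;
            try {
                repository.add(msg.payload); // replay on the local repository
            } catch (RuntimeException e) {
                error = String.valueOf(e.getMessage());
            }
            processUpdateDone(msg, error); // signal completion in every case
        });
        return false; // processing is not finished when this method returns
    }

    // Waits until every queued replay has finished.
    void awaitReplays() {
        replayPool.shutdown();
        try {
            if (!replayPool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("replays did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static List<String> demo() {
        AsyncReplayDomain domain = new AsyncReplayDomain();
        domain.processUpdate(new SketchUpdate("add uid=a"));
        domain.processUpdate(new SketchUpdate("modify uid=a"));
        domain.awaitReplays();
        return domain.completed;
    }
}
```

A single worker thread keeps the replay order identical to the arrival order. Whether a real implementation can use more workers depends on its own ordering requirements, which this sketch does not address.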
To propagate changes to other replicas, a ReplicationDomain implementation must use the publish(UpdateMsg) method.
If the Full Initialization process is needed, then implementations of importBackend(InputStream) and exportBackend(OutputStream) must be provided.
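As a rough illustration of how an export on one replica can feed an import on another, the sketch below connects the two methods with a pair of piped streams. `StreamBackendSketch` is a hypothetical stand-in, the one-entry-per-line format is an assumption made for the example, and the stand-in methods throw IOException where the real abstract methods declare DirectoryException. In the real service the two streams are connected through the replication protocol, not through an in-process pipe.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the storage side of a ReplicationDomain subclass.
final class StreamBackendSketch {
    final List<String> entries = new ArrayList<>();

    // Writes every entry to the stream, one per line (assumed format).
    void exportBackend(OutputStream output) throws IOException {
        Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8);
        for (String entry : entries) {
            writer.write(entry);
            writer.write('\n');
        }
        writer.flush();
    }

    // Replaces the local content with the entries read from the stream.
    void importBackend(InputStream input) throws IOException {
        entries.clear();
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            entries.add(line);
        }
    }

    long countEntries() {
        return entries.size();
    }

    static List<String> demo() {
        StreamBackendSketch source = new StreamBackendSketch();
        source.entries.add("dc=example,dc=com");
        source.entries.add("uid=a,dc=example,dc=com");
        StreamBackendSketch target = new StreamBackendSketch();
        target.entries.add("stale entry");

        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out);
            // The exporter runs in its own thread; closing the stream signals the end.
            Thread exporter = new Thread(() -> {
                try (OutputStream closing = out) {
                    source.exportBackend(closing);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            exporter.start();
            target.importBackend(in);
            exporter.join();
            in.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return target.entries;
    }
}
```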
Full Initialization of a replica can be triggered by LDAP clients by creating an InitializeTask or an InitializeTargetTask. Full Initialization can also be triggered from the ReplicationDomain implementation using the initializeRemote(int, Task) or initializeFromRemote(int, Task) methods.
At shutdown time, the disableService() method should be called to cleanly stop the replication service.
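The call order described in this class description can be sketched as follows. `LifecycleStub` is a hypothetical stand-in for ReplicationDomain: it only records calls, and the ordering checks it performs are part of the sketch, not documented behavior of the real class. Updates are represented by plain strings, not UpdateMsg objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ReplicationDomain: records calls and rejects
// the orderings that the class description rules out.
abstract class LifecycleStub {
    final List<String> calls = new ArrayList<>();
    private boolean publishStarted;

    void startPublishService() {
        publishStarted = true;
        calls.add("startPublishService");
    }

    void publish(String update) {
        if (!publishStarted) {
            throw new IllegalStateException("publish service not started");
        }
        calls.add("publish:" + update);
    }

    void startListenService() {
        if (!publishStarted) {
            throw new IllegalStateException("startPublishService() must be called first");
        }
        calls.add("startListenService");
    }

    void disableService() {
        publishStarted = false;
        calls.add("disableService");
    }
}

final class LifecycleSketch extends LifecycleStub {
    void startup() {
        // Reading the replication server list and creating the ServerState
        // would happen here; both are omitted from this sketch.
        startPublishService();
        publish("pending local change"); // allowed once the publish service runs
        // Only now is the subclass ready to replay incoming updates.
        startListenService();
    }

    void shutdown() {
        disableService();
    }

    static List<String> demo() {
        LifecycleSketch domain = new LifecycleSketch();
        domain.startup();
        domain.shutdown();
        return domain.calls;
    }
}
```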
Modifier and Type | Class | Description |
---|---|---|
protected static class | ReplicationDomain.ImportExportContext | This class contains the context related to an import or export launched on the domain. |
Modifier and Type | Field | Description |
---|---|---|
protected ReplicationBroker | broker | The ReplicationBroker that is used by this ReplicationDomain to connect to the ReplicationService. |
protected org.forgerock.opendj.server.config.server.ReplicationDomainCfg | config | The configuration of the replication domain. |
protected long | generationId | The generationId for this replication domain. |
Constructor | Description |
---|---|
ReplicationDomain(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, long generationId) | Creates a ReplicationDomain with the provided parameters. |
ReplicationDomain(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, long generationId, ServerState serverState) | Creates a ReplicationDomain with the provided parameters. |
Modifier and Type | Method | Description |
---|---|---|
void | addAdditionalMonitoring(MonitorData monitorData) | Subclasses should use this method to add additional monitoring information in the ReplicationDomain. |
protected void | changeConfig(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config) | Changes some ReplicationDomain parameters. |
void | changeConfig(Set<String> includeAttributes, Set<String> includeAttributesForDeletes) | Applies a configuration change to the attributes which should be included in the ECL. |
abstract long | countEntries() | This method should return the total number of objects in the replicated domain. |
int | decodeTarget(String targetString) | Verifies that the given string represents a valid source from which this server can be initialized. |
void | disableService() | Temporarily disables the Replication Service. |
void | enableService() | Restarts the Replication Service after a disableService(). |
protected abstract void | exportBackend(OutputStream output) | This method should trigger an export of the replicated data. |
AssuredMode | getAssuredMode() | Gives the mode for the assured replication of the domain. |
int | getAssuredSdAcknowledgedUpdates() | Gets the number of updates sent in assured safe data mode that have been acknowledged without errors. |
byte | getAssuredSdLevel() | Gives the assured Safe Data level of the replication of the domain. |
int | getAssuredSdSentUpdates() | Gets the number of updates sent in assured safe data mode. |
Map<Integer,Integer> | getAssuredSdServerTimeoutUpdates() | Gets the number of updates sent in assured safe data mode that have not been acknowledged due to timeout error, per server. |
int | getAssuredSdTimeoutUpdates() | Gets the number of updates sent in assured safe data mode that have not been acknowledged due to timeout error. |
int | getAssuredSrAcknowledgedUpdates() | Gets the number of updates sent in assured safe read mode that have been acknowledged without errors. |
int | getAssuredSrNotAcknowledgedUpdates() | Gets the number of updates sent in assured safe read mode that have not been acknowledged. |
int | getAssuredSrReceivedUpdates() | Gets the number of updates received in assured safe read mode. |
int | getAssuredSrReceivedUpdatesAcked() | Gets the number of updates received in assured safe read mode that this server acknowledged without error (no replay error). |
int | getAssuredSrReceivedUpdatesNotAcked() | Gets the number of updates received in assured safe read mode that this server did not acknowledge due to error (replay error). |
int | getAssuredSrReplayErrorUpdates() | Gets the number of updates sent in assured safe read mode that have not been acknowledged due to replay error. |
int | getAssuredSrSentUpdates() | Gets the number of updates sent in assured safe read mode. |
Map<Integer,Integer> | getAssuredSrServerNotAcknowledgedUpdates() | Gets the number of updates sent in assured safe read mode that have not been acknowledged, per server. |
int | getAssuredSrTimeoutUpdates() | Gets the number of updates sent in assured safe read mode that have not been acknowledged due to timeout error. |
int | getAssuredSrWrongStatusUpdates() | Gets the number of updates sent in assured safe read mode that have not been acknowledged due to wrong status error. |
long | getAssuredTimeout() | Gives the assured timeout of the replication of the domain (in ms). |
org.forgerock.opendj.ldap.Dn | getBaseDN() | Returns the base DN of this ReplicationDomain. |
Set<String> | getEclIncludes() | Gets the attributes to include in each change for the ECL. |
Set<String> | getEclIncludesForDeletes() | Gets the attributes to include in each delete change for the ECL. |
long | getGenerationID() | This method should return the generationID to use for this ReplicationDomain. |
CSNGenerator | getGenerator() | Returns the CSNGenerator that will be used to generate CSNs for this domain. |
byte | getGroupId() | Gets the group id for this domain. |
protected ReplicationDomain.ImportExportContext | getImportExportContext() | Returns the Import/Export context associated with this ReplicationDomain. |
CSN | getLastLocalChange() | Returns the CSN of the last change that was fully processed by this ReplicationDomain. |
Date | getLastStatusChangeDate() | Gets the date of the last status change. |
Set<String> | getRefUrls() | Gets the referral URLs this domain publishes. |
Map<Integer,DSInfo> | getReplicaInfos() | Gets the info for the replicas in the topology (excluding this one). |
Map<Integer,ServerState> | getReplicaStates() | Gets the states of all the replicas currently in the topology. |
HostPort | getReplicationServer() | Gets the name of the replication server to which this domain is currently connected. |
List<RSInfo> | getRsInfos() | Gets the info for the replication servers in the topology (excluding the one this domain is connected to). |
int | getRsServerId() | Gets the server ID of the Replication Server to which the domain is currently connected. |
int | getServerId() | Gets the server ID. |
ServerState | getServerState() | Gets the ServerState maintained by the concrete class. |
ServerStatus | getStatus() | Gets the status for this domain. |
boolean | hasConnectionError() | Checks if the domain has a connection error. |
boolean | ieRunning() | Returns a boolean indicating if an import or export is currently in progress. |
protected abstract void | importBackend(InputStream input) | This method should trigger an import of the replicated data. |
void | initializeFromRemote(int source, Task initTask) | Initializes this domain asynchronously from a remote source server. |
protected void | initializeRemote(int serverToInitialize, int serverRunningTheTask, Task initTask, int initWindow) | Processes the initialization of one or more other servers in the topology, designated by serverToInitialize, on behalf of the server that requested the initialization (serverRunningTheTask). |
void | initializeRemote(int target, Task initTask) | Initializes a remote server from this server. |
boolean | isAssured() | Tells if assured replication is enabled for this domain. |
boolean | isConnected() | Checks if the domain is connected to a ReplicationServer. |
protected boolean | isListenerShuttingDown() | Returns true if the listener thread is shutting down or has shut down. |
protected void | prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) | Prepares a message if it is to be sent in assured mode. |
abstract boolean | processUpdate(UpdateMsg updateMsg) | This method should handle the processing of UpdateMsg received from remote replication entities. |
protected void | processUpdateDone(UpdateMsg msg, String replayErrorMsg) | This method must be called after each call to processUpdate(UpdateMsg) when the processing of the update is completed. |
void | publish(UpdateMsg msg) | Publishes an UpdateMsg to the Replication Service. |
void | publishReplicaOfflineMsg() | Publishes a replica offline message if all pending changes for the current replica have been sent out. |
protected void | readAssuredConfig(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, boolean allowReconnection) | Gets and stores the assured replication configuration parameters. |
protected byte[] | receiveEntryBytes() | Receives bytes related to an entry in the context of an import to initialize the domain (called by ReplLDIFInputStream). |
void | resetGenerationId(Long generationIdNewValue) | Resets the generationId of this domain in the whole topology. |
void | sessionInitiated(ServerStatus initStatus, ServerState rsState) | Sets the initial status of the domain and performs the necessary initializations. |
boolean | setEclIncludes(int serverId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes) | Sets the attributes configured on a server to be included in the ECL. |
void | setGenerationID(long generationId) | Sets the generationId for this replication domain. |
protected void | signalNewStatus(StatusMachineEvent event) | Sets the status to a new value depending on the passed status machine event. |
void | startListenService() | Starts the receiver side of the Replication Service. |
void | startPublishService() | Starts the publish mechanism of the Replication Service. |
String | toString() | |
protected void | waitForAckIfAssuredEnabled(UpdateMsg msg) | Waits for the processing of an assured message after it has been sent if assured replication is configured; otherwise does nothing. |
protected volatile org.forgerock.opendj.server.config.server.ReplicationDomainCfg config
protected ReplicationBroker broker
protected volatile long generationId
public ReplicationDomain(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, long generationId)
Parameters:
config - The configuration object for this ReplicationDomain
generationId - the generation of this ReplicationDomain
public ReplicationDomain(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, long generationId, ServerState serverState)
Parameters:
config - The configuration object for this ReplicationDomain
generationId - the generation of this ReplicationDomain
serverState - The serverState to use
public CSNGenerator getGenerator()
Returns:
the CSNGenerator that will be used to generate CSNs for this domain.
public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
Parameters:
initStatus - The status to enter the state machine with.
rsState - The ServerState of the ReplicationServer with which the session was established.
public ServerStatus getStatus()
public org.forgerock.opendj.ldap.Dn getBaseDN()
public int getServerId()
public boolean isAssured()
public AssuredMode getAssuredMode()
public byte getAssuredSdLevel()
public long getAssuredTimeout()
public byte getGroupId()
public Set<String> getRefUrls()
TODO: fill that with all currently opened urls if no urls configured
public Map<Integer,DSInfo> getReplicaInfos()
public Map<Integer,ServerState> getReplicaStates()
public List<RSInfo> getRsInfos()
public int getRsServerId()
public int decodeTarget(String targetString) throws DirectoryException
Parameters:
targetString - The string representing the source
Throws:
DirectoryException - if the string is not valid
public void initializeRemote(int target, Task initTask) throws DirectoryException
Initializes a remote server from this server. The exportBackend(OutputStream) method will therefore be called on this server, and the importBackend(InputStream) method will be called on the remote server.
The InputStream and OutputStream given as parameters to those methods will be connected through the replication protocol.
Parameters:
target - The server-id of the server that should be initialized. The target can be discovered using the getReplicaInfos() method.
initTask - The task that triggers this initialization and that should be updated with its progress.
Throws:
DirectoryException - If it was not possible to publish the Initialization message to the Topology.
protected void initializeRemote(int serverToInitialize, int serverRunningTheTask, Task initTask, int initWindow) throws DirectoryException
Parameters:
serverToInitialize - The target server that should be initialized.
serverRunningTheTask - The server that initiated the export. It can be the serverID of this server, or the serverID of a remote server.
initTask - The task in this server that triggers this initialization and that should be updated with its progress. Null when the export is done following a request coming from a remote server (task is remote).
initWindow - The value of the initialization window for flow control between the importer and the exporter.
Throws:
DirectoryException - When an error occurs. No exception raised means success.
public ServerState getServerState()
protected byte[] receiveEntryBytes()
public void initializeFromRemote(int source, Task initTask) throws DirectoryException
Initializes this domain asynchronously from a remote source server. When this method is called, a request for initialization is sent to the remote source server.
Parameters:
source - The server-id of the source from which to initialize. The source can be discovered using the getReplicaInfos() method.
initTask - The task that launched the initialization and should be updated with its progress.
Throws:
DirectoryException - If it was not possible to publish the Initialization message to the Topology. The task state is updated.
protected void signalNewStatus(StatusMachineEvent event)
Parameters:
event - The event that may cause the status to change
public boolean ieRunning()
public void resetGenerationId(Long generationIdNewValue) throws DirectoryException
Parameters:
generationIdNewValue - The new value of the generation Id.
Throws:
DirectoryException - When an error occurs
public boolean isConnected()
public boolean hasConnectionError()
public HostPort getReplicationServer()
public int getAssuredSrSentUpdates()
public int getAssuredSrAcknowledgedUpdates()
public int getAssuredSrNotAcknowledgedUpdates()
public int getAssuredSrTimeoutUpdates()
public int getAssuredSrWrongStatusUpdates()
public int getAssuredSrReplayErrorUpdates()
public Map<Integer,Integer> getAssuredSrServerNotAcknowledgedUpdates()
public int getAssuredSrReceivedUpdates()
public int getAssuredSrReceivedUpdatesAcked()
public int getAssuredSrReceivedUpdatesNotAcked()
public int getAssuredSdSentUpdates()
public int getAssuredSdAcknowledgedUpdates()
public int getAssuredSdTimeoutUpdates()
public Map<Integer,Integer> getAssuredSdServerTimeoutUpdates()
public Date getLastStatusChangeDate()
public void startPublishService() throws org.forgerock.opendj.config.server.ConfigException
Starts the publish mechanism of the Replication Service. After this method has been called, updates can be published by calling the publish(UpdateMsg) method.
Throws:
org.forgerock.opendj.config.server.ConfigException - If the DirectoryServer configuration was incorrect.
public void startListenService()
After this method has been called, the Replication Service will start calling the processUpdate(UpdateMsg) method.
This method must be called once and must be called after startPublishService().
public void disableService()
Temporarily disables the Replication Service. The service can be restarted by calling enableService().
It can be useful to disable the Replication Service when the repository where the replicated information is stored becomes temporarily unavailable, so that replicated updates cannot be replayed for a while. This method is not thread-safe.
protected final boolean isListenerShuttingDown()
Returns:
true if the listener thread is shutting down or has shut down.
public void enableService()
Restarts the Replication Service after a disableService().
The Replication Service will restart from the point indicated by the ServerState that was supplied at startup time. If some data has changed in the repository while the Replication Service was disabled, this ServerState should therefore be updated by the ReplicationDomain subclass before calling this method. This method is not thread-safe.
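The disable, update, enable sequence can be sketched as follows. `RestartSketch` is a hypothetical stand-in: it models the ServerState as a plain map from server ID to the newest change number seen, and `recordOfflineChange` and `getResumePoint` are names invented for the illustration. The real ServerState and the way the service resumes are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a ReplicationDomain subclass and its ServerState.
final class RestartSketch {
    // Assumed model of ServerState: server id -> newest change number seen.
    final Map<Integer, Long> serverState = new HashMap<>();
    private boolean enabled = true;
    private Map<Integer, Long> resumePoint = new HashMap<>();

    void disableService() {
        enabled = false;
    }

    // Restarts from whatever the state says at the time of the call.
    void enableService() {
        resumePoint = new HashMap<>(serverState);
        enabled = true;
    }

    boolean isEnabled() {
        return enabled;
    }

    Map<Integer, Long> getResumePoint() {
        return resumePoint;
    }

    // Records a change that reached the repository while the service was
    // disabled; an older change number never overwrites a newer one.
    void recordOfflineChange(int serverId, long changeNumber) {
        serverState.merge(serverId, changeNumber, Long::max);
    }

    static Map<Integer, Long> demo() {
        RestartSketch domain = new RestartSketch();
        domain.serverState.put(1, 10L);
        domain.serverState.put(2, 7L);

        domain.disableService();            // repository temporarily unavailable
        domain.recordOfflineChange(1, 12L); // data changed during the outage
        domain.recordOfflineChange(2, 5L);  // older than the known state, ignored
        domain.enableService();             // state is up to date before restart

        return domain.getResumePoint();
    }
}
```

The point of the ordering is that the state is brought up to date before enableService() is called, so the service does not resume from a stale position.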
protected void changeConfig(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config)
Parameters:
config - The new configuration that this domain should now use.
public void changeConfig(Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
Parameters:
includeAttributes - attributes to be included with all change records.
includeAttributesForDeletes - additional attributes to be included with delete change records.
protected abstract void exportBackend(OutputStream output) throws DirectoryException
Parameters:
output - The OutputStream where the export should be produced.
Throws:
DirectoryException - When needed.
protected abstract void importBackend(InputStream input) throws DirectoryException
Parameters:
input - The InputStream from which the import should be reading entries.
Throws:
DirectoryException - When needed.
public abstract long countEntries() throws DirectoryException
Throws:
DirectoryException - when needed.
public abstract boolean processUpdate(UpdateMsg updateMsg)
This method should handle the processing of UpdateMsg received from remote replication entities.
This method will be called by a single thread and should therefore not block.
Parameters:
updateMsg - The UpdateMsg that was received.
Returns:
If true is returned, no further processing is necessary. If false is returned, the subclass should call the method processUpdateDone(UpdateMsg, String) and update the ServerState when this processing is complete.
protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
This method must be called after each call to processUpdate(UpdateMsg) when the processing of the update is completed.
It is useful for implementations that need to process the update asynchronously or with several threads, but it must also be called by implementations that process updates synchronously in a single thread.
Parameters:
msg - The UpdateMsg whose processing was completed.
replayErrorMsg - if not null, this means an error occurred during the replay of this update, and this is the matching human-readable message describing the problem.
protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
Parameters:
msg - The update message to be sent soon.
protected void waitForAckIfAssuredEnabled(UpdateMsg msg) throws TimeoutException
Parameters:
msg - The UpdateMsg for which we are waiting for an ack.
Throws:
TimeoutException - When the configured timeout occurs waiting for the ack.
public void publish(UpdateMsg msg)
Publishes an UpdateMsg to the Replication Service.
The Replication Service will handle the delivery of this UpdateMsg to all the participants of this Replication Domain. These members will receive this UpdateMsg through a call to the processUpdate(UpdateMsg) method.
Parameters:
msg - The UpdateMsg that should be published.
public void publishReplicaOfflineMsg()
public long getGenerationID()
public void setGenerationID(long generationId)
Parameters:
generationId - the generationId to set
public void addAdditionalMonitoring(MonitorData monitorData)
Parameters:
monitorData - where to add the additional monitoring attributes
protected ReplicationDomain.ImportExportContext getImportExportContext()
public boolean setEclIncludes(int serverId, Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
Parameters:
serverId - Server where these attributes are configured.
includeAttributes - Attributes to be included with all change records, may include wild-cards.
includeAttributesForDeletes - Additional attributes to be included with delete change records, may include wild-cards.
Returns:
true if the set of attributes was modified.
public Set<String> getEclIncludes()
public Set<String> getEclIncludesForDeletes()
public CSN getLastLocalChange()
protected void readAssuredConfig(org.forgerock.opendj.server.config.server.ReplicationDomainCfg config, boolean allowReconnection)
Parameters:
config - The configuration object
allowReconnection - Tells if one must reconnect if significant changes occurred
Copyright © 2010-2017 ForgeRock AS. All Rights Reserved.