Sunday, September 25, 2022

Replicated Log


Cluster nodes keep a Write-Forward Log. Every log entry shops
the state required for consensus together with the consumer request.
They coordinate to construct consensus over log entries,
so that each one cluster nodes have precisely the identical Write-Forward log.
The requests are then executed sequentially following the log.
As a result of all cluster nodes agree on every log entry, they execute the identical
requests in the identical order. This ensures that each one the cluster nodes
share the identical state.

Executing two phases for every state change request just isn’t environment friendly.
So cluster nodes choose a pacesetter at startup.
The chief election part establishes the Technology Clock
and detects all log entries within the earlier Quorum.
(The entries the earlier chief might need copied to the vast majority of
the cluster nodes.)
As soon as there’s a steady chief, solely the chief co-ordinates the replication.
Shoppers talk with the chief.
The chief provides every request to the log and makes certain that it is replicated
on all of the followers. Consensus is reached as soon as a log entry is efficiently
replicated to the vast majority of the followers.
This fashion, just one part execution to
attain consensus is required for every state change operation when there’s a
steady chief.

Following sections describe how Raft implements a replicated log.

Replicating consumer requests

Determine 1: Replication

For every log entry, the chief appends it to its native Write-Forward log
after which sends it to all of the followers.

chief (class ReplicatedLog…)

  non-public Lengthy appendAndReplicate(byte[] knowledge) {
      Lengthy lastLogEntryIndex = appendToLocalLog(knowledge);
      replicateOnFollowers(lastLogEntryIndex);
      return lastLogEntryIndex;
  }


  non-public void replicateOnFollowers(Lengthy entryAtIndex) {
      for (ultimate FollowerHandler follower : followers) {
          replicateOn(follower, entryAtIndex); //ship replication requests to followers
      }
  }

The followers deal with the replication request and append the log entries to their native logs.
After efficiently appending the log entries, they reply to the chief with the index of the
newest log entry they’ve.
The response additionally contains the present Technology Clock of the server.

The followers additionally verify if the entries exist already or there are entries past
those that are being replicated.
It ignores entries that are already current. But when there are entries that are from totally different generations,
they take away the conflicting entries.

follower (class ReplicatedLog…)

  void maybeTruncate(ReplicationRequest replicationRequest) {
      replicationRequest.getEntries().stream()
              .filter(entry -> wal.getLastLogIndex() >= entry.getEntryIndex() &&
                      entry.getGeneration() != wal.readAt(entry.getEntryIndex()).getGeneration())
              .forEach(entry -> wal.truncate(entry.getEntryIndex()));
  }

follower (class ReplicatedLog…)

  non-public ReplicationResponse appendEntries(ReplicationRequest replicationRequest) {
      Listing<WALEntry> entries = replicationRequest.getEntries();
      entries.stream()
              .filter(e -> !wal.exists(e))
              .forEach(e -> wal.writeEntry(e));
      return new ReplicationResponse(SUCCEEDED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }

The follower rejects the replication request when the era quantity within the request
is decrease than the most recent era the server is aware of about.
This notifies the chief to step down and change into a follower.

follower (class ReplicatedLog…)

  Lengthy currentGeneration = replicationState.getGeneration();
  if (currentGeneration > request.getGeneration()) {
      return new ReplicationResponse(FAILED, serverId(), currentGeneration, wal.getLastLogIndex());
  }

The Chief retains observe of log indexes replicated at every server, when responses are obtained.
It makes use of it to trace the log entries that are efficiently copied to the Quorum
and tracks the index as a commitIndex. commitIndex is the Excessive-Water Mark within the log.

chief (class ReplicatedLog…)

  logger.data("Updating matchIndex for " + response.getServerId() + " to " + response.getReplicatedLogIndex());
  updateMatchingLogIndex(response.getServerId(), response.getReplicatedLogIndex());
  var logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
  var currentHighWaterMark = replicationState.getHighWaterMark();
  if (logIndexAtQuorum > currentHighWaterMark && logIndexAtQuorum != 0) {
      applyLogEntries(currentHighWaterMark, logIndexAtQuorum);
      replicationState.setHighWaterMark(logIndexAtQuorum);
  }

chief (class ReplicatedLog…)

  Lengthy computeHighwaterMark(Listing<Lengthy> serverLogIndexes, int noOfServers) {
      serverLogIndexes.kind(Lengthy::compareTo);
      return serverLogIndexes.get(noOfServers / 2);
  }

chief (class ReplicatedLog…)

  non-public void updateMatchingLogIndex(int serverId, lengthy replicatedLogIndex) {
      FollowerHandler follower = getFollowerHandler(serverId);
      follower.updateLastReplicationIndex(replicatedLogIndex);
  }

chief (class ReplicatedLog…)

  public void updateLastReplicationIndex(lengthy lastReplicatedLogIndex) {
      this.matchIndex = lastReplicatedLogIndex;
  }

Full replication

You will need to make sure that all of the cluster nodes
obtain all of the log entries from the chief, even when
they’re disconnected or they crash and are available again up.
Raft has a mechanism to ensure all of the cluster nodes obtain
all of the log entries from the chief.

With each replication request in Raft, the chief additionally sends the log
index and era of the log entries which instantly precede
the brand new entries getting replicated. If the earlier log index and
time period don’t match with its native log, the followers reject the request.
This means to the chief that the follower log must be synced
for a few of the older entries.

follower (class ReplicatedLog…)

  if (!wal.isEmpty() && request.getPrevLogIndex() >= wal.getLogStartIndex() &&
           generationAt(request.getPrevLogIndex()) != request.getPrevLogGeneration()) {
      return new ReplicationResponse(FAILED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }

follower (class ReplicatedLog…)

  non-public Lengthy generationAt(lengthy prevLogIndex) {
      WALEntry walEntry = wal.readAt(prevLogIndex);

      return walEntry.getGeneration();
  }

So the chief decrements the matchIndex and tries sending
log entries on the decrease index. This continues till the followers
settle for the replication request.

chief (class ReplicatedLog…)

  //rejected due to conflicting entries, decrement matchIndex
  FollowerHandler peer = getFollowerHandler(response.getServerId());
  logger.data("decrementing nextIndex for peer " + peer.getId() + " from " + peer.getNextIndex());
  peer.decrementNextIndex();
  replicateOn(peer, peer.getNextIndex());

This verify on the earlier log index and era
permits the chief to detect two issues.

  • If the follower log has lacking entries.
    For instance, if the follower log has just one entry
    and the chief begins replicating the third entry,
    the requests might be rejected till the chief replicates
    the second entry.
  • If the earlier entries within the log are from a special
    era, larger or decrease than the corresponding entries
    within the chief log. The chief will attempt replicating entries
    from decrease indexes till the requests get accepted.
    The followers truncate the entries for which the era
    doesn’t match.

This fashion, the chief tries to push its personal log to all of the followers
constantly by utilizing the earlier index to detect lacking entries
or conflicting entries.
This makes certain that each one the cluster nodes ultimately
obtain all of the log entries from the chief even after they
are disconnected for a while.

Raft doesn’t have a separate commit message, however sends the commitIndex as half
of the conventional replication requests.
The empty replication requests are additionally despatched as heartbeats.
So commitIndex is shipped to followers as a part of the heartbeat requests.

Log entries are executed within the log order

As soon as the chief updates its commitIndex, it executes the log entries so as,
from the final worth of the commitIndex to the most recent worth of the commitIndex.
The consumer requests are accomplished and the response is returned to the consumer
as soon as the log entries are executed.

class ReplicatedLog…

  non-public void applyLogEntries(Lengthy previousCommitIndex, Lengthy commitIndex) {
      for (lengthy index = previousCommitIndex + 1; index <= commitIndex; index++) {
          WALEntry walEntry = wal.readAt(index);
          var responses = stateMachine.applyEntries(Arrays.asList(walEntry));
          completeActiveProposals(index, responses);
      }
  }

The chief additionally sends the commitIndex with the heartbeat requests it sends to the followers.
The followers replace the commitIndex and apply the entries the identical method.

class ReplicatedLog…

  non-public void updateHighWaterMark(ReplicationRequest request) {
      if (request.getHighWaterMark() > replicationState.getHighWaterMark()) {
          var previousHighWaterMark = replicationState.getHighWaterMark();
          replicationState.setHighWaterMark(request.getHighWaterMark());
          applyLogEntries(previousHighWaterMark, request.getHighWaterMark());
      }
  }

Chief Election

Chief election is the part the place log entries dedicated within the earlier quorum
are detected.
Each cluster node operates in three states: candidate, chief or follower.
The cluster nodes begin in a follower state anticipating
a HeartBeat from an present chief.
If a follower does not hear from any chief in a predetermined time interval
,it strikes to the candidate state and begins leader-election.
The chief election algorithm establishes a brand new Technology Clock
worth. Raft refers back to the Technology Clock as time period.

The chief election mechanism additionally makes certain the elected chief has as many
up-to-date log entries stipulated by the quorum.
That is an optimization carried out by Raft
which avoids log entries from earlier Quorum
being transferred to the brand new chief.

New chief election is began by sending every of the peer servers
a message requesting a vote.

class ReplicatedLog…

  non-public void startLeaderElection() {
      replicationState.setGeneration(replicationState.getGeneration() + 1);
      registerSelfVote();
      requestVoteFrom(followers);
  }

As soon as a server is voted for in a given Technology Clock,
the identical vote is returned for that era all the time.
This ensures that another server requesting a vote for the
similar era just isn’t elected, when a profitable election has already
occurred.
The dealing with of the vote request occurs as follows:

class ReplicatedLog…

  VoteResponse handleVoteRequest(VoteRequest voteRequest) {
      //for larger era request change into follower.
      // However we have no idea who the chief is but.
      if (voteRequest.getGeneration() > replicationState.getGeneration()) {
          becomeFollower(LEADER_NOT_KNOWN, voteRequest.getGeneration());
      }

      VoteTracker voteTracker = replicationState.getVoteTracker();
      if (voteRequest.getGeneration() == replicationState.getGeneration() && !replicationState.hasLeader()) {
              if(isUptoDate(voteRequest) && !voteTracker.alreadyVoted()) {
                  voteTracker.registerVote(voteRequest.getServerId());
                  return grantVote();
              }
              if (voteTracker.alreadyVoted()) {
                  return voteTracker.votedFor == voteRequest.getServerId() ?
                          grantVote():rejectVote();

              }
      }
      return rejectVote();
  }

  non-public boolean isUptoDate(VoteRequest voteRequest)  (voteRequest.getLastLogEntryGeneration() == wal.getLastLogEntryGeneration() &&
              voteRequest.getLastLogEntryIndex() >= wal.getLastLogIndex());
      return outcome;
  

The server which receives votes from the vast majority of the servers
transitions to the chief state. The bulk is decided as mentioned
in Quorum. As soon as elected, the chief constantly
sends a HeartBeat to the entire followers.
If the followers do not obtain a HeartBeat
in a specified time interval,
a brand new chief election is triggered.

Log entries from earlier era

As mentioned within the above part, the primary part of the consensus
algorithms detects the present values, which had been copied
on the earlier runs of the algorithm. The opposite key side is that
these values are proposed because the values with the most recent era
of the chief. The second part decides that the worth is dedicated
provided that the values are proposed for the present era.
Raft by no means updates era numbers for the present entries
within the log. So if the chief has log entries from the older era
that are lacking from a few of the followers,
it can’t mark these entries as dedicated simply based mostly on
the bulk quorum.
That’s as a result of another server which is probably not out there now,
can have an entry on the similar index with larger era.
If the chief goes down with out replicating an entry from
its present era, these entries can get overwritten by the brand new chief.
So in Raft, the brand new chief should commit a minimum of one entry in its time period.
It might probably then safely commit all of the earlier entries.
Most sensible implementations of Raft attempt to commit a no-op entry
instantly after a pacesetter election, earlier than the chief is taken into account
able to serve consumer requests.
Confer with [raft-phd] part 3.6.1 for particulars.

An instance leader-election

Contemplate 5 servers, athens, byzantium, cyrene, delphi and ephesus.
ephesus is the chief for era 1. It has replicated entries to
itself, delphi and athens.

Determine 2: Misplaced heartbeat triggers an election

At this level, ephesus and delphi get disconnected from the remainder of the cluster.

byzantium has the least election timeout, so it
triggers the election by incrementing its Technology Clock to 2.
cyrene has its era lower than 2 and it additionally has similar log entry as byzantium.
So it grants the vote. However athens has an additional entry in its log. So it rejects the vote.

As a result of byzantium can’t get a majority 3 votes, it loses the election
and strikes again to follower state.

Determine 3: Misplaced election as a result of log just isn’t up-to-date

athens occasions out and triggers the election subsequent. It increments the Technology Clock
to three and sends vote request to byzantium and cyrene.
As a result of each byzantium and cyrene have decrease era quantity and fewer log entries than
athens, they each grant the vote to athens.
As soon as athens will get majority of the votes, it turns into the chief and begins
sending HeartBeats to byzantium and cyrene.
As soon as byzantium and cyrene obtain a heartbeat from the chief at larger era,
they increment their era. This confirms the management of athens.
athens then replicates its personal log to byzantium and cyrene.

Determine 4: Node with up-to-date log wins election

athens now replicates Entry2 from era 1 to byzantium and cyrene.
However as a result of it is an entry from the earlier era,
it doesn’t replace the commitIndex even when Entry2 is efficiently replicated
on the bulk quorum.

athens appends a no-op entry to its native log.
After this new entry in era 3 is efficiently replicated,
it updates the commitIndex

If ephesus comes again up or restores community connectivity and sends
request to cyrene. As a result of cyrene is now at era 3, it rejects the requests.
ephesus will get the brand new time period within the rejection response, and steps all the way down to be a follower.

Determine 7: Chief step-down

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular