в предыдущей статьеzookeeperПосле анализа реализации выборов мы знаем, чтоzookeeperПосле избрания кластераleaderузел войдетLEADINGусловие,followerузел войдетFOLLOWINGСтатус; в это время узлы в кластере будут выполнять операции синхронизации данных для обеспечения согласованности данных. Только после завершения синхронизации данныхzookeeperТолько кластеры могут предоставлять внешние сервисы.
LEADING
Когда роль узла подтверждена после избрания в качествеleaderвойдет позжеLEADINGСостояние, исходный код выглядит следующим образом:
public void run() {
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
}
}
QuorumPeerСостояние узла меняется наLEADINGбудет создан позжеleaderэкземпляр и триггерleadОбработать.
void lead() throws IOException, InterruptedException {
try {
// 省略
/**
* 开启线程用于接收 follower 的连接请求
*/
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
readyToStart = true;
/**
* 阻塞等待计算新的 epoch 值,并设置 zxid
*/
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
/**
* 阻塞等待接收过半的 follower 节点发送的 ACKEPOCH 信息; 此时说明已经确定了本轮选举后 epoch 值
*/
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
/**
* 阻塞等待 超过半数的节点 follower 发送了 NEWLEADER ACK 信息;此时说明过半的 follower 节点已经完成数据同步
*/
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
// 省略
}
/**
* 启动 zk server,此时集群可以对外正式提供服务
*/
startZkServer();
// 省略
}
отleadРеализация метода может быть известна,leaderа такжеfollowerВ процессе синхронизации данных выполняются следующие процессы:
- Получение подписчиков
- Рассчитать новое значение эпохи
- Сообщить единое значение эпохи
- синхронизация данных
- Запустите zk сервер для предоставления внешних услуг
FOLLOWING
Смотри нижеfollowerзапись узлаFOLLOWINGДействие после статуса:
public void run() {
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
// 省略
case OBSERVING:
// 省略
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
}
}
QuorumPeerСостояние узла меняется наFOLLOWINGбудет создан позжеfollowerэкземпляр и триггерfollowLeaderОбработать.
void followLeader() throws InterruptedException {
// 省略
try {
QuorumServer leaderServer = findLeader();
try {
/**
* follower 与 leader 建立连接
*/
connectToLeader(leaderServer.addr, leaderServer.hostname);
/**
* follower 向 leader 提交节点信息用于计算新的 epoch 值
*/
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
/**
* follower 与 leader 数据同步
*/
syncWithLeader(newEpochZxid);
// 省略
} catch (Exception e) {
// 省略
}
} finally {
// 省略
}
}
отfollowLeaderРеализация метода может быть известна,followerа такжеleaderВ процессе синхронизации данных выполняются следующие процессы:
- запрос на подключение к лидеру
- Отправьте информацию об узле для расчета нового значения эпохи
- синхронизация данных
Ниже мы рассмотрим детали реализации каждой ссылки;
Лидер-Последователь устанавливает связь
подписчик запрашивает подключение
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, ConnectException, InterruptedException {
sock = new Socket();
sock.setSoTimeout(self.tickTime * self.initLimit);
for (int tries = 0; tries < 5; tries++) {
try {
sock.connect(addr, self.tickTime * self.syncLimit);
sock.setTcpNoDelay(nodelay);
break;
} catch (IOException e) {
if (tries == 4) {
LOG.error("Unexpected exception",e);
throw e;
} else {
LOG.warn("Unexpected exception, tries="+tries+
", connecting to " + addr,e);
sock = new Socket();
sock.setSoTimeout(self.tickTime * self.initLimit);
}
}
Thread.sleep(1000);
}
self.authLearner.authenticate(sock, hostname);
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
followerБудет подтвержден информацией о голосовании после выбораleaderадрес узла, и инициировать соединение (всего 5 попыток соединения, если соединение не удастся, процесс выборов будет запущен заново)
лидер принимает подключения
class LearnerCnxAcceptor extends ZooKeeperThread{
private volatile boolean stop = false;
public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
}
@Override
public void run() {
try {
while (!stop) {
try{
/**
* 接收 follower 的连接,并开启 LearnerHandler 线程用于处理二者之间的通信
*/
Socket s = ss.accept();
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
// 省略
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
}
}
}
отLearnerCnxAcceptorРеализация может быть виднаleaderузел для каждогоfollowerПосле того, как соединение с узлом будет установлено, ему будет присвоенLearnerHandlerПотоки для обработки связи между ними.
Рассчитать новое значение эпохи
followerв сleaderПосле установления соединения выдаетсяFOLLOWERINFOИнформация
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{
/**
* 发送 follower info 信息,包括 last zxid 和 sid
*/
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
/**
* follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
*/
writePacket(qp, true);
// 省略
}
Далее мы смотрим наleaderпри полученииFOLLOWERINFOчто делать после сообщения (ссылкаLearnerHandler)
public void run() {
try {
// 省略
/**
* leader 接收 follower 发送的 FOLLOWERINFO 信息,包括 follower 节点的 zxid,sid,protocol version
* @see Learner.registerWithleader()
*/
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
// 省略
} else {
/**
* 高版本的 learnerInfoData 包括 long 类型的 sid, int 类型的 protocol version 占用 12 字节
*/
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
}
/**
* 通过 follower 发送的 zxid,解析出 foloower 节点的 epoch 值
*/
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
/**
* 阻塞等待计算新的 epoch 值
*/
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// 省略
}
Как видно из приведенного выше кода,leaderпри полученииfollowerпослалFOLLOWERINFOПосле информации он будет анализироватьfollowerузлаacceptedEpochценить и участвовать в новомepochзначение вычисляется. (См. специальную логику расчетаgetEpochToPropose)
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
if (!waitingForNewEpoch) {
return epoch;
}
// epoch 用来记录计算后的选举周期值
// follower 或 leader 的 acceptedEpoch 值与 epoch 比较;若前者大则将其加一
if (lastAcceptedEpoch >= epoch) {
epoch = lastAcceptedEpoch+1;
}
// connectingFollowers 用来记录与 leader 已连接的 follower
connectingFollowers.add(sid);
QuorumVerifier verifier = self.getQuorumVerifier();
// 判断是否已计算出新的 epoch 值的条件是 leader 已经参与了 epoch 值计算,以及超过一半的节点参与了计算
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {
// 将 waitingForNewEpoch 设置为 false 说明不需要等待计算新的 epoch 值了
waitingForNewEpoch = false;
// 设置 leader 的 acceptedEpoch 值
self.setAcceptedEpoch(epoch);
// 唤醒 connectingFollowers wait 的线程
connectingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
// 若未完成新的 epoch 值计算则阻塞等待
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (waitingForNewEpoch) {
throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
из методаgetEpochToProposeзнатьleaderсоберет больше половиныfollower acceptedEpochПосле информации выберите максимальное значение и добавьте к нему 1newEpochстоимость; в процессеleaderперейдет в состояние блокировки до тех пор, пока более половиныfollowerУчаствуйте в расчете, чтобы войти в следующий этап.
Уведомлять о новом значении эпохи
leaderрасчет новогоnewEpochПосле значения он перейдет к следующему этапу отправкиLEADERINFOинформацию (также см.LearnerHandler)
public void run() {
try {
// 省略
/**
* 阻塞等待计算新的 epoch 值
*/
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
/**
* 计算出新的 epoch 值后,leader 向 follower 发送 LEADERINFO 信息;包括新的 newEpoch
*/
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
// 省略
}
}
// 省略
}
protected long registerWithLeader(int pktType) throws IOException{
// 省略
/**
* follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
*/
writePacket(qp, true);
/**
* follower 接收 leader 发送的 LEADERINFO 信息
*/
readPacket(qp);
/**
* 解析 leader 发送的 new epoch 值
*/
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
/**
* new epoch > current accepted epoch 则更新 acceptedEpoch 值
*/
if (newEpoch > self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt(-1);
} else {
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}
/**
* follower 向 leader 发送 ACKEPOCH 信息,包括 last zxid
*/
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
}
}
Из кода выше видно, что завершениеnewEpochПосле расчета стоимостиleaderа такжеfollowerПроцесс взаимодействия:
- leaderКfollowerОтправитьLEADERINFOинформация, информироватьfollowerновыйepochстоимость
- followerАнализ приемаLEADERINFOинформация, еслиnew epochзначение больше, чемcurrent accepted epochзначение обновленоacceptedEpoch
- followerКleaderОтправитьACKEPOCHинформационная обратная связьleaderМы получили новыеepochзначение, сfollowerузлаlast zxid
синхронизация данных
Лидер в LearnerHandler войдет в фазу синхронизации данных после получения более половины информации ACKEPOCH.
public void run() {
try {
// 省略
// peerLastZxid 为 follower 的 last zxid
peerLastZxid = ss.getLastZxid();
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
/**
* follower 与 leader 的 zxid 相同说明 二者数据一致;同步方式为差量同步 DIFF,同步的zxid 为 peerLastZxid, 也就是不需要同步
*/
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
// peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中间
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
/**
* 在遍历 proposals 时,用来记录上一个 proposal 的 zxid
*/
long prevProposalZxid = minCommittedLog;
boolean firstPacket=true;
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// 跳过 follower 已经存在的提案
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
if (firstPacket) {
firstPacket = false;
if (prevProposalZxid < peerLastZxid) {
/**
* 此时说明有部分 proposals 提案在 leader 节点上不存在,则需告诉 follower 丢弃这部分 proposals
* 也就是告诉 follower 先执行回滚 TRUNC ,需要回滚到 prevProposalZxid 处,也就是 follower 需要丢弃 prevProposalZxid ~ peerLastZxid 范围内的数据
* 剩余的 proposals 则通过 DIFF 进行同步
*/
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
/**
* 将剩余待 DIFF 同步的提案放入到队列中,等待发送
*/
queuePacket(propose.packet);
/**
* 每个提案后对应一个 COMMIT 报文
*/
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) {
/**
* follower 的 zxid 比 leader 大 ,则告诉 follower 执行 TRUNC 回滚
*/
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
}
}
} finally {
rl.unlock();
}
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
// 数据同步完成之后会发送 NEWLEADER 信息
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
/**
* 发送数据同步方式信息,告诉 follower 按什么方式进行数据同步
*/
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
/**
* 如果是全量同步的话,则将 leader 本地数据序列化写入 follower 的输出流
*/
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
/**
* 开启个线程执行 packet 发送
*/
sendPackets();
/**
* 接收 follower ack 响应
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
/**
* 阻塞等待过半的 follower ack
*/
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
/**
* leader 向 follower 发送 UPTODATE,告知其可对外提供服务
*/
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
// 省略
}
}
Как видно из кода вышеleaderа такжеfollowerВо время синхронизации данных они будут проходить черезpeerLastZxidа такжеmaxCommittedLog,minCommittedLogСравнение двух значений окончательно определяет метод синхронизации данных.
DIFF (дифференциальная синхронизация)
- followerизpeerLastZxidравныйleaderизpeerLastZxid
Описание на данный моментfollowerа такжеleaderданные согласуются сDIFFсинхронно, то есть без синхронизации
- followerизpeerLastZxidмеждуmaxCommittedLog,minCommittedLogмежду
Описание на данный моментfollowerа такжеleaderСуществуют различия в данных, и эти различия необходимо синхронизировать.leaderбудуfollowerОтправитьDIFFСообщение информирует о методе синхронизации, а затем будет отправлено сообщение о разнице и сообщении о представлении предложения.
Процесс взаимодействия выглядит следующим образом:
Leader Follower
| DIFF |
| --------------------> |
| PROPOSAL |
| --------------------> |
| COMMIT |
| --------------------> |
| PROPOSAL |
| --------------------> |
| COMMIT |
| --------------------> |
Пример: ПредположимleaderУзел предложения соответствует буферной очередиzxidС последующим:
0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005
а такжеfollowerузлаpeerLastZxidдля0x500000003, тебе следует0x500000004,0x500000005Два предложения синхронизируются, после чего процесс отправки пакета выглядит следующим образом:
тип сообщения | ZXID |
---|---|
DIFF | 0x500000005 |
PROPOSAL | 0x500000004 |
COMMIT | 0x500000004 |
PROPOSAL | 0x500000005 |
COMMIT | 0x500000005 |
TRUNC+DIFF (сначала откат, а затем дифференциальная синхронизация)
надDIFFСуществует особый сценарий, когда дифференциальная синхронизацияfollowerизpeerLastZxidмеждуmaxCommittedLog,minCommittedLogМежду двумя, ноfollowerизpeerLastZxidсуществуетleaderне существует в узле; в настоящее времяleaderнужно быть информированнымfollowerвернуться кpeerLastZxidпредыдущийzxidПосле прокатки выполняется синхронизация дифференциации.
Процесс взаимодействия выглядит следующим образом:
Leader Follower
| TRUNC |
| --------------------> |
| PROPOSAL |
| --------------------> |
| COMMIT |
| --------------------> |
| PROPOSAL |
| --------------------> |
| COMMIT |
| --------------------> |
Пример: Предположим, что в кластере три узла A, B и C. В определенный момент A является лидером, а цикл выборов равен 5. Zxid включает в себя: (0x500000004, 0x500000005, 0x500000006); В определенный момент узел-лидер А обработал запрос, транзакция которого равна 0x500000007 Во время трансляции сервер узла-лидера А был недоступен, в результате чего 0x500000007.Транзакция не была синхронизирована, после очередного раунда выборов в кластере узел B стал новым лидером, и цикл выборов был 6. Был предоставлен внешний сервис и обработаны новые запросы транзакций, включая 0x600000001, 0x600000002 ;
узел кластера | список ZXID |
---|---|
A | 0x500000004, 0x500000005, 0x500000006, 0x500000007 |
B | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
C | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
В этот момент, после того как узел A перезапустится и присоединится к кластеру, он обнаружит, что транзакция 0x500000007 не существует в ведущем узле при синхронизации данных с ведущим узлом B. В это время лидер говорит A откатить транзакцию до сначала 0x500000006, а в транзакции дифференциальной синхронизации 0x600000001, 0x600000002, затем процесс отправки пакета данных выглядит следующим образом:
тип сообщения | ZXID |
---|---|
TRUNC | 0x500000006 |
PROPOSAL | 0x600000001 |
COMMIT | 0x600000001 |
PROPOSAL | 0x600000002 |
COMMIT | 0x600000002 |
TRUNC (синхронизация отката)
подобноfollowerизpeerLastZxidбольше, чемleaderизmaxCommittedLogРассказыватьfollowerОткатитьсяmaxCommittedLog; сцену можно рассматривать какTRUNC+DIFFупрощенный режим
Процесс взаимодействия выглядит следующим образом:
Leader Follower
| TRUNC |
| --------------------> |
SNAP (полная синхронизация)
подобноfollowerизpeerLastZxidменьше, чемleaderизminCommittedLogилиleaderЕсли на узле нет очереди кеша предложений, она будет использоватьсяSNAPПолная синхронизация. в этом режимеleaderсначала будетfollowerОтправитьSNAPПакеты, затем приобретают общую сумму переданной последовательности данных из памяти в базу данныхfollower,followerПосле получения полного объема данных они будут десериализованы и загружены в in-memory базу данных.
Процесс взаимодействия выглядит следующим образом:
Leader Follower
| SNAP |
| --------------------> |
| DATA |
| --------------------> |
leaderПосле завершения синхронизации данныхfollowerОтправитьNEWLEADERсообщения, после получения более половиныfollowerОтветACKПосле этого это означает, что более половины узлов завершили синхронизацию данных.leaderбудуfollowerОтправитьUPTODATEуведомление о сообщенииfollowerУзел может предоставлять внешние услуги, в это времяleaderСервер zk будет запущен для предоставления внешних услуг.
ПОСЛЕДОВАТЕЛЬ синхронизация данных
Рассмотрим этап синхронизации данныхFOLLOWERКак это делается, см.Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
/**
* 接收 leader 发送的数据同步方式报文
*/
readPacket(qp);
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
}
else if (qp.getType() == Leader.SNAP) {
// 执行加载全量数据
} else if (qp.getType() == Leader.TRUNC) {
// 执行回滚
}
else {
}
outerLoop:
while (self.isRunning()) {
readPacket(qp);
switch(qp.getType()) {
case Leader.PROPOSAL:
// 处理提案
break;
case Leader.COMMIT:
// commit proposal
break;
case Leader.INFORM:
// 忽略
break;
case Leader.UPTODATE:
// 设置 zk server
self.cnxnFactory.setZooKeeperServer(zk);
// 退出循环
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
/**
* follower 响应 NEWLEADER ACK
*/
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
// 启动 zk server
zk.startup();
}
Как видно из кода вышеfollowerПоток обработки на этапе синхронизации данных выглядит следующим образом:
-
followerперениматьleaderОтправлять сообщения режима синхронизации данных (DIFF/TRUNC/SANP) и обрабатывать их соответствующим образом.
-
когдаfollowerполучатьleaderпослалNEWLEADERПосле сообщения оно будет отправлено наleaderоткликACK (leaderполучил более половиныACKсообщение будет отправлено позжеUPTODATE)
-
когдаfollowerполучатьleaderпослалUPTODATEПосле сообщения это означает, что внешняя услуга может быть предоставлена в это время, и сервер zk будет запущен в это время.
резюме
Наконец, используйте изображение, чтобы обобщить процесс синхронизации данных zk после завершения выборов, как показано на следующем рисунке: