анализ исходного кода синхронизации данных zookeeper

Java

в предыдущей статьеzookeeperПосле анализа реализации выборов мы знаем, чтоzookeeperПосле избрания кластераleaderузел войдетLEADINGусловие,followerузел войдетFOLLOWINGСтатус; в это время узлы в кластере будут выполнять операции синхронизации данных для обеспечения согласованности данных. Только после завершения синхронизации данныхzookeeperТолько кластеры могут предоставлять внешние сервисы.

LEADING

Когда роль узла подтверждена после избрания в качествеleaderвойдет позжеLEADINGСостояние, исходный код выглядит следующим образом:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeerСостояние узла меняется наLEADINGбудет создан позжеleaderэкземпляр и триггерleadОбработать.

void lead() throws IOException, InterruptedException {
    try {
		// 省略

        /**
         * 开启线程用于接收 follower 的连接请求
         */
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;

        /**
         * 阻塞等待计算新的 epoch 值,并设置 zxid
         */
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());          
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        
        /**
         * 阻塞等待接收过半的 follower 节点发送的 ACKEPOCH 信息; 此时说明已经确定了本轮选举后 epoch 值
         */
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            /**
             * 阻塞等待 超过半数的节点 follower 发送了 NEWLEADER ACK 信息;此时说明过半的 follower 节点已经完成数据同步
             */
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            // 省略
        }

        /**
         * 启动 zk server,此时集群可以对外正式提供服务
         */
        startZkServer();

        // 省略
}

отleadРеализация метода может быть известна,leaderа такжеfollowerВ процессе синхронизации данных выполняются следующие процессы:

  • Получение подписчиков
  • Рассчитать новое значение эпохи
  • Сообщить единое значение эпохи
  • синхронизация данных
  • Запустите zk сервер для предоставления внешних услуг

FOLLOWING

Смотри нижеfollowerзапись узлаFOLLOWINGДействие после статуса:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                // 省略
            case OBSERVING:
                // 省略
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeerСостояние узла меняется наFOLLOWINGбудет создан позжеfollowerэкземпляр и триггерfollowLeaderОбработать.

void followLeader() throws InterruptedException {
    // 省略
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            /**
             * follower 与 leader 建立连接
             */
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            /**
             * follower 向 leader 提交节点信息用于计算新的 epoch 值
             */
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            
            /**
             * follower 与 leader 数据同步
             */
            syncWithLeader(newEpochZxid);                
            
             // 省略

        } catch (Exception e) {
             // 省略
        }
    } finally {
        // 省略
    }
}

отfollowLeaderРеализация метода может быть известна,followerа такжеleaderВ процессе синхронизации данных выполняются следующие процессы:

  • запрос на подключение к лидеру
  • Отправьте информацию об узле для расчета нового значения эпохи
  • синхронизация данных

Ниже мы рассмотрим детали реализации каждой ссылки;

Лидер-Последователь устанавливает связь

подписчик запрашивает подключение
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}           
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

followerБудет подтвержден информацией о голосовании после выбораleaderадрес узла, и инициировать соединение (всего 5 попыток соединения, если соединение не удастся, процесс выборов будет запущен заново)

лидер принимает подключения
class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                	/**
                	 * 接收 follower 的连接,并开启 LearnerHandler 线程用于处理二者之间的通信
                	 */
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    // 省略
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
}

отLearnerCnxAcceptorРеализация может быть виднаleaderузел для каждогоfollowerПосле того, как соединение с узлом будет установлено, ему будет присвоенLearnerHandlerПотоки для обработки связи между ними.

Рассчитать новое значение эпохи

followerв сleaderПосле установления соединения выдаетсяFOLLOWERINFOИнформация


long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

protected long registerWithLeader(int pktType) throws IOException{
    /**
     * 发送 follower info 信息,包括 last zxid 和 sid
     */
	long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    /**
	 * follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
	 */
    writePacket(qp, true);
    
    // 省略
} 

Далее мы смотрим наleaderпри полученииFOLLOWERINFOчто делать после сообщения (ссылкаLearnerHandler)

public void run() {
	try {
	    // 省略
	    /**
	     * leader 接收 follower 发送的 FOLLOWERINFO 信息,包括 follower 节点的 zxid,sid,protocol version
	     * @see Learner.registerWithleader()
	     */
	    QuorumPacket qp = new QuorumPacket();
	    ia.readRecord(qp, "packet");

	    byte learnerInfoData[] = qp.getData();
	    if (learnerInfoData != null) {
	    	if (learnerInfoData.length == 8) {
	    		// 省略
	    	} else {
	            /**
	             * 高版本的 learnerInfoData 包括 long 类型的 sid, int 类型的 protocol version 占用 12 字节
	             */
	    		LearnerInfo li = new LearnerInfo();
	    		ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
	    		this.sid = li.getServerid();
	    		this.version = li.getProtocolVersion();
	    	}
	    }

	    /**
	     * 通过 follower 发送的 zxid,解析出 foloower 节点的 epoch 值
	     */
	    long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	    
	    long peerLastZxid;
	    StateSummary ss = null;
	    long zxid = qp.getZxid();

	    /**
	     * 阻塞等待计算新的 epoch 值
	     */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
	  
	    // 省略
	}

Как видно из приведенного выше кода,leaderпри полученииfollowerпослалFOLLOWERINFOПосле информации он будет анализироватьfollowerузлаacceptedEpochценить и участвовать в новомepochзначение вычисляется. (См. специальную логику расчетаgetEpochToPropose)

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // epoch 用来记录计算后的选举周期值
        // follower 或 leader 的 acceptedEpoch 值与 epoch 比较;若前者大则将其加一
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch+1;
        }
        // connectingFollowers 用来记录与 leader 已连接的 follower
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // 判断是否已计算出新的 epoch 值的条件是 leader 已经参与了 epoch 值计算,以及超过一半的节点参与了计算
        if (connectingFollowers.contains(self.getId()) && 
                                        verifier.containsQuorum(connectingFollowers)) {
            // 将 waitingForNewEpoch 设置为 false 说明不需要等待计算新的 epoch 值了
            waitingForNewEpoch = false;
            // 设置 leader 的 acceptedEpoch 值
            self.setAcceptedEpoch(epoch);
            // 唤醒 connectingFollowers wait 的线程
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // 若未完成新的 epoch 值计算则阻塞等待
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");        
            }
        }
        return epoch;
    }
}

из методаgetEpochToProposeзнатьleaderсоберет больше половиныfollower acceptedEpochПосле информации выберите максимальное значение и добавьте к нему 1newEpochстоимость; в процессеleaderперейдет в состояние блокировки до тех пор, пока более половиныfollowerУчаствуйте в расчете, чтобы войти в следующий этап.

Уведомлять о новом значении эпохи

leaderрасчет новогоnewEpochПосле значения он перейдет к следующему этапу отправкиLEADERINFOинформацию (также см.LearnerHandler)

public void run() {
	try {
	    // 省略

	    /**
	     * 阻塞等待计算新的 epoch 值
	     */
	    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            /**
             * 计算出新的 epoch 值后,leader 向 follower 发送 LEADERINFO 信息;包括新的 newEpoch
             */
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();

           	// 省略
        }
	}
	// 省略
}
protected long registerWithLeader(int pktType) throws IOException{
	// 省略

    /**
     * follower 向 leader 发送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);

    /**
     * follower 接收 leader 发送的 LEADERINFO 信息
     */
    readPacket(qp);

    /**
     * 解析 leader 发送的 new epoch 值
     */        
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	if (qp.getType() == Leader.LEADERINFO) {
    	// we are connected to a 1.0 server so accept the new epoch and read the next packet
    	leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    	byte epochBytes[] = new byte[4];
    	final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

    	/**
    	 * new epoch > current accepted epoch 则更新 acceptedEpoch 值
    	 */
    	if (newEpoch > self.getAcceptedEpoch()) {
    		wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
    		self.setAcceptedEpoch(newEpoch);
    	} else if (newEpoch == self.getAcceptedEpoch()) {   		
            wrappedEpochBytes.putInt(-1);
    	} else {
    		throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
    	}

    	/**
    	 * follower 向 leader 发送 ACKEPOCH 信息,包括 last zxid
    	 */
    	QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    	writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } 
} 

Из кода выше видно, что завершениеnewEpochПосле расчета стоимостиleaderа такжеfollowerПроцесс взаимодействия:

  • leaderКfollowerОтправитьLEADERINFOинформация, информироватьfollowerновыйepochстоимость
  • followerАнализ приемаLEADERINFOинформация, еслиnew epochзначение больше, чемcurrent accepted epochзначение обновленоacceptedEpoch
  • followerКleaderОтправитьACKEPOCHинформационная обратная связьleaderМы получили новыеepochзначение, сfollowerузлаlast zxid

синхронизация данных

Лидер в LearnerHandler войдет в фазу синхронизации данных после получения более половины информации ACKEPOCH.

public void run() {
        try {
            // 省略
            // peerLastZxid 为 follower 的 last zxid
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
           
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    /**
                     * follower 与 leader 的 zxid 相同说明 二者数据一致;同步方式为差量同步 DIFF,同步的zxid 为 peerLastZxid, 也就是不需要同步
                     */
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    // peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中间
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        /**
                         * 在遍历 proposals 时,用来记录上一个 proposal 的 zxid
                         */
                        long prevProposalZxid = minCommittedLog;

                        boolean firstPacket=true;
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // 跳过 follower 已经存在的提案
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                if (firstPacket) {
                                    firstPacket = false;
                                    if (prevProposalZxid < peerLastZxid) {
                                        /**
                                         * 此时说明有部分 proposals 提案在 leader 节点上不存在,则需告诉 follower 丢弃这部分 proposals
                                         * 也就是告诉 follower 先执行回滚 TRUNC ,需要回滚到 prevProposalZxid 处,也就是 follower 需要丢弃 prevProposalZxid ~ peerLastZxid 范围内的数据
                                         * 剩余的 proposals 则通过 DIFF 进行同步
                                         */
                                        packetToSend = Leader.TRUNC;                                        
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }

                                /**
                                 * 将剩余待 DIFF 同步的提案放入到队列中,等待发送
                                 */
                                queuePacket(propose.packet);
                                /**
                                 * 每个提案后对应一个 COMMIT 报文
                                 */
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {                    
                        /**
                         * follower 的 zxid 比 leader 大 ,则告诉 follower 执行 TRUNC 回滚
                         */
                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                    }
                } 

            } finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                 // 数据同步完成之后会发送 NEWLEADER 信息
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            /**
             * 发送数据同步方式信息,告诉 follower 按什么方式进行数据同步
             */
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                /**
                 * 如果是全量同步的话,则将 leader 本地数据序列化写入 follower 的输出流
                 */
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            /**
             * 开启个线程执行 packet 发送
             */
            sendPackets();
            
            /**
             * 接收 follower ack 响应
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            /**
             * 阻塞等待过半的 follower ack
             */
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            /**
             * leader 向 follower 发送 UPTODATE,告知其可对外提供服务
             */
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            // 省略
        } 
    }

Как видно из кода вышеleaderа такжеfollowerВо время синхронизации данных они будут проходить черезpeerLastZxidа такжеmaxCommittedLog,minCommittedLogСравнение двух значений окончательно определяет метод синхронизации данных.

DIFF (дифференциальная синхронизация)

  • followerизpeerLastZxidравныйleaderизpeerLastZxid

Описание на данный моментfollowerа такжеleaderданные согласуются сDIFFсинхронно, то есть без синхронизации

  • followerизpeerLastZxidмеждуmaxCommittedLog,minCommittedLogмежду

Описание на данный моментfollowerа такжеleaderСуществуют различия в данных, и эти различия необходимо синхронизировать.leaderбудуfollowerОтправитьDIFFСообщение информирует о методе синхронизации, а затем будет отправлено сообщение о разнице и сообщении о представлении предложения.

Процесс взаимодействия выглядит следующим образом:

    Leader                 Follower

      |          DIFF         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

Пример: ПредположимleaderУзел предложения соответствует буферной очередиzxidС последующим:

 0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005

а такжеfollowerузлаpeerLastZxidдля0x500000003, тебе следует0x500000004,0x500000005Два предложения синхронизируются, после чего процесс отправки пакета выглядит следующим образом:

тип сообщения ZXID
DIFF 0x500000005
PROPOSAL 0x500000004
COMMIT 0x500000004
PROPOSAL 0x500000005
COMMIT 0x500000005

TRUNC+DIFF (сначала откат, а затем дифференциальная синхронизация)

надDIFFСуществует особый сценарий, когда дифференциальная синхронизацияfollowerизpeerLastZxidмеждуmaxCommittedLog,minCommittedLogМежду двумя, ноfollowerизpeerLastZxidсуществуетleaderне существует в узле; в настоящее времяleaderнужно быть информированнымfollowerвернуться кpeerLastZxidпредыдущийzxidПосле прокатки выполняется синхронизация дифференциации.

Процесс взаимодействия выглядит следующим образом:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

Пример: Предположим, что в кластере три узла A, B и C. В определенный момент A является лидером, а цикл выборов равен 5. Zxid включает в себя: (0x500000004, 0x500000005, 0x500000006); В определенный момент узел-лидер А обработал запрос, транзакция которого равна 0x500000007 Во время трансляции сервер узла-лидера А был недоступен, в результате чего 0x500000007.Транзакция не была синхронизирована, после очередного раунда выборов в кластере узел B стал новым лидером, и цикл выборов был 6. Был предоставлен внешний сервис и обработаны новые запросы транзакций, включая 0x600000001, 0x600000002 ;

узел кластера список ZXID
A 0x500000004, 0x500000005, 0x500000006, 0x500000007
B 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002
C 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002

В этот момент, после того как узел A перезапустится и присоединится к кластеру, он обнаружит, что транзакция 0x500000007 не существует в ведущем узле при синхронизации данных с ведущим узлом B. В это время лидер говорит A откатить транзакцию до сначала 0x500000006, а в транзакции дифференциальной синхронизации 0x600000001, 0x600000002, затем процесс отправки пакета данных выглядит следующим образом:

тип сообщения ZXID
TRUNC 0x500000006
PROPOSAL 0x600000001
COMMIT 0x600000001
PROPOSAL 0x600000002
COMMIT 0x600000002

TRUNC (синхронизация отката)

подобноfollowerизpeerLastZxidбольше, чемleaderизmaxCommittedLogРассказыватьfollowerОткатитьсяmaxCommittedLog; сцену можно рассматривать какTRUNC+DIFFупрощенный режим

Процесс взаимодействия выглядит следующим образом:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
         

SNAP (полная синхронизация)

подобноfollowerизpeerLastZxidменьше, чемleaderизminCommittedLogилиleaderЕсли на узле нет очереди кеша предложений, она будет использоватьсяSNAPПолная синхронизация. в этом режимеleaderсначала будетfollowerОтправитьSNAPПакеты, затем приобретают общую сумму переданной последовательности данных из памяти в базу данныхfollower,followerПосле получения полного объема данных они будут десериализованы и загружены в in-memory базу данных.

Процесс взаимодействия выглядит следующим образом:

    Leader                 Follower

      |         SNAP          |  
      | --------------------> |
      |         DATA          |  
      | --------------------> |
         

leaderПосле завершения синхронизации данныхfollowerОтправитьNEWLEADERсообщения, после получения более половиныfollowerОтветACKПосле этого это означает, что более половины узлов завершили синхронизацию данных.leaderбудуfollowerОтправитьUPTODATEуведомление о сообщенииfollowerУзел может предоставлять внешние услуги, в это времяleaderСервер zk будет запущен для предоставления внешних услуг.

ПОСЛЕДОВАТЕЛЬ синхронизация данных

Рассмотрим этап синхронизации данныхFOLLOWERКак это делается, см.Learner.syncWithLeader

protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        /**
         * 接收 leader 发送的数据同步方式报文
         */
        readPacket(qp);
        
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                
            }
            else if (qp.getType() == Leader.SNAP) {
                // 执行加载全量数据
            } else if (qp.getType() == Leader.TRUNC) {
                // 执行回滚
            }
            else {
            
            }
            
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    // 处理提案
                    break;
                case Leader.COMMIT:
                    // commit proposal
                    break;
                case Leader.INFORM:
                    // 忽略
                    break;
                case Leader.UPTODATE:
                    // 设置 zk server
                    self.cnxnFactory.setZooKeeperServer(zk);
                    // 退出循环                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    /**
                     * follower 响应 NEWLEADER ACK
                     */
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        // 启动 zk server
        zk.startup();
        
    }

Как видно из кода вышеfollowerПоток обработки на этапе синхронизации данных выглядит следующим образом:

  • followerперениматьleaderОтправлять сообщения режима синхронизации данных (DIFF/TRUNC/SANP) и обрабатывать их соответствующим образом.

  • когдаfollowerполучатьleaderпослалNEWLEADERПосле сообщения оно будет отправлено наleaderоткликACK (leaderполучил более половиныACKсообщение будет отправлено позжеUPTODATE)

  • когдаfollowerполучатьleaderпослалUPTODATEПосле сообщения это означает, что внешняя услуга может быть предоставлена ​​в это время, и сервер zk будет запущен в это время.

резюме

Наконец, используйте изображение, чтобы обобщить процесс синхронизации данных zk после завершения выборов, как показано на следующем рисунке: