Анализ исходного кода Zookeeper (4) ----- работа в кластерном режиме (реплицированная)

задняя часть сервер исходный код ZooKeeper
Анализ исходного кода Zookeeper (4) ----- работа в кластерном режиме (реплицированная)

zookeeperСерия статей по анализу исходного кода:

Оригинальный блог, чисто ручной стук, источник для перепечатки указывайте, спасибо!

Как вы знаете, zk работает в двух режимах: автономном режиме и режиме репликации. Очевидно, что режим репликации используется для построения zk-кластера, поэтому я называю режим репликации кластерным режимом. В предыдущих статьях мы разобрали исходный код запуска zk в автономном режиме, далее изучим исходный код в режиме кластера Zk.

Отладка в режиме кластера не так проста, как в автономном режиме.Может быть, вы спросите, нужно ли вам несколько физических машин для построения zk-кластера? На самом деле нет необходимости, одна физическая машина может также имитировать работу кластера. Поэтому ниже мы обсудим по следующим категориям:

1. Создание кластера ZK и соответствующая конфигурация

На самом деле настроить кластер с помощью zk очень просто.Как упоминалось в предыдущем блоге, zk определит, есть ли сходство в вашем конфигурационном файле при анализе конфигурационного файла.server.пункт конфига, если нет похожегоserver.элементы конфигурации, запускайте zk в автономном режиме по умолчанию. Вместо этого режим кластера требует, чтобы вы настроили его соответствующим образом. Далее будет объяснено шаг за шагом, как построить среду:

  • 1. Добавьте 3 конфигурационных файла в каталог conf zk с соответствующими именамиzoo1.cfg,zoo2.cfgа такжеzoo3.cfg

Его содержание следующее:

zoo1.cfg

tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\1
maxClientCnxns=2
# 服务器监听客户端连接的端口
clientPort=2181 

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo2.cfg

tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\2
maxClientCnxns=2
# 服务器监听客户端连接的端口
clientPort=2182

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo3.cfg


tickTime=200000
initLimit=10
syncLimit=5
dataDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3
dataLogDir=E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\log\\3
maxClientCnxns=2
# 服务器监听客户端连接的端口
clientPort=2183

server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

Я не буду объяснять вышеперечисленные элементы общей конфигурации один за другим, соответствующие значения упомянуты в предыдущей статье. Ниже мы сосредоточимся наclientPortсвойства иserver.xАтрибуты.

clientPortПредставляет порт, на котором сервер прослушивает клиентские подключения, другими словами порт, на котором клиент подключается к серверу. Конфигурация по умолчанию для этого свойства обычно2181, то почему мы пишем здесь как2181,2182,2183Шерстяная ткань? На самом деле причина очень проста, потому что наш кластер построен на одной физической машине, и чтобы предотвратить конфликты портов, мы настроили три zk-сервера для прослушивания разных портов соответственно.

Что касаетсяserver.xСвойства для настройки адреса и номера порта каждого сервера, участвующего в кластере. Его формат:

server.x addressIP:port1:port2

вxУказывает уникальный номер узла zk, который является значением sid, о котором мы часто говорим, что будет объяснено ниже, когда мы будем говорить о выборе zk. вам может быть любопытноport1а такжеport2В чем разница между, в zk,port1выражатьfllowersСоединен сleaderпорт,port2Указывает порт, на котором текущий узел участвует в выборах. Причина такой конструкции, собственно, я думаю вZABВ протоколе, когда операция записи, выданная клиентом, выполняется на стороне сервера,leaderУзлы должны синхронизировать состояние со всемиfllowers,leaderа такжеfllowersНужно общаться друг с другом! Другой заключается в том, что когда все узлы проводят быстрые выборы, каждый узел должен проголосовать, и после голосования выбирается один.leaderЗатем узлы должны уведомить другие узлы. Поэтому достаточно понять значение портов, они и есть разница.

  • 2. Создайте 3myidдокумент

При работе в кластерном режиме zk читает сdataDirв каталогеmyidЕсли файл не найден, будет сообщено об ошибке. Поэтому в дальнейшем будем соответственноdataDirпод новымmyidфайл, содержимое файла заполняет номер текущего сервера, о чем мы упоминали вышеserver.xсерединаxстоимость.

E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\1Создайте файл ниже, содержимое файла - серийный номер1 E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\2Создайте файл ниже, содержимое файла - серийный номер2 E:\\resources\\Zookeeper\\zookeeper-3.4.11\\conf\\data\\3Создайте файл ниже, содержимое файла - серийный номер3

  • 3. Запустите с разными файлами конфигурации соответственноQuorumPeerMainизmain()метод. Путь к файлу конфигурации можно передатьeclipse,Как показано ниже:

Упомянутый выше контент не соответствует исходному коду. Не волнуйтесь, давайте поговорим об исходном коде.

Во-первых, давайте посмотрим, как zk разбираетserver.xметка, введитеQuorumPeerConfigКатегорияparseProperties()метод, вы увидите следующий фрагмент кода:

// 判断属性是否以server.开始
if (key.startsWith("server.")) {
    int dot = key.indexOf('.');
    // 获取sid的值,也就是我们server.x中的x值
    long sid = Long.parseLong(key.substring(dot + 1));
    // 将配置值拆分为数组,格式为[addressIP,port1,port2]
    String parts[] = splitWithLeadingHostname(value);
    if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
    	LOG.error(value + " does not have the form host:port or host:port:port "
    			+ " or host:port:port:type");
    }
    // 代表当前结点的类型,可以是观察者类型(不需要参与投票),也可以是PARTICIPANT(表示该节点后期可能成为follower和leader)
    LearnerType type = null;
    String hostname = parts[0];
    Integer port = Integer.parseInt(parts[1]);
    Integer electionPort = null;
    if (parts.length > 2) {
    	electionPort = Integer.parseInt(parts[2]);
    }
} 

Приведенный выше исходный код будет анализировать каждый в соответствии с вашей конфигурацией.serverКонфигурация, исходники не очень сложные, далее посмотрим как читается zkdataDirв каталогеmyidфайл, продолжить вQuorumPeerConfigизparseProperties()метод, найдите следующий фрагмент кода:

File myIdFile = new File(dataDir, "myid");
// 必须在快照目录下创建myid文件,否则报错
if (!myIdFile.exists()) {
    throw new IllegalArgumentException(myIdFile.toString() + " file is missing");
}
// 读取myid的值
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
    myIdString = br.readLine();
} finally {
    br.close();// 注意,优秀的人都不会丢三落四,对于打开的各种io流,不用的时候记得关闭,不要浪费资源
}

2. Инициализация в режиме кластера zk

В zk, независимо от того, работает ли он в независимом режиме или в режиме репликации, шаги инициализации можно классифицировать следующим образом:

  • 1. Разобрать файл конфигурации
  • 2. Инициализировать работающий сервер

Для разбора конфигурационных файлов мы сделали соответствующий анализ в предыдущей статье и предыдущем разделе этой статьи. Давайте сосредоточимся на соответствующем исходном коде работы в режиме кластера zk, давайте введемQuormPeerMainКатегорияrunFromConfig()метод, исходный код выглядит следующим образом:

/**
* 加载配置运行服务器
* @param config
* @throws IOException
*/
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("Starting quorum peer");
    try {
    	// 创建一个ServerCnxnFactory,默认为NIOServerCnxnFactory
    	ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
    
    	// 对ServerCnxnFactory进行相关配置
    	cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    
    	// 初始化QuorumPeer,代表服务器节点server运行时的各种信息,如节点状态state,哪些服务器server参与竞选了,我们    可以将它理解为集群模式下运行的容器
    	quorumPeer = getQuorumPeer();

    	// 设置参与竞选的所有服务器
    	quorumPeer.setQuorumPeers(config.getServers());
    	// 设置事务日志和数据快照工厂
        quorumPeer.setTxnFactory(
    	    new FileTxnSnapLog(new File(config.getDataDir()), new File(config.getDataLogDir())));

    	// 设置选举的算法
    	quorumPeer.setElectionType(config.getElectionAlg());
    	// 设置当前服务器的id,也就是在data目录下的myid文件
    	quorumPeer.setMyid(config.getServerId());
    	// 设置心跳时间
    	quorumPeer.setTickTime(config.getTickTime());
    	// 设置允许follower同步和连接到leader的时间总量,以ticket为单位
    	quorumPeer.setInitLimit(config.getInitLimit());
    	// 设置follower与leader之间同步的时间量
    	quorumPeer.setSyncLimit(config.getSyncLimit());
    	// 当设置为true时,ZooKeeper服务器将侦听来自所有可用IP地址的对等端的连接,而不仅仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的连接。 默认值为false
    	quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
    	// 设置工厂,默认是NIO工厂
    	quorumPeer.setCnxnFactory(cnxnFactory);
    	// 设置集群数量验证器,默认为半数原则
    	quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
    	// 设置客户端连接的服务器ip地址
    	quorumPeer.setClientPortAddress(config.getClientPortAddress());
    	// 设置最小Session过期时间,默认是2*ticket
    	quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    	// 设置最大Session过期时间,默认是20*ticket
    	quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    	// 设置zkDataBase
    	quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    	quorumPeer.setLearnerType(config.getPeerType());
    	quorumPeer.setSyncEnabled(config.getSyncEnabled());

    	// 设置NIO处理链接的线程数
    	quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
    
    	quorumPeer.start();
    	quorumPeer.join();
    } catch (InterruptedException e) {
    	// warn, but generally this is ok
    	LOG.warn("Quorum Peer interrupted", e);
    }
}

Что касается обработки клиентских запросов, то как кластерный, так и автономный режим используютServerCnxnFactoryСоответствующая реализация подкласса , по умолчанию основанная на NIO.NIOServerCnxnFactory,дляQuormPeerclass, вы можете думать о нем как о контейнере или контексте, который содержит всю информацию о конфигурации текущего узла в режиме кластера, например, какие серверы участвуют в выборах, статус каждого узла и так далее. Когда метод работает доquorumPeer.join();, текущий поток блокируется до тех пор, пока все остальные потоки не завершат работу.

давайте войдемquorumPeer.start()метод, чтобы увидеть, какое действие он делает:

public synchronized void start() {
    // 初始化是内存数据库
    loadDataBase();
    // 用于处理程序为捕获的异常和处理客户端请求
    cnxnFactory.start();
    // 选举前相关配置
    startLeaderElection();
    // 线程调用本类的run()方法,实施选举
    super.start();
}

Когда сам zk запущен, он будет поддерживать в памяти дерево каталогов, то есть базу данных в памяти.При инициализации сервера zk будет загружать данные из локального конфигурационного файла в базу данных в памяти.Если локальной базы данных нет запись, это создаст пустую базу данных в памяти. , сохранение данных снимка также выполняется на основе базы данных в памяти.

3. Как провести выборы в режиме зк кластера?

После визуального осмотра кода редактор обнаружил, что zk должен использовать JMX для управления функцией выборов, но поскольку редактор пока не знаком с JMX, эта часть не будет объяснять исходный код, а будет непосредственно объяснять процесс выборов. в зк.

Во-первых, после запуска каждого сервера он будет входитьLOOKINGсостояние, начать избирать нового лидера или искать существующего лидера.Если лидер существует, другие серверы уведомят только что запущенный сервер и сообщат, какой сервер является лидером.В то же время новый сервер будет связываться с группой. Лидер устанавливает связь, чтобы убедиться, что его статус соответствует статусу лидера группы.

Для сообщения, отправляемого во время выборов лидера, мы называем его сообщением уведомления. когда сервер входитLOOKINGКогда он находится в состоянии, он отправит уведомление всем другим узлам в кластере.Товарищ включает в себя свое собственное голосование с информацией о голосовании.Структура данных голосования очень проста, как правило, состоит из sid и zxid, sid представляет число текущего сервера, zxid представляет максимальное значение текущего сервера. Номер транзакции и правила обмена информацией о голосовании следующие:

  • 1. Если voiceZxid > myzxid или (voteZxid = myZxid и voiceId > mySid), сохраните текущую информацию о голосовании.
  • 2. В противном случае измените информацию о голосовании, назначьте voiceZxid для myZxid и назначьте voiceId для mySid.

Короче говоря, сначала сравните идентификатор транзакции, а если они равны, то сравните номер сервера Sid. Если все уведомления, полученные сервером, одинаковы, это означает, что выбор лидера прошел успешно (самый большой zxid или наибольший sid)

4. Почему говорят, что количество узлов, образующих zk-кластер, нечетное, а рекомендуется иметь 3 узла?

Кластер Zk рекомендует нечетное количество серверов, и принцип большинства принят внутри, потому что это может сделать весь кластер более доступным. Конечно, это также определяется алгоритмом выбора zk.Хотя узел может предоставлять услуги внешнему миру, может ли zk только с одним узлом считаться кластером? Явно нет, можно только сказать, что zk работает в автономном режиме.

Предположим, у нас настроено 5 машин, тогда мы считаем, что пока доступно больше половины (т.е. 3) серверов, доступен весь кластер.Что касается того, почему должна быть половина числа, это связано с правило большинства в зк., можно посмотретьQuorumMajclass, этот класс имеет метод проверки принципа большинства, код выглядит следующим образом:

/**
* 这个类实现了对法定服务器的校验
* This class implements a validator for majority quorums. The 
* implementation is straightforward.
*/
public class QuorumMaj implements QuorumVerifier {
    
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
    
    // 一半的服务器数量,如果是5,则half=2,只要可用的服务器数量大于2,则整个zk就是可用的
    int half;
    /**
    * Defines a majority to avoid computing it every time.
    */
    public QuorumMaj(int n) {
    this.half = n / 2;
    }
    
    /**
    * Returns weight of 1 by default.权重
    */
    public long getWeight(long id) {
    return (long) 1;
    }

    /**
    * Verifies if a set is a majority.
    */
    public boolean containsQuorum(HashSet<Long> set) {
    return (set.size() > half);//传入一组服务器id,校验必须大于半数才能正常提供服务
    }
}

Посмотрим еще разQuormPeerConfigв классеparseProperties()Фрагмент кода в методе:

// 只有2台服务器server
if (servers.size() == 2) {
    // 打印日志,警告至少需要3台服务器,但不会报错
    LOG.warn("No server failure will be tolerated. " + "You need at least 3 servers.");
    } else if (servers.size() % 2 == 0) {
    LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}

Этот фрагмент кода проверяет количество серверов, настроенных в вашем файле конфигурации. Если оно четно или равно 2, будет выдано предупреждение, например "Эта конфигурация не рекомендуется". Если количество серверов равно 2, будет не терпит даже 1 сервер.

Чтобы произвести впечатление, давайте посмотрим, почему zk рекомендует нечетное количество серверов.

  • Если настроить 3 сервера, то при зависании одного из серверов 2 из 3 серверов имеют больше половины голосов, и можно выбрать лидера;
  • Если настроить 4 сервера, то допустить отказ 1 сервера, что равносильно тому, что иметь всего 3 сервера.В целях экономии почему бы не выбрать 3 сервера, а когда 2 из 4 серверов зависнут количество голосов доступный для 2 из 4 серверов будет недоступен.Больше половины лидеров нельзя выбрать