Хранилище состояния реализовано алгоритмом Raft - на основе etcd

Raft etcd
Хранилище состояния реализовано алгоритмом Raft - на основе etcd
Post Views = 5

Эта статья взята из: [блог InTheWorld] (Добро пожаловать, чтобы оставить сообщение и обменяться)

Алгоритм Paxos, пожалуй, самый известный алгоритм распределенного консенсуса, а Raft, вероятно, самый популярный алгоритм распределенного консенсуса. Из-за ограниченности опыта и уровня невозможно добиться дальнейшего понимания, просто прочитав статью. Я слышал, что Kubernetes, Docker Swarm, CockroachDB и другие замечательные проекты используют Raft. Ведь это технология, опробованная в крупномасштабной производственной среде, и я считаю, что ее необходимо изучить. А реализация etcd на Raft — с открытым исходным кодом, в конце концов, «нет секрета перед исходным кодом».

image

Будь то Paxos или Raft, они обязуются поддерживать RSM (Replicated State Machine), как показано на рисунке выше. Для RSM хранение состояния очень важно. В этом блоге я собираюсь проанализировать хранилище состояний Raft на основе реализации etcd. Хранение состояния Raft в основном реализуется Snapshot и WAL (журнал с упреждающей записью).

  • Как и многие базы данных, для обеспечения безопасности данных (сбой или восстановление при простое) используется WAL, и etcd не исключение. Каждая транзакционная операция (т. е. операция записи) в etcd будет предварительно записана в файл транзакций, которым является WAL.
  • Кроме того, как высокодоступная система хранения KV, etcd не может полагаться исключительно на воспроизведение журнала для восстановления данных. Поэтому etcd также предоставляет функцию моментального снимка (snapshot). Снапшот предназначен для периодического сохранения всей базы данных в виде одного файла моментального снимка, что не только сокращает время воспроизведения журнала, но и уменьшает емкость хранилища WAL, а преждевременный WAL можно удалить.

etcd использует protobuf для определения формата протокола, включая снимок и журнал. Часть файла raft/raft.proto выглядит следующим образом:

enum EntryType {
    EntryNormal     = 0;
    EntryConfChange = 1;
}

message Entry {
    optional uint64     Term  = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
    optional uint64     Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
    optional EntryType  Type  = 1 [(gogoproto.nullable) = false];
    optional bytes      Data  = 4;
}

message SnapshotMetadata {
    optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
    optional uint64    index      = 2 [(gogoproto.nullable) = false];
    optional uint64    term       = 3 [(gogoproto.nullable) = false];
}

message Snapshot {
    optional bytes            data     = 1;
    optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
}

Среди них запись — logEntry, что означает журнал.

1. Интерфейс, предоставляемый библиотекой Raft

Библиотека Raft для etcd не является готовой.Приложению необходимо реализовать хранение и обмен данными по сети. Storage io определяется как интерфейс Storage в библиотеке Raft, который является интерфейсом, используемым библиотекой Raft для чтения данных, таких как журнал, моментальный снимок и т. д. Сама библиотека Raft предоставляет реализацию MemoryStorage, основанную на хранении в памяти, и на нее нельзя полагаться исключительно для хранения постоянных данных.

Интерфейс этого хранилища определяется следующим образом:

type Storage interface {
    // InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)
    // Entries returns a slice of log entries in the range [lo,hi).
    // MaxSize limits the total size of the log entries returned, but
    // Entries returns at least one entry if any.
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // Term returns the term of entry i, which must be in the range
    // [FirstIndex()-1, LastIndex()]. The term of the entry before
    // FirstIndex is retained for matching purposes even though the
    // rest of that entry may not be available.
    Term(i uint64) (uint64, error)
    // LastIndex returns the index of the last entry in the log.
    LastIndex() (uint64, error)
    // FirstIndex returns the index of the first log entry that is
    // possibly available via Entries (older entries have been incorporated
    // into the latest Snapshot; if storage only contains the dummy entry the
    // first log entry is not available).
    FirstIndex() (uint64, error)
    // Snapshot returns the most recent snapshot.
    // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
    // so raft state machine could know that Storage needs some time to prepare
    // snapshot and call Snapshot later.
    Snapshot() (pb.Snapshot, error)
}

Поскольку одного memoryStorage недостаточно, давайте посмотрим, как сам etcd использует Raft libray. Интерфейс Storage в etcd фактически повторно использует memoryStorage, но рассматривает его только как слой кеша памяти. В каждой транзакционной операции etcd заранее сбрасывает содержимое хранилища на постоянное запоминающее устройство, а затем записывает в memoryStorage. Как упоминалось выше, хранилище используется только для передачи содержимого в библиотеку Raft, если можно гарантировать его согласованность с постоянным содержимым. И это легко гарантировать на одной машине. Кроме того, библиотека Raft управляет Storage через raftlog.Подробнее см.etcd/raft/raft.go.

2. Конкретная реализация etcd

Сервер etcd реализует постоянное хранилище через WAL и моментальные снимки. etcd использует слой-оболочку, структуру, называемую хранилищем. Чтобы избежать путаницы, вставьте немного кода (etcd/etcdserver/storage.go).

type Storage interface {
    // Save function saves ents and state to the underlying stable storage.
    // Save MUST block until st and ents are on stable storage.
    Save(st raftpb.HardState, ents []raftpb.Entry) error
    // SaveSnap function saves snapshot to the underlying stable storage.
    SaveSnap(snap raftpb.Snapshot) error
    // Close closes the Storage and performs finalization.
    Close() error
}

type storage struct {
    *wal.WAL
    *snap.Snapshotter
}

func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
    return &storage{w, s}
}

Обратите внимание, что это Хранилище не имеет ничего общего с предыдущим Хранилищем, так что не путайте его.

Из-за особенностей языка golang структура хранилища может напрямую использовать методы WAL и Snapshotter, поскольку имя переменной-члена не объявлено. Так как же etcd использует memoryStorage библиотеки Raft со структурой хранилища здесь? ответetcd/etcdserver/плот.го.etcd дополнительно инкапсулирует библиотеку Raft, называемую raftNode, которая содержит анонимный элемент raftNodeConfig. Определение raftNodeConfig выглядит так:

type raftNodeConfig struct {
    // to check if msg receiver is removed from cluster
    isIDRemoved func(id uint64) bool
    raft.Node
    raftStorage *raft.MemoryStorage
    storage     Storage
    heartbeat   time.Duration // for logging
    // transport specifies the transport to send and receive msgs to members.
    // Sending messages MUST NOT block. It is okay to drop messages, since
    // clients should timeout and reissue their messages.
    // If transport is nil, server will panic.
    transport rafthttp.Transporter
}

Исходный код выглядит понятным с первого взгляда, raftStorage предоставляется библиотеке Raft, а хранилище — это постоянное хранилище, реализованное etcd. При использовании etcd реализует ту же логику в виде непрерывных вызовов. Взяв в качестве примера перезапуск сервера etcd, давайте посмотрим, как реализована синхронизация, и посмотрим на реализацию restartNode().

func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
    var walsnap walpb.Snapshot
    if snapshot != nil {
        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
    }
    w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

    plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
    cl := membership.NewCluster("")
    cl.SetID(cid)
    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }
    s.SetHardState(st)
    s.Append(ents)
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
    }

    n := raft.RestartNode(c)
    raftStatusMu.Lock()
    raftStatus = n.Status
    raftStatusMu.Unlock()
    advanceTicksForElection(n, c.ElectionTick)
    return id, cl, n, s, w
}

Основная логика этой функции заключается в восстановлении состояния memoryStorage путем чтения моментального снимка и WAL, а затем с помощью s.SetHardState() и s.Append(). Аналогично и в рабочем процессе etcd, если не верите, посмотрите метод start() в raft.go:

    if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
        plog.Fatalf("raft save state and entries error: %v", err)
    }
    if !raft.IsEmptyHardState(rd.HardState) {
        proposalsCommitted.Set(float64(rd.HardState.Commit))
    }
    // gofail: var raftAfterSave struct{}
    r.raftStorage.Append(rd.Entries)

Я удалил часть кода, общая логика видна более четко. Последовательные вызовы r.storage.Save() и r.raftStorage.Append() обеспечивают согласованность хранилища и raftStorage.

Отлично! Хранилище состояния здесь, но это только основное содержание Raft.Позже мы продолжим изучение реализации репликации журналов Raft, выбора лидера, отправки транзакций и, конечно же, RPC.

 

 

Использованная литература:

【1】 Докторская диссертация Диего Онгаро из Стэнфордского университета "КОНСЕНСУС: СОЕДИНЕНИЕ ТЕОРИИ И ПРАКТИКИ"

[2] Страница Raft на github. https://raft.github.io/

[3] Визуализация плота. http://thesecretlivesofdata.com/raft/

[4] Проект etcd на github. https://github.com/coreos/etcd   (last but not least)