Списки и очереди — поговорим о безопасности потоков

задняя часть Python Безопасность

Эта статья была впервые опубликована вЗнай почти

Ключевые слова: потокобезопасность, GIL, атомарные операции, типы хранения данных (List, Queue.Queue, collections.deque).

Когда несколько потоков модифицируют один и тот же ресурс одновременно, мы должны убедиться, что модификация не будет конфликтовать, а модификация данных не вызовет ошибок, то есть мы должны обеспечить потокобезопасность.

В то же время мы знаем, что из-за наличия GIL в python даже при включении нескольких потоков одновременно выполняется только один поток.

Значит ли это, что при выполнении нескольких потоков в python конфликтов не будет? ответ отрицательный.

Поток небезопасен в GIL

Посмотрите на следующий код

import threading
import time
zero = 0
def change_zero():
    global zero
    for i in range(3000000):
        zero += 1
        zero -= 1

th1 = threading.Thread(target = change_zero)
th2 = threading.Thread(target = change_zero)
th1.start()
th2.start()
th1.join()
th2.join()
print(zero)

Два потока изменяются вместеzeroПеременная, каждый раз, когда операция переменной состоит в том, чтобы добавить 1, а затем вычесть 1, разумно выполнить 3000000 раз,zeroРезультат все равно должен быть 0, но после запуска этого кода обнаруживается, что результат часто не 0, а результат каждого запуска разный, что является результатом конфликта между модификациями данных.

Основная причина в том, что,zero += 1Этот этап операции не является простым этапом, его можно рассматривать как комбинацию двух следующих этапов:

x = zero + 1
zero = x

Так что, возможно, когда поток выполняется, только один шаг выполняется за два шага.x = zero + 1(то есть до того, как он успеет изменить ноль), блокировка GIL отдается другому потоку (если вы не знакомы с концепцией блокировки и GIL, вы можете сначала прочитать егоэта статья), подождите, пока GIL вернется к первому потоку, ноль изменится, а затем выполните следующийzero = x, результат не правильныйzeroДобавлено 1. Полный процесс имитации ошибки выглядит следующим образом

初始:zero = 0
th1: x1 = zero + 1  # x1 = 1
th2: x2 = zero + 1  # x2 = 1
th2: zero = x2      # zero = 1
th1: zero = x1      # zero = 1  问题出在这里,两次赋值,本来应该加2变成了加1
th1: x1 = zero - 1  # x1 = 0
th1: zero = x1      # zero = 0
th2: x2 = zero - 1  # x2 = -1
th2: zero = x2      # zero = -1
结果:zero = -1

Чтобы лучше объяснить, почему python по-прежнему небезопасен для потоков в рамках GIL, здесь необходимо ввести концепцию:атомарная операция

атомарная операция

Атомарная операция относится к операции, которая не будет прервана механизмом планирования потоков.Как только эта операция начнется, она будет выполняться до конца и не будет переключаться на другие потоки в середине.

рисунокzero += 1Такая программа, которую можно разбить на несколько шагов, не является атомарной операцией. Неполное выполнение атомарной операции не является прямым следствием ее выполнения, и можно переключиться на другие потоки.В это время, если другие потоки модифицируют одну и ту же переменную, могут возникнуть конфликты модификации ресурсов.

Одним из решений является блокировка вышеуказанногоchange_zeroОпределение функции изменено на

def change_zero():
    global zero
    for i in range(1000000):
        with lock:
            zero += 1
            zero -= 1

После добавления блокировки программа внутри блокировки либо не будет выполняться, либо будет переключена на другие потоки после завершения выполнения, что фактически эквивалентно реализации «искусственной атомарной операции». Это предотвращает конфликтующие модификации ресурсов. Читатели могут попробовать перезапустить программу после блокировки и увидят результатzeroПеременные всегда выводятся как 0.

Давайте рассмотрим вопрос: если сама программа является атомарной операцией, обеспечивает ли она автоматически потокобезопасность, поэтому нет необходимости ее блокировать? Ответ положительный.

Например, теперь мы хотим поддерживать очередь и запускать несколько потоков, одни потоки отвечают за добавление элементов в очередь, а другие потоки отвечают за извлечение элементов для последующей обработки. В настоящее время эта очередь является объектом, совместно используемым несколькими потоками, и мы должны убедиться, что не будет конфликтов в процессе добавления и извлечения, то есть обеспечить безопасность потоков. Если процесс операции сложен, мы можем заблокировать несколько операций без перерыва. Но если сама программа атомарна, никаких дополнительных защит добавлять не нужно.

Например, мы используемqueueв модулеQueueобъект для обслуживания очереди, черезQueue.putзаполните элемент, черезQueue.getизвлекать элементы, потому чтоQueue.putиQueue.getВсе операции являются атомарными, поэтому либо выполнять, либо не выполнять, нет проблемы прерывания, поэтому нет необходимости добавлять сюда избыточные меры защиты.

Поэтому здесь естественно возникает вопрос: как узнать, какие операции атомарны, а какие нет?Официальный сайттаблица выше

Это все атомарные операции

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

Следующие операции не являются атомарными

i = i+1
L.append(L[-1])
L[i] = L[j]
D[x] = D[x] + 1

Здесь следует отметить одну вещь: мы иногда слышим, что объект списка в python не является потокобезопасным, Это утверждение не является строгим, потому что независимо от того, безопасен поток или нет, это не объект, а операция. Если мы ссылаемся на операцию, подобную этойL[0] = L[0] + 1, что, конечно, не является атомарной операцией и приведет к небезопасности потока, если оставить ее незащищенной, в то время какL.append(i)Такие операции потокобезопасны.

Поэтому списки можно использовать в качестве объектов хранения в нескольких потоках. Но мы обычно не используем списки, а используемqueue.Queue, потому что последний внутренне реализуетConditionКоммуникационный механизм замка см.эта статья

Вернемся к атомарным операциям.Хотя некоторые общие операции перечислены на официальном сайте, иногда нам все же нужны методы, о которых мы можем судить сами, которые можно использоватьdisмодульныйdisфункция, такая как следующая

from dis import dis
a = 0
def fun():
    global a
    a = a + 1
    
dis(fun)

Результат выглядит следующим образом

  5     0 LOAD_GLOBAL       0 (a)
        3 LOAD_CONST        1 (1)
        6 BINARY_ADD
        7 STORE_GLOBAL      0 (a)
       10 LOAD_CONST        0 (None)
       13 RETURN_VALUE

Нам просто нужно обратить внимание на каждую строку, каждая строка означает выполнить этоfunПроцесс функции можно разделить на следующие шаги: импорт глобальных переменных --> импорт констант --> выполнение сложения --> сохранение переменных..., каждый шаг здесь представляет собой байт-код инструкции, который можно рассматривать как атомарную операцию. . Здесь перечисленыfunПроцесс выполнения функции, и о чем нам нужно позаботиться, этоa = a + 1Этот процесс содержит несколько инструкций, вы можете видеть, что он содержит две, а именноBINARY_ADDиSTORE_GLOBAL, если поток переключается после выполнения первого (операция и суммирование), а второй (присваивание) еще не запущен, возникнет конфликт ресурсов модификации в нашем примере выше.

Давайте взглянемL.append(i)байт-код процедуры

from dis import dis
l = []
def fun():
    global l
    l.append(1)
    
dis(fun)

получил ответ

  5    0 LOAD_GLOBAL       0 (l)
       3 LOAD_ATTR         1 (append)
       6 LOAD_CONST        1 (1)
       9 CALL_FUNCTION     1 (1 positional, 0 keyword pair)
      12 POP_TOP
      13 LOAD_CONST        0 (None)
      16 RETURN_VALUE

можно увидетьappendНа самом деле толькоPOP_TOPЭтот шаг либо выполняется, либо не выполняется, и проблем с прерыванием не будет.

Здесь мы поговорим об атомарных операциях, а затем сравним структуры данных очереди, обычно используемые в python.

Обычное сравнение очередей в python

List VS Queue.Queue VS collections.deque

Во-первых, нам нужноQueue.QueueОн отличается от двух других тем, что в основном используется для связи между потоками, а два других в основном используются в качестве инструментов для хранения данных. когда нам нужно достичьConditionКогда заблокировано, используйтеQueue.Queue, последние два используются просто для хранения данных.

collections.dequeиlistРазница в основном заключается во вставке и извлечении данных. Если вы хотите вставить данные в голову списка или извлечь данные из головы, эффективность первого намного выше, чем у второго.Это функция двусторонней очереди первого, и его преимущества Вне всяких сомнений. Если порядок извлечения не имеет значения, не нужно использоватьcollections.deque

Этот раздел в основном относится к следующим двум ответам

Добро пожаловать, чтобы обратить внимание на мою колонку знаний

Главная страница колонки:программирование на питоне

Каталог столбцов:содержание

Примечания к выпуску:Примечания к выпуску программного обеспечения и пакетов