Обзор
Традиционная многопоточная схема будет использовать стратегию «мгновенное создание, мгновенное уничтожение». Хотя время создания потока значительно сократилось по сравнению с созданием процесса, если задача, переданная в поток, имеет короткое время выполнения и выполняется очень часто, сервер будет находиться в состоянии постоянного создания и уничтожения потоков.
Время выполнения потока можно разделить на три части: время запуска потока, время выполнения тела потока и время уничтожения потока. В случае многопоточности, если потоки нельзя использовать повторно, это означает, что каждое создание должно пройти через три процесса: запуск, уничтожение и запуск. Это неизбежно увеличит соответствующее время работы системы и снизит эффективность.
Использование пула потоков: поскольку поток создается заранее и помещается в пул потоков, а после обработки текущей задачи он не уничтожается, а организуется для обработки следующей задачи, поэтому можно избежать многократного создания потоков, тем самым экономя накладные расходы на создание и уничтожение потоков, что может повысить производительность и стабильность системы.
модель пула потоков
Это достигается путем создания экземпляра Thread(), и ниже будет использоваться класс, наследующий threading.Thread().
# 创建队列实例, 用于存储任务
queue = Queue()
# 定义需要线程池执行的任务
def do_job():
while True:
i = queue.get()
time.sleep(1)
print 'index %s, curent: %s' % (i, threading.current_thread())
queue.task_done()
if __name__ == '__main__':
# 创建包括3个线程的线程池
for i in range(3):
t = Thread(target=do_job)
t.daemon=True # 设置线程daemon 主线程退出,daemon线程也会推出,即时正在运行
t.start()
# 模拟创建线程池3秒后塞进10个任务到队列
time.sleep(3)
for i in range(10):
queue.put(i)
queue.join()
- описание демона:
Если атрибут демона дочернего потока имеет значение False, когда основной поток завершается, он определяет, завершается ли дочерний поток.Если дочерний поток все еще выполняется, основной поток будет ждать его завершения перед выходом;
Если атрибут демона подпотока равен True, основной поток завершится без проверки подпотока в конце выполнения, и все подпотоки, значение демона которых равно True, завершатся с основным потоком, независимо от того, операция завершена или нет.
daemon=True означает, что поток является потоком демона, и его выход не может быть инициирован вне потока демона, поэтому, когда основной поток завершается, дочерний поток выходит непосредственно вместе с ним. - queue.task_done() Описание:
Роль queue.join() состоит в том, чтобы позволить основной программе заблокироваться и дождаться завершения очереди, а затем выйти, но как сообщить основной программе, что очередь извлечена и завершена? queue.get() может только сообщить основной программе, что очередь завершена, но это не означает, что задачи в очереди завершены, поэтому программе необходимо вызвать queue.task_done(), чтобы сообщить основной программе, что другая задача завершена, пока все задачи не будут выполнены, выход из основной программы
выходной результат
index 1, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 0, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 2, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 4, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 3, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 5, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 6, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 7, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 8, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 9, curent: <Thread(Thread-1, started daemon 139652189157120)>
finish
Вы можете видеть, что все задачи выполнены в этих потоках Thread-(1-3)
Принцип пула потоков
Основной принцип пула потоков: мы ставим задачу в очередь, а затем открываем N потоков, каждый поток идет в очередь, чтобы получить задачу, после завершения выполнения сообщаем системе, что я закончил выполнение, а затем перейти в очередь, чтобы получить следующую задачу, пока все задачи в очереди не опустеют и не выйдут из потока.
В приведенном выше примере создается пул потоков с 3 потоками, и каждый поток блокирует задачи, читающие очередь очереди, в бесконечном цикле.Все задачи будут обрабатываться только этими 3 предварительно созданными потоками.
Конкретное описание работы выглядит следующим образом:
- Создайте экземпляр Queue.Queue() и заполните его данными или задачами.
- Создайте пул потоков демона и установите поток в поток демона демона.
- Каждый поток блокируется в бесконечном цикле, чтобы прочитать элемент элемента очереди очереди и обработать его.
- После завершения каждого задания используйте функцию queue.task_done(), чтобы отправить в очередь сигнал о том, что задание выполнено.
- Основной поток устанавливает для queue.join() блокировку до тех пор, пока очередь задач не будет очищена, разблокирована и выполнена вниз.
В этом режиме есть несколько моментов:
- Установка потока пула потоков в качестве процесса демона демона означает, что при завершении основного потока поток демона также завершится автоматически.
Если daemon=False, поток, не являющийся демоном, заблокирует выход основного потока, поэтому даже если задача очереди очереди была завершена
Пул потоков по-прежнему блокирует бесконечный цикл ожидания задач, так что основной поток не завершится. - Когда основной поток использует queue.join(), это означает, что основной поток будет блокироваться до тех пор, пока очередь не будет очищена, и как основной поток узнает, что очередь была очищена? То есть после каждого потока queue.get() и обработки задачи отправляется сигнал queue.task_done(), и данные очереди будут уменьшаться на 1 до тех пор, пока данные очереди не станут пустыми, queue.join() разблокируется, вниз воплощать в жизнь.
- В этом режиме в основном преобладает задача очереди очереди, и он завершается после завершения задачи.Поскольку пул потоков является демоном, все потоки основного пула потоков выхода будут завершены. В отличие от нашей обычной блокировки thread.join(), в которой может доминировать очередь, этот тип потока блокирует основной поток до его завершения. Посмотрите, какой join() использовать:
Если вы хотите завершить очередь из определенного количества задач, используйте функцию queue.join(), например, сканирование определенного количества веб-страниц.
Если вы хотите, чтобы поток завершил задачу и завершился, используйте thread.join()
Пример: Написание веб-сервера с использованием пула потоков
import socket
import threading
from threading import Thread
import threading
import sys
import time
import random
from Queue import Queue
host = ''
port = 8888
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(3)
class ThreadPoolManger():
"""线程池管理器"""
def __init__(self, thread_num):
# 初始化参数
self.work_queue = Queue()
self.thread_num = thread_num
self.__init_threading_pool(self.thread_num)
def __init_threading_pool(self, thread_num):
# 初始化线程池,创建指定数量的线程池
for i in range(thread_num):
thread = ThreadManger(self.work_queue)
thread.start()
def add_job(self, func, *args):
# 将任务放入队列,等待线程池阻塞读取,参数是被执行的函数和函数的参数
self.work_queue.put((func, args))
class ThreadManger(Thread):
"""定义线程类,继承threading.Thread"""
def __init__(self, work_queue):
Thread.__init__(self)
self.work_queue = work_queue
self.daemon = True
def run(self):
# 启动线程
while True:
target, args = self.work_queue.get()
target(*args)
self.work_queue.task_done()
# 创建一个有4个线程的线程池
thread_pool = ThreadPoolManger(4)
# 处理http请求,这里简单返回200 hello world
def handle_request(conn_socket):
recv_data = conn_socket.recv(1024)
reply = 'HTTP/1.1 200 OK \r\n\r\n'
reply += 'hello world'
print 'thread %s is running ' % threading.current_thread().name
conn_socket.send(reply)
conn_socket.close()
# 循环等待接收客户端请求
while True:
# 阻塞等待请求
conn_socket, addr = s.accept()
# 一旦有请求了,把socket扔到我们指定处理函数handle_request处理,等待线程池分配线程处理
thread_pool.add_job(handle_request, *(conn_socket, ))
s.close()
# 运行进程
[master][/data/web/advance_python/socket]$ python sock_s_threading_pool.py
# 查看线程池状况
[master][/data/web/advance_python/socket]$ ps -eLf|grep sock_s_threading_pool
lisa+ 27488 23705 27488 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27489 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27490 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27491 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27492 0 5 23:22 pts/30 00:00:00 python sock_s_threading_pool.py
# 跟我们预期一样一共有5个线程,一个主线程,4个线程池线程
Эта структура написания веб-сервера пула потоков включает следующие компоненты и шаги:
- Определите диспетчер пула потоков ThreadPoolManger для создания пула потоков и управления им, предоставления интерфейса add_job() и добавления задач в пул потоков.
- Определите рабочий поток ThreadManger, определите метод run(), отвечающий за рабочую очередь бесконечного цикла, и выполните задачи очереди.
- Определите запрос прослушивания сокета s.accept() и обработайте задачу handle_requests() запроса.
- Инициализировать четыре потока пула потоков заблокированы в ожидании этой очереди очереди задач чтения
- Когда Socket.Accept () имеет запрос, поставить CONсокет как параметр, дескрипторметод запроса, бросаем его в пул потоков, ждем, пока пул потоков выделит обработку потока
Влияние GIL на многопоточность
Поскольку потоки Python являются реальными потоками, когда интерпретатор выполняет код, существует блокировка GIL: глобальная блокировка интерпретатора. Перед выполнением любого потока Python сначала должна быть получена блокировка GIL. Затем каждые 100 байт кода выполняются интерпретатором. will Блокировка GIL снимается автоматически, давая возможность другим потокам выполняться. Эта глобальная блокировка GIL фактически блокирует исполняемый код всех потоков, поэтому многопоточность в Python может выполняться только попеременно, даже если 100 потоков выполняются на 100-ядерном процессоре, может использоваться только одно ядро.
Но для задач с интенсивным вводом-выводом многопоточность по-прежнему играет большую роль в повышении эффективности.Это совместная многозадачность.Когда начинается задача, такая как сетевой ввод-вывод, и в течение длительного или неопределенного времени код Python не выполняется. При необходимости один поток выдает GIL, чтобы другие потоки могли получить GIL и запустить Python. Такое вежливое поведение называется кооперативной многозадачностью и допускает параллелизм; несколько потоков одновременно ожидают разных событий.
Две потоки одновременно только одна реализация Python, но после того, как нить запускается для подключения, она даст GIL, так что другие потоки могут работать. Это означает, что две одновременные потоки могут дождаться соединения сокета, которое хорошо. В то же время они могут сделать больше работы.
Насколько должен быть установлен пул потоков?
Количество ядер ЦП сервера ограничено, и количество одновременных потоков, которые могут быть параллельными, ограничено.Он не более открыт, тем лучше, и переключение потоков стоит дорого.Если переключение потоков слишком частое, это уменьшит представление.
Во время выполнения потока время вычислений делится на две части:
- Расчет ЦП, занимающий ЦП
- Не требует вычисления ЦП, не занимает ЦП, ожидает возврата ввода-вывода, например, recv(), accept(), sleep() и других операций, конкретная операция, такая как
Доступ к кэше, RPC вызовов ниже по течению услуг, доступ к БД и другие операции, которые требуют сетевых звонков
Тогда, если время расчета составляет 50 %, а время ожидания 50 %, то для достижения наибольшего коэффициента использования можно открыть 2 потока: если время работы составляет 2 секунды, после того, как ЦП вычислит 1 секунду, он потоку требуется 1 секунда для ожидания ввода-вывода. Когда ЦП простаивает, вы можете переключиться на другой поток. После того, как ЦП поработает в течение 1 секунды, поток ожидает 1 секунду для ввода-вывода. В это время ЦП может быть переключился обратно. В это время первый поток только что завершил 1 секунду ввода-вывода. Подождите, вы можете позволить ЦП продолжать работу и, таким образом, переключать операции перед двумя потоками в цикле.
Тогда если время расчета составляет 20%, а время ожидания 80%, то для достижения наибольшего коэффициента использования можно открыть 5 потоков: можно представить, что на выполнение задачи уходит 5 секунд, процессор занимает 1 секунда, время ожидания 4 секунды, и ЦП ожидает поток. , вы можете активировать 4 потока одновременно, чтобы время ожидания ЦП и ввода-вывода могло максимально перекрываться.
Резюмируя, формула для расчета количества потоков: N-ядерный сервер, локальное вычислительное время равно x, а время ожидания равно y посредством однопоточного анализа бизнес-исполнения, затем количество рабочих потоков (количество потоков в пуле потоков) устанавливается равным N*(x +y)/x, чтобы максимизировать загрузку ЦП. Из-за влияния GIL python может использовать только 1 ядро, поэтому установите здесь N=1.
обо мне
Если статья окажется для вас полезной, вы можете ее собрать и переслать, что станет для меня большим стимулом! Кроме того, вы можете обратить внимание на мой паблик [Code Farmer Fu Ge] (coder2025), я продолжу выкладывать оригинальные алгоритмы и базовые компьютерные статьи!