Многопоточное программирование (Linux C)

Linux

Можно сказать, что многопоточное программирование является основным навыком каждого программиста, а также одной из трудностей в разработке.В этой статье в качестве примера используется Linux C для описания создания потоков и нескольких часто используемых методов синхронизации потоков.Наконец , выполняется многопоточное программирование.Обобщайте и думайте и приведите примеры кода.

1. Создайте тему

Первым шагом в многопоточном программировании является создание потока. Создание потока фактически добавляет поток управления, так что несколько потоков управления в одном процессе выполняются одновременно или параллельно.

Функция создания потока, другие функции здесь не перечислены, вы можете обратиться кpthread.h.

#include<pthread.h>

int pthread_create(
    pthread_t *restrict thread,  /*线程id*/
	const pthread_attr_t *restrict attr,    /*线程属性,默认可置为NULL,表示线程属性取缺省值*/
	void *(*start_routine)(void*),  /*线程入口函数*/ 
	void *restrict arg  /*线程入口函数的参数*/
	);

Пример кода:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>

char* thread_func1(void* arg) {
    pid_t pid = getpid();
    pthread_t tid = pthread_self();
    printf("%s pid: %u, tid: %u (0x%x)\n", (char*)arg, (unsigned int)pid, (unsigned int)tid, (unsigned int)tid);

    char* msg = "thread_func1";
    return msg;
}

void* thread_func2(void* arg) {
    pid_t pid = getpid();
    pthread_t tid = pthread_self();
    printf("%s pid: %u, tid: %u (0x%x)\n", (char*)arg, (unsigned int)pid, (unsigned int)tid, (unsigned int)tid);
    char* msg = "thread_func2 ";
    while(1) {
        printf("%s running\n", msg);
        sleep(1);
    }
    return NULL;
}

int main() {
    pthread_t tid1, tid2;
    if (pthread_create(&tid1, NULL, (void*)thread_func1, "new thread:") != 0) {
        printf("pthread_create error.");
        exit(EXIT_FAILURE);
    }

    if (pthread_create(&tid2, NULL, (void*)thread_func2, "new thread:") != 0) {
        printf("pthread_create error.");
        exit(EXIT_FAILURE);
    }
    pthread_detach(tid2);

    char* rev = NULL;
    pthread_join(tid1, (void *)&rev);
    printf("%s return.\n", rev);
    pthread_cancel(tid2);

    printf("main thread end.\n");
    return 0;
}

2. Синхронизация потоков

Иногда нам нужно, чтобы несколько потоков взаимодействовали друг с другом для выполнения, а затем требуется синхронизация между потоками. Общие методы синхронизации между потоками:

  • взаимоисключающий
  • сигнал
  • переменная условия

Давайте сначала рассмотрим пример без синхронизации потоков:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>

#define LEN 100000
int num = 0;

void* thread_func(void* arg) {
    for (int i = 0; i< LEN; ++i) {
        num += 1;
    }
    
    return NULL;
}

int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, (void*)thread_func, NULL);
    pthread_create(&tid2, NULL, (void*)thread_func, NULL);

    char* rev = NULL;
    pthread_join(tid1, (void *)&rev);
    pthread_join(tid2, (void *)&rev);

    printf("correct result=%d, wrong result=%d.\n", 2*LEN, num);
    return 0;
}

результат операции:correct result=200000, wrong result=106860..

[1] Взаимное исключение

Это проще всего понять: при доступе к критическим ресурсам через взаимное исключение не более чем один поток может получить критические ресурсы одновременно.

По сути, логика взаимного исключения такова: если обнаруживается, что ни один другой поток не заблокирован при доступе к уличному ресурсу, он будет заблокирован для получения критического ресурса.В течение этого периода, если другие потоки выполнят мьютекс и обнаружат, что он заблокирован, поток будет зависать и ждать разблокировки, а текущий поток получит доступ к блокировке.После того, как критический ресурс будет завершен, разблокируйте и разбудите другие потоки, приостановленные мьютексом, ожидающие повторного выполнения по расписанию.

Как реализованы операции «приостановить ожидание» и «разбудить ожидающий поток»? У каждого мьютекса есть очередь ожидания. Поток должен приостановить и ожидать в мьютексе. Сначала он добавляет себя в очередь ожидания, затем переводит поток в спящее состояние, а затем вызывает функцию планировщика для переключения на другой поток. Если поток хочет разбудить другие потоки в очереди ожидания, ему нужно только взять элемент из очереди ожидания, изменить свое состояние со сна на готовность и присоединиться к очереди готовности, тогда при следующем выполнении функции планировщика можно переключиться на пробуждение.

Основные функции следующие:

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex,     
       const pthread_mutexattr_t *restrict attr);       /*初始化互斥量*/
int pthread_mutex_destroy(pthread_mutex_t *mutex);      /*销毁互斥量*/
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

Используйте взаимное исключение, чтобы решить проблему неправильных результатов вычислений выше. Пример выглядит следующим образом:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>

#define LEN 100000
int num = 0;

void* thread_func(void* arg) {
    pthread_mutex_t* p_mutex = (pthread_mutex_t*)arg;
    for (int i = 0; i< LEN; ++i) {
        pthread_mutex_lock(p_mutex);
        num += 1;
        pthread_mutex_unlock(p_mutex);
    }
    
    return NULL;
}

int main() {
    pthread_mutex_t m_mutex;
    pthread_mutex_init(&m_mutex, NULL);

    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, (void*)thread_func, (void*)&m_mutex);
    pthread_create(&tid2, NULL, (void*)thread_func, (void*)&m_mutex);

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    pthread_mutex_destroy(&m_mutex);

    printf("correct result=%d, result=%d.\n", 2*LEN, num);
    return 0;
}

результат операции:correct result=200000, result=200000.

Если в мьютекс вложены другие коды взаимного исключения, вам нужно обратить внимание на проблему взаимоблокировки.

Возможны две ситуации взаимоблокировки:

  • Одна ситуация: если один и тот же поток вызывает блокировку дважды, во втором вызове, поскольку блокировка уже занята, поток приостанавливается, ожидая, пока другие потоки снимут блокировку, но блокировка занята сама по себе. Поток снова приостановлен и не имеет возможности снять блокировку, поэтому всегда находится в состоянии ожидания, что приводит к тупиковой ситуации.
  • Другая типичная ситуация взаимоблокировки: поток A получает блокировку 1, а поток B получает блокировку 2. В это время поток A вызывает блокировку, чтобы попытаться получить блокировку 2. В результате ему необходимо приостановить работу и дождаться, пока поток B освободит блокировку 2. В это время поток B также вызывает блокировку, чтобы попытаться получить блокировку 1. В результате ему необходимо приостановить и дождаться, пока поток A освободит блокировку 1, поэтому потоки A и B навсегда останутся в приостановленном состоянии.

Как избежать взаимоблокировки:

  1. Нет блокировок мьютексов (часто это сложно сделать)
  2. При написании программ следует стараться избегать одновременного получения нескольких блокировок.
  3. Если это абсолютно необходимо, существует принцип: если все потоки получают блокировки в одном и том же порядке, когда им требуется несколько блокировок (обычно в порядке адресов переменных Mutex), взаимоблокировки не будет. (Например, блокировка 1, блокировка 2 и блокировка 3 используются в программе, а адреса соответствующих им переменных мьютекса: блокировка 1 pthread_mutex_trylockпозвони вместо этогоpthread_mutex_lockвызов, чтобы избежать взаимоблокировки.)
[2] Переменная условия

Условные переменные можно обобщить следующим образом: поток должен дождаться выполнения определенного условия (и это условие определяется другими потоками), прежде чем он сможет продолжить выполнение.Теперь, когда это условие не установлено, поток блокируется и ждет, пока другие потоки сделают это во время процесса выполнения.Когда условие установлено, поток пробуждается для продолжения выполнения.

Соответствующие функции следующие:

#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
       const pthread_condattr_t *restrict attr);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex,
       const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

В качестве примера самой простой для понимания условной переменной, в режиме «производитель-потребитель» поток-производитель отправляет данные в очередь, а поток-потребитель берет данные из очереди.Когда скорость обработки потока-потребителя выше, чем что из потока производителя, в очереди не будет данных. Одно из решений - снова подождать некоторое время до «опроса», но этот метод не очень хорош. Вы не знаете, как долго вы должны ждать. В настоящее время условные переменные могут очень хорошо решить эту проблему. Вот код:

#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>

#define LIMIT 1000

struct data {
    int n;
    struct data* next;
};

pthread_cond_t condv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; 
struct data* phead = NULL;

void producer(void* arg) {
    printf("producer thread running.\n");
    int count = 0;
    for (;;) {
        int n = rand() % 100;
        struct data* nd = (struct data*)malloc(sizeof(struct data));
        nd->n = n;

        pthread_mutex_lock(&mlock);
        struct data* tmp = phead;
        phead = nd;
        nd->next = tmp;
        pthread_mutex_unlock(&mlock);
        pthread_cond_signal(&condv);

        count += n;

        if(count > LIMIT) {
            break;
        }
        sleep(rand()%5);
    }
    printf("producer count=%d\n", count);
}

void consumer(void* arg) {
    printf("consumer thread running.\n");
    int count = 0;
    for(;;) {
        pthread_mutex_lock(&mlock);
        if (NULL == phead) {
            pthread_cond_wait(&condv, &mlock);
        } else {
            while(phead != NULL) {
                count += phead->n;
                struct data* tmp = phead;
                phead = phead->next;
                free(tmp);
            }
        }
        pthread_mutex_unlock(&mlock);
        if (count > LIMIT)
            break;
    }
    printf("consumer count=%d\n", count);
}

int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, (void*)producer, NULL);
    pthread_create(&tid2, NULL, (void*)consumer, NULL);
    
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    return 0;
}

Логика выполнения в условной переменной:

Главное — понять реализациюint pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)Что здесь произошло, все остальное понять проще. Перед выполнением этой функции вам необходимо сначала получить мьютекс и оценить, выполняются ли условия.Если условия выполнения выполнены, блокировка будет снята после продолжения выполнения вниз; если условия выполнения не выполнены, блокировка будет будет освобожден, и поток будет заблокирован здесь и будет ждать, пока другой поток уведомит, что условие выполнения выполнено, разбудит поток, снова заблокирует его и освободит блокировку после выполнения вниз. (Вкратце: снять блокировку --> блокировать ожидание --> заблокировать и вернуться после пробуждения)

Подробности реализации см. в исходном коде.pthread_cond_wait.cиpthread_cond_signal.c

Приведенный выше пример может показаться громоздким, следующий пример кода более лаконичен:

#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>

#define NUM 3
pthread_cond_t condv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; 

void producer(void* arg) {
    int n = NUM;
    while(n--) {
        sleep(1);
        pthread_cond_signal(&condv);
        printf("producer thread send notify signal. %d\t", NUM-n);
    }
}

void consumer(void* arg) {
    int n = 0;
    while (1) {
        pthread_cond_wait(&condv, &mlock);
        printf("recv producer thread notify signal. %d\n", ++n);
        if (NUM == n) {
            break;
        }
    }
}

int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, (void*)producer, NULL);
    pthread_create(&tid2, NULL, (void*)consumer, NULL);
    
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    return 0;
}

результат операции:

producer thread send notify signal. 1   recv producer thread notify signal. 1
producer thread send notify signal. 2   recv producer thread notify signal. 2
producer thread send notify signal. 3   recv producer thread notify signal. 3

【3】Семафор

Семафоры подходят для управления общим ресурсом, который поддерживает только ограниченное число пользователей. Значение счетчика, которое должно храниться между 0 и указанным максимальным значением. Когда поток завершаетsemaphoreКогда объект ожидает, значение счетчика уменьшается на единицу; когда поток завершаетsemaphoreКогда объект освобождается, значение счетчика увеличивается на единицу. Когда значение счетчика равно 0, поток зависает и ждет, пока значение счетчика не превысит 0.

Основные функции следующие:

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);

Пример кода выглядит следующим образом:

#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>
#include<semaphore.h>

#define NUM 5

int queue[NUM];
sem_t psem, csem; 

void producer(void* arg) {
    int pos = 0;
    int num, count = 0;
    for (int i=0; i<12; ++i) {
        num = rand() % 100;
        count += num;
        sem_wait(&psem);
        queue[pos] = num;
        sem_post(&csem);
        printf("producer: %d\n", num); 
        pos = (pos+1) % NUM;
        sleep(rand()%2);
    }
    printf("producer count=%d\n", count);
}

void consumer(void* arg){
    int pos = 0;
    int num, count = 0;
    for (int i=0; i<12; ++i) {
        sem_wait(&csem);
        num = queue[pos];
        sem_post(&psem);
        printf("consumer: %d\n", num);
        count += num;
        pos = (pos+1) % NUM;
        sleep(rand()%3);
    }
    printf("consumer count=%d\n", count);    
} 

int main() {
    sem_init(&psem, 0, NUM);
    sem_init(&csem, 0, 0);

    pthread_t tid[2];
    pthread_create(&tid[0], NULL, (void*)producer, NULL);
    pthread_create(&tid[1], NULL, (void*)consumer, NULL);
    pthread_join(tid[0], NULL);
    pthread_join(tid[1], NULL);
    sem_destroy(&psem);
    sem_destroy(&csem);

    return 0;
}

Логика выполнения семафора:

Когда необходимо получить общие ресурсы, сначала проверьте семафор.Если значение больше 0, значение уменьшается на 1, и осуществляется доступ к общему ресурсу.После завершения доступа значение увеличивается на 1.Если если обнаружено, что семафор приостановил потоки, он разбудит один из них Thread; приостанавливает ожидание, если семафор проверен на 0.

Может ссылаться на исходный кодsem_post.c

Три, многопоточное программирование, резюме и размышления

Наконец, подведем итоги и подумаем о многопоточном программировании.

  • Первый момент заключается в том, что вы должны обратить внимание на вопрос синхронизации при многопоточном программировании, потому что в большинстве случаев цель создания многопоточности состоит в том, чтобы позволить им работать вместе, и если синхронизация не выполняется, могут возникнуть проблемы.
  • Второй момент – это проблема взаимоблокировки. Когда несколько потоков обращаются к нескольким критическим ресурсам, может возникнуть взаимоблокировка, если ее не обработать должным образом. Если компиляция пройдена, а среда выполнения зависла, может возникнуть взаимоблокировка.Сначала можно подумать о том, какие потоки будут обращаться к нескольким критическим ресурсам, чтобы проблема была найдена быстрее.
  • Третий момент, обработка критических ресурсов, проблема многопоточности во многом вызвана проблемой доступа нескольких потоков к критическим ресурсам.Один из способов решения этой проблемы - поместить весь доступ и обработку критических ресурсов в один поток. Потоки обслуживают запросы от других потоков, поэтому наличие только одного потока, обращающегося к критическому ресурсу, решает множество проблем.
  • Четвертый пункт — это пул потоков. При работе с большим количеством коротких задач мы можем сначала создать пул потоков. Потоки в пуле потоков постоянно выбирают задачи из очереди задач для выполнения, так что нет необходимости создавать и уничтожить большое количество потоков.Подробности.

Справочная документация:pthread.h - threads

Добро пожаловать в личную общедоступную учетную запись WeChat, поехали!

在这里插入图片描述