Сравнение производительности блокировки мьютекса и условной переменной в параллельном программировании на C++

C++
Сравнение производительности блокировки мьютекса и условной переменной в параллельном программировании на C++

представлять

В этой статье используется простейшая модель производитель-потребитель для сравнения использования путем запуска программы и наблюдения за использованием процессора процессом.Мьютекса такжеМьютекс + переменная условиясравнение производительности.

Модель производитель-потребитель этого примера,1продюсер,5потребитель. Поток-производитель помещает данные в очередь, а 5 потоков-потребителей извлекают данные из очереди.Перед извлечением данных необходимо определить, есть ли данные в очереди.Эта очередь является глобальной очередью, и данные распределяются между потоками, поэтому требуется взаимное исключение блокировка для защиты. То есть, когда производитель ставит данные в очередь, другие потребители не могут их взять, и наоборот.


Код реализации мьютекса

#include <iostream> // std::cout
#include <deque>    // std::deque
#include <thread>   // std::thread
#include <chrono>   // std::chrono
#include <mutex>    // std::mutex


// 全局队列
std::deque<int> g_deque;

// 全局锁
std::mutex g_mutex;

// 生产者运行标记
bool producer_is_running = true;

// 生产者线程函数
void Producer()
{
    // 库存个数
    int count = 8;
    
    do
    {
        // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
        // 可以手动解锁,从而控制互斥锁的细粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        // 入队一个数据
        g_deque.push_front( count );
        // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
        locker.unlock(); 

        std::cout << "生产者    :我现在库存有 :" << count << std::endl;
            
        // 放慢生产者生产速度,睡1秒
        std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

        // 库存自减少
        count--;
    } while( count > 0 );
    
    // 标记生产者打样了
    producer_is_running = false;

    std::cout << "生产者    : 我的库存没有了,我要打样了!"  << std::endl;
}

// 消费者线程函数
void Consumer(int id)
{
    int data = 0;

    do
    {
        // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
        // 可以手动解锁,从而控制互斥锁的细粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        
        // 队列不为空
        if( !g_deque.empty() )
        {
            // 取出队列里最后一个数据
            data = g_deque.back();
            
            // 删除队列里最后一个数据
            g_deque.pop_back();
            
            // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
            locker.unlock();

            std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
        }
        // 队列为空
        else
        {
            locker.unlock();
        }
    } while( producer_is_running );
	
	std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
}

int main(void)
{
    std::cout << "1 producer start ..." << std::endl;
    std::thread producer( Producer );

    std::cout << "5 consumer start ..." << std::endl;
    std::thread consumer[ 5 ];
    for(int i = 0; i < 5; i++)
    {
        consumer[i] = std::thread(Consumer, i + 1);
    }

    producer.join();

    for(int i = 0; i < 5; i++)
    {
        consumer[i].join();
    }

    std::cout << "All threads joined." << std::endl;

    return 0;
}

Результаты реализации блокировки взаимного исключения:

вывод результата

[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
[root@lincoding condition]# ./main
1 producer start ...
5 consumer start ...
生产者    :我现在库存有 :8
消费者[1] : 我抢到货的编号是 :8
消费者[1] : 我抢到货的编号是 :7
生产者    :我现在库存有 :7
生产者    :我现在库存有 :6
消费者[3] : 我抢到货的编号是 :6
生产者    :我现在库存有 :5
消费者[1] : 我抢到货的编号是 :5
生产者    :我现在库存有 :4
消费者[2] : 我抢到货的编号是 :4
生产者    :我现在库存有 :3
消费者[5] : 我抢到货的编号是 :3
生产者    :我现在库存有 :2
消费者[2] : 我抢到货的编号是 :2
生产者    :我现在库存有 :1
消费者[1] : 我抢到货的编号是 :1
生产者    : 我的库存没有了,我要打样了!消费者[
5] :卖家没有货打样了,真可惜,下次再来抢!
消费者[2] :卖家没有货打样了,真可惜,下次再来抢!
消费者[3] :卖家没有货打样了,真可惜,下次再来抢!
消费者[4] :卖家没有货打样了,真可惜,下次再来抢!
消费者[1] :卖家没有货打样了,真可惜,下次再来抢!
All threads joined.

Видно, что мьютекс действительно может выполнить эту задачу, но есть проблемы с производительностью.

  • ProducerЯвляется потоком-производителем, в процессе производства данных он будет отдыхать1秒, поэтому производственный процесс очень медленный;

  • Consumerэто потребительская нить, естьwhileЦикл завершится только тогда, когда будет установлено, что производитель не работает.whileЦикл, затем каждый раз в теле цикла он сначала блокируется, оценивает, что очередь не пуста, затем берет данные из очереди и, наконец, разблокирует ее. Итак, в продюсерском остатке1秒, потребительский поток на самом деле будет выполнять много бесполезной работы, что приведет к очень высокой загрузке ЦП!

Рабочая среда - 4-ядерный процессор

[root@lincoding ~]# grep 'model name' /proc/cpuinfo | wc -l
4

Верхняя команда для просмотра использования ЦП, можно увидеть, что накладные расходы на использование чистого мьютекса ЦП очень велики.mainЗагрузка процессора процессом достигла357.5%CPU, процессор системных накладных расходов54.5%sy, накладные расходы процессора на пользователя18.2%us

[root@lincoding ~]# top
top - 19:13:41 up 36 min,  3 users,  load average: 0.06, 0.05, 0.01
Tasks: 179 total,   1 running, 178 sleeping,   0 stopped,   0 zombie
Cpu(s): 18.2%us, 54.5%sy,  0.0%ni, 27.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   1004412k total,   313492k used,   690920k free,    41424k buffers
Swap:  2031608k total,        0k used,  2031608k free,    79968k cached

   PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                                                                                       
 35346 root      20   0  137m 3288 1024 S 357.5  0.3   0:05.92 main                                                                                                                          
     1 root      20   0 19232 1492 1224 S  0.0  0.1   0:02.16 init                                                                                                                           
     2 root      20   0     0    0    0 S  0.0  0.0   0:00.01 kthreadd                                                                                                                       
     3 root      RT   0     0    0    0 S  0.0  0.0   0:00.68 migration/0  

Одно из решений — добавить потребителям небольшую задержку, когда потребители не получат данные, они сделают перерыв.500毫秒, что может уменьшить накладные расходы, вызванные блокировкой мьютекса для ЦП.

// 消费者线程函数
void Consumer(int id)
{
    int data = 0;

    do
    {
        // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
        // 可以手动解锁,从而控制互斥锁的细粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        
        // 队列不为空
        if( !g_deque.empty() )
        {
            // 取出队列里最后一个数据
            data = g_deque.back();
            
            // 删除队列里最后一个数据
            g_deque.pop_back();
            
            // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
            locker.unlock();

            std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
        }
        // 队列为空
        else
        {
            locker.unlock();
            // 休息500毫秒
            std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
        }
    } while( producer_is_running );
	
	std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
}

Из результатов работы видно, что загрузка процессора значительно снижается.

[root@lincoding ~]# ps aux | grep -v grep  |grep main
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root      61296  0.0  0.1 141068  1244 pts/1    Sl+  19:40   0:00 ./main


Переменная условия + код реализации мьютекса

Итак, вопрос в том, как определить, как долго потребители задерживаются (отдыхают)?

  • Если производитель производит очень быстро, потребитель задерживается500毫秒, тоже не очень
  • Если производитель производит медленнее, то потребление задерживается500毫秒, так же будет бесполезная работа, занимающая ЦП

Это требует введения условных переменныхstd::condition_variable, применительно к потребительской производственной модели, то есть после того, как производитель произвел часть данных, он проходитnotify_one()вставайwait()Поток-потребитель, который заставляет потребителя извлекать часть данных из очереди.

#include <iostream> // std::cout
#include <deque>    // std::deque
#include <thread>   // std::thread
#include <chrono>   // std::chrono
#include <mutex>    // std::mutex

#include <condition_variable> // std::condition_variable


// 全局队列
std::deque<int> g_deque;

// 全局锁
std::mutex g_mutex;

// 全局条件变量
std::condition_variable g_cond;

// 生产者运行标记
bool producer_is_running = true;

// 生产者线程函数
void Producer()
{
    // 库存个数
    int count = 8;
    
    do
    {
        // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
        // 可以手动解锁,从而控制互斥锁的细粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        // 入队一个数据
        g_deque.push_front( count );
        // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
        locker.unlock(); 

        std::cout << "生产者    :我现在库存有 :" << count << std::endl;
        
        // 唤醒一个线程
        g_cond.notify_one();
        
        // 睡1秒
        std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

        // 库存自减少
        count--;
    } while( count > 0 );
    
    // 标记生产者打样了
    producer_is_running = false;
    
    // 唤醒所有消费线程
    g_cond.notify_all();
    
    std::cout << "生产者    : 我的库存没有了,我要打样了!"  << std::endl;
}

// 消费者线程函数
void Consumer(int id)
{
// 消费者线程函数
void Consumer(int id)
{
    // 购买的货品编号
    int data = 0;

    do
    {
        // 智能锁,初始化后即加锁,保护的范围是代码花括号内,花括号退出即会自动解锁
        // 可以手动解锁,从而控制互斥锁的细粒度
        std::unique_lock<std::mutex> locker( g_mutex );
        
        // wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作
        // 必须使用unique_lock,不能使用lock_guard,因为lock_guard没有lock和unlock接口,而unique_lock则都提供了
        g_cond.wait(locker); 
        
        // 队列不为空
        if( !g_deque.empty() )
        {
            // 取出队列里最后一个数据
            data = g_deque.back();
            
            // 删除队列里最后一个数据
            g_deque.pop_back();
            
            // 提前解锁,缩小互斥锁的细粒度,只针对共享的队列数据进行同步保护
            locker.unlock(); 

            std::cout << "消费者[" << id << "] : 我抢到货的编号是 :" << data << std::endl;
        }
        // 队列为空
        else
        {
            locker.unlock();
        }
    
    } while( producer_is_running );
    
    std::cout << "消费者[" << id << "] :卖家没有货打样了,真可惜,下次再来抢!"  << std::endl;
}
}

int main(void)
{
    std::cout << "1 producer start ..." << std::endl;
    std::thread producer( Producer );

    std::cout << "5 consumer start ..." << std::endl;
    std::thread consumer[ 5 ];
    for(int i = 0; i < 5; i++)
    {
        consumer[i] = std::thread(Consumer, i + 1);
    }

    producer.join();

    for(int i = 0; i < 5; i++)
    {
        consumer[i].join();
    }

    std::cout << "All threads joined." << std::endl;

    return 0;
}

Переменная условия + результат работы мьютекса

[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o  main
[root@lincoding condition]# 
[root@lincoding condition]# ./main 
1 producer start ...
5 consumer start ...
生产者    :我现在库存有 :8
消费者[4] : 我抢到货的编号是 :8
生产者    :我现在库存有 :7
消费者[2] : 我抢到货的编号是 :7
生产者    :我现在库存有 :6
消费者[3] : 我抢到货的编号是 :6
生产者    :我现在库存有 :5
消费者[5] : 我抢到货的编号是 :5
生产者    :我现在库存有 :4
消费者[1] : 我抢到货的编号是 :4
生产者    :我现在库存有 :3
消费者[4] : 我抢到货的编号是 :3
生产者    :我现在库存有 :2
消费者[2] : 我抢到货的编号是 :2
生产者    :我现在库存有 :1
消费者[3] : 我抢到货的编号是 :1
生产者    : 我的库存没有了,我要打样了!
消费者[5] :卖家没有货打样了,真可惜,下次再来抢!
消费者[1] :卖家没有货打样了,真可惜,下次再来抢!
消费者[4] :卖家没有货打样了,真可惜,下次再来抢!
消费者[2] :卖家没有货打样了,真可惜,下次再来抢!
消费者[3] :卖家没有货打样了,真可惜,下次再来抢!
All threads joined.

Нагрузка на ЦП очень мала

[root@lincoding ~]# ps aux | grep -v grep  |grep main
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root      73838  0.0  0.1 141068  1256 pts/1    Sl+  19:54   0:00 ./main


Суммировать

В сценарии, когда производственная скорость производителя не уверена, является ли она быстрой или медленной, невозможно использовать мьютекс только для защиты общих данных, что сильно снижает производительность ЦП.Вы можете использовать мьютекс + условие метод переменной при производстве Когда поток-потребитель создает часть данных, он пробуждает поток-потребитель для потребления, избегая некоторых бесполезных накладных расходов на производительность.