Давайте поговорим о синхронизации Golang. Пул от простого к сложному

Go

предисловие

Сегодня я думал о рутинной оптимизации GC, и я увидел sync.Pool, поэтому я подытожу его и надеюсь сделать перерыв.

Простыми словами, ясно объясните знания. Следующие очки знаний приходят через 10 секунд.

1. Что такое пул? 2. Зачем нужен sync.Pool? 3. Как пользоваться sync.Pool? 4. Возьмите волну исходного кода 5. Анализ ключевых моментов исходного кода

текст

1. Что такое sync.Pool?

В Golang версии 1.3 в пакет синхронизации была добавлена ​​новая функция: пул. Проще говоря: авременный пул объектов.

2. Зачем нужен sync.Pool?

Сохраняйте и повторно используйте временные объекты, сократите выделение памяти и уменьшите нагрузку на сборщик мусора.

(Чем больше объектов, тем медленнее GC, потому что, когда Golang выполняет переработку трехцветных меток, чем больше меток нужно пометить, тем медленнее, естественно)

3. Как пользоваться sync.Pool?

func main() {
	// 初始化一个pool
	pool := &sync.Pool{
		// 默认的返回值设置,不写这个参数,默认是nil
		New: func() interface{} {
			return 0
		},
	}

	// 看一下初始的值,这里是返回0,如果不设置New函数,默认返回nil
	init := pool.Get()
	fmt.Println(init)

	// 设置一个参数1
	pool.Put(1)

	// 获取查看结果
	num := pool.Get()
	fmt.Println(num)

	// 再次获取,会发现,已经是空的了,只能返回默认的值。
	num = pool.Get()
	fmt.Println(num)
}

Это проще в использовании. Общая идея такова:Создайте пул, заранее поместите временно сгенерированные объекты, а затем вынесите их для использования.

Некоторые одноклассники, возможно, спросили, эта штука официально выпущена, значит, он сам ею пользуется? ответнемного, на самом деле, вы использовали его.

Это пакет fmt.Поскольку для fmt всегда требуется много объектов []byte, мы просто создали пул объектов []byte для обхода волны кода.

type buffer []byte
// printer状态的结构体()
type pp struct {
    ...
}

// pp的对象池, 《====这里用到了。
var ppFree = sync.Pool{
    New: func() interface{} { return new(pp) },
}

// 每次需要pp结构体的时候,都过sync.Pool进行获取。
func newPrinter() *pp {
    p := ppFree.Get().(*pp)
    p.panicking = false
    p.erroring = false
    p.fmt.init(&p.buf)
    return p
}

4. Возьмите волну исходного кода

4.1 Базовая структура данных

type Pool struct {
	// noCopy,防止当前类型被copy,是一个有意思的字段,后文详说。
	noCopy noCopy

    // [P]poolLocal 数组指针
	local     unsafe.Pointer
	// 数组大小
	localSize uintptr        

	// 选填的自定义函数,缓冲池无数据的时候会调用,不设置默认返回nil
	New func() interface{} //新建对象函数
}

type poolLocalInternal struct {
    // 私有缓存区
	private interface{}   
	// 公共缓存区
	shared  []interface{} 
	// 锁
	Mutex               
}

type poolLocal struct {
	// 每个P对应的pool
	poolLocalInternal

	// 这个字段很有意思,是为了防止“false sharing/伪共享”,后文详讲。
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

Подойдите к панораме, лучше увидеть эту структуру с глобальной точки зрения:

在这里插入图片描述
Здесь есть две небольшие проблемы:

  1. Что делает noCopy?
  2. Роль пэда в poolLocal?
  3. Как определить, в каком poolLocal находятся данные, которые нужно получить?

С вопросами продолжайте читать дальше, и вы сможете понять эти две небольшие проблемы после их прочтения.

4.2 pin

Перед введением Get / Put, ключевой базовый функциональный PIN-код должен быть понят сначала. Одно предложение объясняет его полезность:Определите объект localPool, связанный с текущим P(Здесь P означает P в MPG, если вы не понимаете, нажмите здесь:Некоторые небольшие сведения о горутинах)

func (p *Pool) pin() *poolLocal {
	// 返回当前 P.id && 设置禁止抢占(避免GC)
	pid := runtime_procPin()
	
	// 根据locaSize来获取当前指针偏移的位置
	s := atomic.LoadUintptr(&p.localSize) 
	l := p.local         
	
	// 有可能在运行中动调调整P,所以这里进行需要判断是否越界
	if uintptr(pid) < s {
	    // 没越界,直接返回
		return indexLocal(l, pid)
	}
	
    // 越界时,会涉及全局加锁,重新分配poolLocal,添加到全局列表
	return p.pinSlow()
}

var (
	allPoolsMu Mutex
	allPools   []*Pool
)


func (p *Pool) pinSlow() *poolLocal {
	// 取消P的禁止抢占(因为后面要进行metux加锁)
	runtime_procUnpin()
	
	// 加锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	
	// 返回当前 P.id && 设置禁止抢占(避免GC)
	pid := runtime_procPin()
	
	// 再次检查是否符合条件,有可能中途已被其他线程调用
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	
	// 如果数组为空,则新建Pool,将其添加到 allPools,GC以此获取所有 Pool 实例
	if p.local == nil {
		allPools = append(allPools, p)
	}
	
    // 根据 P 数量创建 slice
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	
	 // 将底层数组起始指针保存到 Pool.local,并设置 P.localSize
	 // 这里需要关注的是:如果GOMAXPROCS在GC间发生变化,则会重新分配的时候,直接丢弃老的,等待GC回收。
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
	atomic.StoreUintptr(&p.localSize, uintptr(size))         
	
	// 返回本次所需的 poolLocal
	return &local[pid]
}

// 根据数据结构的大小来计算指针的偏移量
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

Примечания к процессу:

  禁止抢占GC -> 寻找偏移量 -> 检查越界 ->返回poolLocal
                                   ->加锁重建pool,并添加到allPool

4.3 put

Вывод первый:Сначала поместите в личное пространство, а затем поместите его в общее пространствоТеперь приступайте к анализу:

func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
    
    // 这段代码,不需要关心,降低竞争的
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}

    // 获取当前的poolLocal
	l := p.pin()

    // 如果private为nil,则优先进行设置,并标记x
	if l.private == nil {
		l.private = x
		x = nil
	}
	runtime_procUnpin()

    // 如果标记x不为nil,则将x设置到shared中
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
    
    // 设置竞争可用了。
	if race.Enabled {
		race.Enable()
	}
}

4.4 get

Вывод первый:Приоритет берется из приватного пространства, а затем блокировка берется из общего пространства, если она не была взята из общего пространства других PoolLocals, она будет возвращена сразу с новой.Теперь для анализа:

func (p *Pool) Get() interface{} {
    // 竞争相关的设置
	if race.Enabled {
		race.Disable()
	}
    
    // 获取当前的poolLocal
	l := p.pin()

    // 从private中获取
	x := l.private
	l.private = nil
	runtime_procUnpin()

    // 不存在,则继续从shared空间拿,
	if x == nil {
	    // 加锁了,防止并发 
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
            // 从尾巴开始拿起
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
		    // 从其他的poolLocal中的shared空间看看有没有可返回的。
			x = p.getSlow()
		}
	}
    
    // 竞争解除
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
    
    // 如果还是没有的话,就直接new一个了
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

func (p *Pool) getSlow() (x interface{}) {
    // 获取poolLocal数组的大小
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	
	// 尝试从其他procs获取一个P对象
	pid := runtime_procPin()
	runtime_procUnpin()
	
	for i := 0; i < int(size); i++ {
        // 获取一个poolLocal,注意这里是从当前的local的位置开始获取的,目的是防止取到自身
		l := indexLocal(local, (pid+i+1)%int(size))
		// 加锁从尾部获取shared的数据
		l.Lock()
		last := len(l.shared) - 1
        // 若长度大于1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}

5. Анализ ключевых моментов исходного кода

5.1 Регулярная очистка

В: Сохраняются ли здесь пулы навсегда? все еще? О: Он будет очищен.Время — это время между двумя GC.

// 注册清理函数,随着runtime进行的,也就是每次GC都会跑一下
func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

// 清理函数也很粗暴,直接遍历全局维护的allPools将private和shared置为nil
func poolCleanup() {
    // 遍历allPools
	for i, p := range allPools {
	    // pool置为nil
		allPools[i] = nil
  
        // 遍历localSIze的数量次
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
            // private置为nil
			l.private = nil
            
            // 遍历shared,都置为nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
 
    // allPools重置
	allPools = []*Pool{}
}

Так,Это также объясняет, почему sync.Pool не подходит для постоянных данных, таких как «пул соединений с базой данных», потому что он будет регулярно перерабатываться ~

5.2 Почему вам нужно блокировать, когда вы приобретаете общий, а не частный?

Мы знаем, что golang работает по принципу MPG, (Некоторые небольшие сведения о горутинах)

Это может выглядеть так:

M------P----- poolLocal    
       |        
       G - G
           |
           G
          ...
M------P----- poolLocal  
       |
       G---G
           |
           G
          ...

Другими словами, каждому P назначается локальный пул, и только одна Gouroutine работает под одним и тем же P, поэтому частный здесь может быть получен только одной Gouroutine одновременно.

Общий доступ отличается. Он может быть получен другими P и может быть получен только несколькими Gouroutines одновременно. Чтобы обеспечить конкуренцию данных, необходимо добавить блокировку, чтобы гарантировать, что только один G заберет его.

5.3 Какова роль noCopy?

Предотвратите копирование пула, потому что пул уникален во всей игре в Голанге.

Вот еще вопрос, как здесь noCopy предотвращает копирование? ? ?

Нативного способа запретить копирование в Golang нет, поэтому структура копироваться не хочет, поэтому автор go сделал такую ​​условность:Если он содержит структуру noCopy, которая реализует интерфейс sync.Locker, go vet может помочь нам проверить, был ли он скопирован..

5.4 Какова роль пэда?

Это довольно интересно, в исходниках есть такое слово:false sharing, что переводится как «псевдообмен». То есть это поле в основном используется дляПредотвращение «ложного обмена».

Почему существует ложный обмен?

Кратко поясню: система кеширования основана настрока кэшахранится в единицах. Строка кэша обычно 64 байта, когда строка кэша загружается 1 байтом, остальные 63 тоже будут загружены,Блокировка также заблокирует всю строку кэша., когда переменные x и y находятся в одной строке кэша, как показано на рисунке ниже, когда X заблокирован, другой независимый поток просто хочет оперировать Y, и теперь Y должен ждать X, в это время это не может быть одновременный .

Поскольку конфликт соперничества здесь возникает из-за совместного использования, он называется ложным совместным использованием.

在这里插入图片描述
(картинка с https://www.cnblogs.com/cyfonly/p/5800758.html)

Как это предотвратить?

заполнить строку кэша, чтобы каждые данные были отдельной строкой кеша, ложного шардирования не будет.

5.5 Как определить, в какой ячейке массива LocalPool должны храниться мои данные?

Вычислить смещение указателя в соответствии с размером структуры данных, а затем вычислить, какой из массивов LocalPool является.

5.6 Философия дизайна sync.Pool?

Количество горутин, которые можно распараллелить одновременно, ограничено, что задается параметром runtime.GOMAXPROCS(0). Здесь пул привязывает данные к P и распределяет их в каждом действительно параллельном потоке. Каждый поток получает приоритет от своего собственного Получение данных из poolLocal значительно снижает конкуренцию блокировок.

在这里插入图片描述