GO: چگونه یک استخر کارگر بنویسیم

معرفی
ادغام یک تکنیک مدیریت منابع است که شامل ایجاد و نگهداری مجموعهای از نمونههای منابع قابل استفاده مجدد است که امکان تخصیص و تخصیص سریع این منابع را در صورت نیاز فراهم میکند. گوروتین ها در زبان برنامه نویسی Go، “رشته های” سبک وزن هستند، اما تعداد زیادی از گوروتین ها همچنان می توانند منابع قابل توجهی را مصرف کنند. یک Worker Pool، با استفاده از تکنیکهای ادغام، میتواند تعداد معینی از گوروتینها را حفظ کند و فقط به آنها اجازه دهد وظایف را اجرا کنند و اتلاف منابع را کاهش دهد.
این مقاله یک رویکرد پیادهسازی را برای Worker Pool معرفی میکند و کد منبع یک کتابخانه محبوب Worker Pool، gammazero/workerpool را تحلیل میکند. در نهایت، ما یک Worker Pool مشابه به نام VIOLIN را اجرا خواهیم کرد تا درک بهتری از Worker Pools به دست آوریم.
چگونه یک استخر کارگر بنویسیم؟
بر اساس معرفی مختصر Worker Pool در بخش قبل، ما آموختیم که پیاده سازی Worker Pool شامل حفظ تعداد معینی از گوروتین ها و اجازه دادن به آنها برای رسیدگی به وظایف / مشاغل ارسال شده است. رویکرد نوشتن یک Worker Pool ساده می شود:
- محدودیت خاصی برای تعداد گوروتین ها در Worker Pool تعیین کنید. تعداد گوروتین های نگهداری شده توسط Worker Pool نباید از این حد تجاوز کند.
- هنگامی که یک کار ارسال می شود، یک گوروتین از Worker Pool برای اجرای آن اختصاص دهید.
- اگر کارگر بیکار (گوروتین) وجود دارد، کار را به کارگر بیکار محول کنید. در غیر این صورت، بررسی کنید که آیا به حد مجاز گوروتین ها رسیده است یا خیر.
- اگر به حد مجاز رسید، کار را در یک صف کش ذخیره کنید (یا از مکانیزم مدیریت دیگری استفاده کنید). اگر به حد مجاز نرسیده اید، یک کارگر جدید برای اجرای کار ایجاد کنید.
برای درک دقیق تر منطق، لطفاً به نمودار زیر مراجعه کنید. توضیح زمان مصرف وظایف از صف وظایف در قسمت تجزیه و تحلیل کد مورد بحث قرار خواهد گرفت.
تجزیه و تحلیل کد
اکنون که درک کلی از نحوه کار یک Worker Pool داریم، میتوانیم شروع به نوشتن کد کنیم. در اینجا، ابتدا یک پیادهسازی منبع باز عالی از Worker Pool با 1.1k ستاره در GitHub را بررسی میکنیم. پروژه را می توان در اینجا یافت.
ساختار اصلی WorkerPool
بیایید نگاهی به ساختار اصلی این Worker Pool بیندازیم:
type WorkerPool struct {
maxWorkers int
taskQueue chan func()
workerQueue chan func()
stoppedChan chan struct{}
stopSignal chan struct{}
waitingQueue deque.Deque[func()]
stopLock sync.Mutex
stopOnce sync.Once
stopped bool
waiting int32
wait bool
}
ما بر روی خواص زیر تمرکز خواهیم کرد:
- maxWorkers: حداکثر تعداد گوروتین ها (یعنی کارگران) نگهداری شده توسط Worker Pool.
- taskQueue: صف وظایف که در آن وظایف ارسال و ذخیره می شوند.
- صف کارگر: صف کارگری که کارگران از آن وظایف را برای اجرا بازیابی می کنند.
- صف انتظار: صف انتظار، برای ذخیره وظایف زمانی که کارگر در دسترس نیست و تعداد کارگران فعلی به حداکثر حد مجاز رسیده است استفاده می شود.
چند نکته مهم قابل توجه:
- نوع داده خاص عناصر ذخیره شده در هر سه صف است
func()
، نشان دهنده نوع وظایف ارسالی است (آن را به عنوان یک تابع در نظر بگیرید). - اینجا، دو صف،
taskQueue
وworkerQueue
، استفاده می شود. هنگامی که یک کار ارسال می شود، ابتدا به آن اضافه می شودtaskQueue
. سپس، یک وظیفه از بازیابی می شودtaskQueue
و بهworkerQueue
برای اعدام توسط کارگر این است که به طور مستقیم توسط کارگران از مصرف نمی شودtaskQueue
. - نوع داده از
waitingQueue
یک ساختار داده سفارشی است که توسط نویسنده کتابخانه workpool پیاده سازی شده است. شما می توانید آن را به عنوان یک صف عمومی FIFO در نظر بگیرید. هنگامی که بعداً Worker Pool خود را پیاده سازی می کنیم، می توانیم آن را با ساختارهای داده موجود جایگزین کنیمchan
،list.List
، و غیره.
جدید
در مرحله بعد، بیایید نگاهی به آن بیندازیم New
تابع، که تابعی برای ایجاد a است WorkerPool
هدف – شی.
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
pool := &WorkerPool{
maxWorkers: maxWorkers,
taskQueue: make(chan func()),
workerQueue: make(chan func()),
stopSignal: make(chan struct{}),
stoppedChan: make(chan struct{}),
}
// Start the task dispatcher.
go pool.dispatch()
return pool
}
همانطور که می بینیم، ساختار کلی کاملاً واضح است. شرایط اولیه تضمین می کند که WorkerPool حداقل یک گوروتین را حفظ می کند. سپس، شی WorkerPool مقداردهی اولیه می شود. شایان ذکر است که هر دو taskQueue
و workerQueue
ویژگی ها کانال های بافر نشده هستند. پس از آن، یک گوروتین برای اجرای متد اصلی WorkerPool شروع می شود. pool.dispatch
. در نهایت، شی WorkerPool برگردانده می شود.
ارسال
قبل از اینکه به روش اصلی بپردازیم pool.dispatch
، بیایید نگاهی به چند روش ساده برای افزایش اعتماد به نفس بیندازیم.
func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
}
}
را Submit
روش برای ارسال کارها استفاده می شود و می توان با بررسی مستقیم اینکه آیا کار انجام نشده است، فراخوانی کرد nil
و سپس آن را به taskQueue
. به هر حال، اجازه دهید به طور خلاصه بحث کنیم که اگر عبور کنیم چه اتفاقی می افتد nil
به کانال اگرچه کانال می تواند دریافت کند nil
بدون هیچ مشکلی، تلاش برای اجرای آن منجر به یک استثنای اشاره گر تهی می شود.
func main() {
ch := make(chan func())
go func() {
ch <- nil
}()
res, ok := <-ch
fmt.Println(res, ok) // <nil> true
res() // panic: runtime error: invalid memory address or nil pointer dereference
}
ارسال صبر کنید
یک روش دوقلو برای وجود دارد Submit
، که مسدود می کند و منتظر می ماند تا کار ارسالی کامل شود.
func (p *WorkerPool) SubmitWait(task func()) {
if task == nil {
return
}
doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
}
پیاده سازی خاص شامل یک بار بسته بندی وظیفه ارسال شده و سپس استفاده از یک کانال برای انجام این کار است که بسیار هوشمندانه است.
ارسال
روش اصلی WorkerPool شامل گردش کار کامل Worker Pool است که قبلا تحلیل کردیم.
این یک روش بسیار طولانی است. برای سهولت در توضیح و درک، ابتدا روش کامل را ارائه می کنم و سپس تک تک کدها را توضیح می دهم. خوانندگان می توانند پروژه را شبیه سازی کنند و در حین خواندن به کد مراجعه کنند، یا می توانند مستقیماً تحلیل کد زیر را دنبال کنند.
func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan)
timeout := time.NewTimer(idleTimeout)
var workerCount int
var idle bool
var wg sync.WaitGroup
Loop:
for {
// As long as tasks are in the waiting queue, incoming tasks are put
// into the waiting queue and tasks to run are taken from the waiting
// queue. Once the waiting queue is empty, then go back to submitting
// incoming tasks directly to available workers.
if p.waitingQueue.Len() != 0 {
if !p.processWaitingQueue() {
break Loop
}
continue
}
select {
case task, ok := <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case p.workerQueue <- task:
default:
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
wg.Add(1)
go worker(task, p.workerQueue, &wg)
workerCount++
} else {
// Enqueue task to be executed by next available worker.
p.waitingQueue.PushBack(task)
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
}
}
idle = false
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker if
// pool has been idle for a whole timeout.
if idle && workerCount > 0 {
if p.killIdleWorker() {
workerCount--
}
}
idle = true
timeout.Reset(idleTimeout)
}
}
// If instructed to wait, then run tasks that are already queued.
if p.wait {
p.runQueuedTasks()
}
// Stop all remaining workers as they become ready.
for workerCount > 0 {
p.workerQueue <- nil
workerCount--
}
wg.Wait()
timeout.Stop()
}
در واقع، وقتی به کل روش نگاه می کنیم، هنوز کاملاً واضح است و نویسنده با تأمل نظراتی را در شاخه های منطق اصلی اضافه کرده است.
- بیایید با بررسی قسمت اعلام متغیر در ابتدای این روش شروع کنیم:
defer close(p.stoppedChan)
timeout := time.NewTimer(idleTimeout)
var workerCount int
var idle bool
var wg sync.WaitGroup
LOOP:
for {
// ... (暂时跳过的部分)
select {
case task, ok := <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case p.workerQueue <- task:
default:
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
wg.Add(1)
go worker(task, p.workerQueue, &wg)
workerCount++
} else {
// Enqueue task to be executed by next available worker.
p.waitingQueue.PushBack(task)
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
}
}
idle = false
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker if
// pool has been idle for a whole timeout.
if idle && workerCount > 0 {
if p.killIdleWorker() {
workerCount--
}
}
idle = true
timeout.Reset(idleTimeout)
}
}
// ...
-
اولین
select
بیانیه درfor
حلقه دو تا داردcase
گزینه ها: یکی برای دریافت یک کار و دیگری برای مهلت زمانی کارگر (هیچ کار جدیدی در عرض دو ثانیه دریافت نمی شود). -
بیایید با پرونده دریافت وظیفه شروع کنیم. ابتدا بررسی می کند که آیا یک کار واقعی دریافت شده است یا خیر. اگر
ok
استfalse
، به این معنی است کهtaskQueue
کانال بسته شده است در این حالت از حلقه خارج می شود. دلیل استفاده از برچسب این است که در داخل تودرتو استselect
داخل for-sect، یک منظمbreak
بیانیه فقط از جریان خارج می شودselect
مسدود کرده و اجرای کد را در خارج از آن ادامه دهیدselect
، از جمله تکرار بعدی ازfor
حلقه برای خروج از بیرونfor
حلقه مستقیم، یک برچسب استفاده می شود.سپس، دیگری
select
بیانیه استفاده می شود. اگر کارگر بیکار وجود داشته باشد، وظیفه به آن کارگر محول می شود (p.workerQueue <- task
). اگر کارگر بیکار وجود نداشته باشد، بررسی می کند که آیا تعداد فعلی کارگران در حال اجرا به حداکثر حد مجاز رسیده است یا خیر. اگر نه، یک گوروتین (کارگر) جدید شروع می شود، وworkerCount
وsync.WaitGroup
با 1 افزایش می یابندdispatch
روش مشکلات همزمانی ندارد، می توان آن را مستقیماً افزایش داد. اگر به حداکثر تعداد کارگران رسیده باشد، وظیفه فعلی در صف ذخیره می شود و تعداد کارهای انتظار ثبت می شود. از آنجا کهp.waiting
یک منبع مشترک است، عملیات اتمی برای اطمینان از ایمنی همزمان استفاده می شود. -
مورد مربوط به زمان استراحت کارگر ابتدا بررسی می کند که آیا
idle
استtrue
و تعداد کارگران بیشتر از 0 باشد. بنابراین در اولین تایم اوت کارگر کشته نمی شود. بجای،idle
تنظیم شده استtrue
، و تایمر تنظیم مجدد می شود. شایان ذکر است کهp.killIdleWorker
روش ممکن است لزوماً یک کارگر را بکشد. سعی می کند این کار را انجام دهد و نتیجه را برمی گرداند. اگر یک کارگر با موفقیت کشته شود، تعداد کارگران یک نفر کاهش می یابد.- حالا بیایید قسمتی را که قبلاً از آن صرف نظر کردیم، که ابتدای آن است، نگاه کنیم
for
حلقه:
- حالا بیایید قسمتی را که قبلاً از آن صرف نظر کردیم، که ابتدای آن است، نگاه کنیم
if p.waitingQueue.Len() != 0 {
if !p.processWaitingQueue() {
break Loop
}
continue
}
این بخش وظیفه مصرف وظایف کش شده را بر عهده دارد. را p.processWaitingQueue
روش یا یک کار ذخیره شده را مصرف می کند یا یک کار ورودی را در حافظه پنهان ذخیره می کند. به طور مشابه، بررسی می کند که آیا taskQueue
بسته شده است، و اگر چنین باشد، مستقیماً حلقه را می شکند و خارج می شود.
- قسمت آخر کد پاکسازی خارج از آن است
for
حلقه:
// If instructed to wait, then run tasks that are already queued.
if p.wait {
p.runQueuedTasks()
}
// Stop all remaining workers as they become ready.
for workerCount > 0 {
p.workerQueue <- nil
workerCount--
}
wg.Wait()
timeout.Stop()
را p.wait
پرچم به عنوان یک نشانگر در طول استفاده می شود Stop
عملیات، اما فعلا می توانیم از آن صرف نظر کنیم. کد زیر وظیفه توقف تمامی کارگران و توقف تایمر را بر عهده دارد.
کارگر
بله، WorkerPool با فراخوانی یک کارگر جدید ایجاد می کند go worker(task, p.workerQueue, &wg)
، انتقال وظیفه برای اجرا، workerQueue و WaitGroup به عنوان پارامتر. این کار را هوشمندانه اجرا می کند اگر اینطور نباشد nil
و سپس برای دریافت یک کار جدید تا زمانی که a nil
وظیفه دریافت می شود که به حلقه پایان می دهد. WaitGroup با یک کاهش می یابد و نشان می دهد که کارگر اجرای خود را به پایان رسانده است. این یک پیاده سازی هوشمندانه است و من خودم هم به آن فکر نمی کردم!
// worker executes tasks and stops when it receives a nil task.
func worker(task func(), workerQueue chan func(), wg *sync.WaitGroup) {
for task != nil {
task()
task = <-workerQueue
}
wg.Done()
}
این به تجزیه و تحلیل کد منبع اصلی WorkerPool پایان می دهد. اکنون، می توانید Worker Pool خود را پیاده سازی کنید.
توسط خودم پیاده سازی شده است
ویولن یک استخر کارگری است که من نوشتم، با الهام از gammazero/workerpool. API و گزینه های غنی تری را ارائه می دهد. بی زحمت بهش ستاره بدی 🙂
شماره کارگر
WorkerNum
یک رابط ارائه شده توسط VIOLIN برای بازیابی تعداد فعلی کارگران در حال اجرا است. بجای استفاده از workerCount
به عنوان یک متغیر محلی در dispatch
روش، VIOLIN تعداد کارگران را به عنوان ویژگی ساختار اصلی حفظ می کند. با این حال، برای اطمینان از ایمنی همزمان، از عملیات اتمی استفاده می شود که ممکن است منجر به سربار عملکرد شود.
type Violin struct {
options *options
mu sync.Mutex
once sync.Once
workerNum uint32
taskNum uint32
waitingTaskNum uint32
status uint32
workerChan chan func()
taskChan chan func()
waitingChan chan func()
dismissChan chan struct{}
pauseChan chan struct{}
shutdownChan chan struct{}
}
الگوی گزینه های کاربردی
Functional Options یک الگوی برنامه نویسی است که من شخصا از استفاده از آن لذت می برم. VIOLIN از الگوی گزینههای عملکردی استفاده میکند تا سه گزینه پیکربندی را با مقادیر پیشفرض مربوطه ارائه دهد، همانطور که در جدول زیر نشان داده شده است:
گزینه | پیش فرض | شرح |
---|---|---|
WithMaxWorkers |
5 |
حداکثر تعداد کارگران را تنظیم کنید |
WithWaitingQueueSize |
64 |
اندازه صف انتظار را تنظیم کنید |
WithWorkerIdleTimeout |
time.Second * 3 |
بازه زمانی تخریب شده کارگران بیکار را تنظیم کنید |
- در مقایسه با 2 ثانیه ثابت
idleTimeout
در Workerpool، VIOLIN به کاربران این امکان را می دهد که آن را مطابق با نیاز خود پیکربندی کنند. - علاوه بر این، VIOLIN ساختار داده صف کش وظیفه را با کانال ها جایگزین می کند، بنابراین کاربران باید اندازه کانال را تنظیم کنند یا از مقدار پیش فرض استفاده کنند. در آینده، ممکن است به اجرای صف برگردد و محدودیت اندازه صف کش را از بین ببرد.
وضعیت
ویولن حفظ می کند status
زمینه در ساختار اصلی برای تسهیل کاربران و اجزای داخلی در دسترسی به وضعیت فعلی VIOLIN. چهار حالت ممکن وجود دارد:
const (
_ uint32 = iota
statusInitialized
statusPlaying
statusCleaning
statusShutdown
)
- statusInitialized: حالت اولیه
- وضعیت در حال پخش: ویولن به طور عادی اجرا می شود
-
وضعیت تمیز کردن: بعد از تماس
ShutdownWait
، VIOLIN منتظر است تا کارهای باقی مانده در صف کش وظیفه تکمیل شود. - وضعیت خاموش شدن: ویولن خاموش شده است.
کاربران می توانند با استفاده از روش های ارائه شده وضعیت ویولن را تعیین کنند: IsPlaying
، IsCleaning
، و IsShutdown
.
مشکل مواجه شد
مشکل زیر در هنگام نوشتن ویولن با آن مواجه شدم. بسیاری از این مسائل ناشی از پیچیدگی بیش از حد منطق و گم شدن در افکار خودم است که منجر به مشکلات مختلف همزمانی و گردش کار می شود. به عنوان مثال می توان به خطاهای “ارسال در کانال بسته” و مسابقه داده اشاره کرد.
if int(v.WorkerNum()) < v.MaxWorkerNum() {
_ = atomic.AddUint32(&v.workerNum, 1)
go v.recruit(wg)
}
این منطق برای ایجاد یک کارگر جدید زمانی است که تعداد فعلی کارگران در حال اجرا کمتر از حداکثر مقدار تنظیم شده باشد. در نسخه اولیه کد، کدی که به تدریج افزایش می یابد workerNum
از طریق عملیات اتمی در داخل قرار گرفت recruit
تابع. این باعث شد که تعداد واقعی کارگران در حال اجرا چندین برابر از حداکثر مقدار بیشتر شود. بعد از مدتی جستجو متوجه شدم که موضوع به دلیل حکم اعدام بوده است.
راه حل این است که بلافاصله پس از بررسی شرطی مقدار را افزایش دهید. در غیر این صورت، چون حلقه سریعتر از شروع یک گوروتین و افزایش مقدار اتمی در داخل recruit
تابع، چندین گوروتین را می توان برای یک بررسی شرطی ایجاد کرد.
نتیجه
که تمام محتوای این مقاله را به پایان می رساند. ما با رویکرد ساخت یک Worker Pool شروع کردیم، سپس یک پیادهسازی منبع باز یک Worker Pool را بررسی کردیم و در نهایت Worker Pool خودمان را پیادهسازی کردیم. امیدوارم این مطلب در درک و استفاده شما از Worker Pools مفید بوده باشد. در صورت وجود هر گونه اشتباه یا سوال، لطفاً نظرات خود را بیان کنید یا از طریق پیام خصوصی در ارتباط باشید.
فهرست مرجع