برنامه نویسی

GO: چگونه یک استخر کارگر بنویسیم

معرفی

ادغام یک تکنیک مدیریت منابع است که شامل ایجاد و نگهداری مجموعه‌ای از نمونه‌های منابع قابل استفاده مجدد است که امکان تخصیص و تخصیص سریع این منابع را در صورت نیاز فراهم می‌کند. گوروتین ها در زبان برنامه نویسی Go، “رشته های” سبک وزن هستند، اما تعداد زیادی از گوروتین ها همچنان می توانند منابع قابل توجهی را مصرف کنند. یک Worker Pool، با استفاده از تکنیک‌های ادغام، می‌تواند تعداد معینی از گوروتین‌ها را حفظ کند و فقط به آنها اجازه دهد وظایف را اجرا کنند و اتلاف منابع را کاهش دهد.

این مقاله یک رویکرد پیاده‌سازی را برای Worker Pool معرفی می‌کند و کد منبع یک کتابخانه محبوب Worker Pool، gammazero/workerpool را تحلیل می‌کند. در نهایت، ما یک Worker Pool مشابه به نام VIOLIN را اجرا خواهیم کرد تا درک بهتری از Worker Pools به دست آوریم.

چگونه یک استخر کارگر بنویسیم؟

بر اساس معرفی مختصر Worker Pool در بخش قبل، ما آموختیم که پیاده سازی Worker Pool شامل حفظ تعداد معینی از گوروتین ها و اجازه دادن به آنها برای رسیدگی به وظایف / مشاغل ارسال شده است. رویکرد نوشتن یک Worker Pool ساده می شود:

  • محدودیت خاصی برای تعداد گوروتین ها در Worker Pool تعیین کنید. تعداد گوروتین های نگهداری شده توسط Worker Pool نباید از این حد تجاوز کند.
  • هنگامی که یک کار ارسال می شود، یک گوروتین از Worker Pool برای اجرای آن اختصاص دهید.
  • اگر کارگر بیکار (گوروتین) وجود دارد، کار را به کارگر بیکار محول کنید. در غیر این صورت، بررسی کنید که آیا به حد مجاز گوروتین ها رسیده است یا خیر.
  • اگر به حد مجاز رسید، کار را در یک صف کش ذخیره کنید (یا از مکانیزم مدیریت دیگری استفاده کنید). اگر به حد مجاز نرسیده اید، یک کارگر جدید برای اجرای کار ایجاد کنید.

برای درک دقیق تر منطق، لطفاً به نمودار زیر مراجعه کنید. توضیح زمان مصرف وظایف از صف وظایف در قسمت تجزیه و تحلیل کد مورد بحث قرار خواهد گرفت.

توضیحات تصویر

تجزیه و تحلیل کد

اکنون که درک کلی از نحوه کار یک Worker Pool داریم، می‌توانیم شروع به نوشتن کد کنیم. در اینجا، ابتدا یک پیاده‌سازی منبع باز عالی از Worker Pool با 1.1k ستاره در GitHub را بررسی می‌کنیم. پروژه را می توان در اینجا یافت.

ساختار اصلی WorkerPool

بیایید نگاهی به ساختار اصلی این Worker Pool بیندازیم:

type WorkerPool struct {
    maxWorkers   int
    taskQueue    chan func()
    workerQueue  chan func()
    stoppedChan  chan struct{}
    stopSignal   chan struct{}
    waitingQueue deque.Deque[func()]
    stopLock     sync.Mutex
    stopOnce     sync.Once
    stopped      bool
    waiting      int32
    wait         bool
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما بر روی خواص زیر تمرکز خواهیم کرد:

  • maxWorkers: حداکثر تعداد گوروتین ها (یعنی کارگران) نگهداری شده توسط Worker Pool.
  • taskQueue: صف وظایف که در آن وظایف ارسال و ذخیره می شوند.
  • صف کارگر: صف کارگری که کارگران از آن وظایف را برای اجرا بازیابی می کنند.
  • صف انتظار: صف انتظار، برای ذخیره وظایف زمانی که کارگر در دسترس نیست و تعداد کارگران فعلی به حداکثر حد مجاز رسیده است استفاده می شود.

چند نکته مهم قابل توجه:

  • نوع داده خاص عناصر ذخیره شده در هر سه صف است func()، نشان دهنده نوع وظایف ارسالی است (آن را به عنوان یک تابع در نظر بگیرید).
  • اینجا، دو صف، taskQueue و workerQueue، استفاده می شود. هنگامی که یک کار ارسال می شود، ابتدا به آن اضافه می شود taskQueue. سپس، یک وظیفه از بازیابی می شود taskQueue و به workerQueue برای اعدام توسط کارگر این است که به طور مستقیم توسط کارگران از مصرف نمی شود taskQueue.
  • نوع داده از waitingQueue یک ساختار داده سفارشی است که توسط نویسنده کتابخانه workpool پیاده سازی شده است. شما می توانید آن را به عنوان یک صف عمومی FIFO در نظر بگیرید. هنگامی که بعداً Worker Pool خود را پیاده سازی می کنیم، می توانیم آن را با ساختارهای داده موجود جایگزین کنیم chan، list.List، و غیره.

جدید

در مرحله بعد، بیایید نگاهی به آن بیندازیم New تابع، که تابعی برای ایجاد a است WorkerPool هدف – شی.

func New(maxWorkers int) *WorkerPool {
    // There must be at least one worker.
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    pool := &WorkerPool{
        maxWorkers:  maxWorkers,
        taskQueue:   make(chan func()),
        workerQueue: make(chan func()),
        stopSignal:  make(chan struct{}),
        stoppedChan: make(chan struct{}),
    }

    // Start the task dispatcher.
    go pool.dispatch()

    return pool
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همانطور که می بینیم، ساختار کلی کاملاً واضح است. شرایط اولیه تضمین می کند که WorkerPool حداقل یک گوروتین را حفظ می کند. سپس، شی WorkerPool مقداردهی اولیه می شود. شایان ذکر است که هر دو taskQueue و workerQueue ویژگی ها کانال های بافر نشده هستند. پس از آن، یک گوروتین برای اجرای متد اصلی WorkerPool شروع می شود. pool.dispatch. در نهایت، شی WorkerPool برگردانده می شود.

ارسال

قبل از اینکه به روش اصلی بپردازیم pool.dispatch، بیایید نگاهی به چند روش ساده برای افزایش اعتماد به نفس بیندازیم.

func (p *WorkerPool) Submit(task func()) {
    if task != nil {
        p.taskQueue <- task
    }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

را Submit روش برای ارسال کارها استفاده می شود و می توان با بررسی مستقیم اینکه آیا کار انجام نشده است، فراخوانی کرد nil و سپس آن را به taskQueue. به هر حال، اجازه دهید به طور خلاصه بحث کنیم که اگر عبور کنیم چه اتفاقی می افتد nil به کانال اگرچه کانال می تواند دریافت کند nil بدون هیچ مشکلی، تلاش برای اجرای آن منجر به یک استثنای اشاره گر تهی می شود.

func main() {
    ch := make(chan func())
    go func() {
        ch <- nil
    }()
    res, ok := <-ch
    fmt.Println(res, ok) // <nil> true
    res()                // panic: runtime error: invalid memory address or nil pointer dereference
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ارسال صبر کنید

یک روش دوقلو برای وجود دارد Submit، که مسدود می کند و منتظر می ماند تا کار ارسالی کامل شود.

func (p *WorkerPool) SubmitWait(task func()) {
    if task == nil {
        return
    }
    doneChan := make(chan struct{})
    p.taskQueue <- func() {
        task()
        close(doneChan)
    }
    <-doneChan
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پیاده سازی خاص شامل یک بار بسته بندی وظیفه ارسال شده و سپس استفاده از یک کانال برای انجام این کار است که بسیار هوشمندانه است.

ارسال

روش اصلی WorkerPool شامل گردش کار کامل Worker Pool است که قبلا تحلیل کردیم.

این یک روش بسیار طولانی است. برای سهولت در توضیح و درک، ابتدا روش کامل را ارائه می کنم و سپس تک تک کدها را توضیح می دهم. خوانندگان می توانند پروژه را شبیه سازی کنند و در حین خواندن به کد مراجعه کنند، یا می توانند مستقیماً تحلیل کد زیر را دنبال کنند.

func (p *WorkerPool) dispatch() {
    defer close(p.stoppedChan)
    timeout := time.NewTimer(idleTimeout)
    var workerCount int
    var idle bool
    var wg sync.WaitGroup

Loop:
    for {
        // As long as tasks are in the waiting queue, incoming tasks are put
        // into the waiting queue and tasks to run are taken from the waiting
        // queue. Once the waiting queue is empty, then go back to submitting
        // incoming tasks directly to available workers.
        if p.waitingQueue.Len() != 0 {
            if !p.processWaitingQueue() {
                break Loop
            }
            continue
        }

        select {
        case task, ok := <-p.taskQueue:
            if !ok {
                break Loop
            }
            // Got a task to do.
            select {
            case p.workerQueue <- task:
            default:
                // Create a new worker, if not at max.
                if workerCount < p.maxWorkers {
                    wg.Add(1)
                    go worker(task, p.workerQueue, &wg)
                    workerCount++
                } else {
                    // Enqueue task to be executed by next available worker.
                    p.waitingQueue.PushBack(task)
                    atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
                }
            }
            idle = false
        case <-timeout.C:
            // Timed out waiting for work to arrive. Kill a ready worker if
            // pool has been idle for a whole timeout.
            if idle && workerCount > 0 {
                if p.killIdleWorker() {
                    workerCount--
                }
            }
            idle = true
            timeout.Reset(idleTimeout)
        }
    }

    // If instructed to wait, then run tasks that are already queued.
    if p.wait {
        p.runQueuedTasks()
    }

    // Stop all remaining workers as they become ready.
    for workerCount > 0 {
        p.workerQueue <- nil
        workerCount--
    }
    wg.Wait()

    timeout.Stop()
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در واقع، وقتی به کل روش نگاه می کنیم، هنوز کاملاً واضح است و نویسنده با تأمل نظراتی را در شاخه های منطق اصلی اضافه کرده است.

  • بیایید با بررسی قسمت اعلام متغیر در ابتدای این روش شروع کنیم:
  defer close(p.stoppedChan)
  timeout := time.NewTimer(idleTimeout)
  var workerCount int
  var idle bool
  var wg sync.WaitGroup
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

  LOOP:
    for {
          // ... (暂时跳过的部分)

          select {
          case task, ok := <-p.taskQueue:
              if !ok {
                  break Loop
              }
              // Got a task to do.
              select {
              case p.workerQueue <- task:
              default:
                  // Create a new worker, if not at max.
                  if workerCount < p.maxWorkers {
                      wg.Add(1)
                      go worker(task, p.workerQueue, &wg)
                      workerCount++
                  } else {
                      // Enqueue task to be executed by next available worker.
                      p.waitingQueue.PushBack(task)
                      atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
                  }
              }
              idle = false
          case <-timeout.C:
              // Timed out waiting for work to arrive. Kill a ready worker if
              // pool has been idle for a whole timeout.
              if idle && workerCount > 0 {
                  if p.killIdleWorker() {
                      workerCount--
                  }
              }
              idle = true
              timeout.Reset(idleTimeout)
          }
      }
  // ...
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

  • اولین select بیانیه در for حلقه دو تا دارد case گزینه ها: یکی برای دریافت یک کار و دیگری برای مهلت زمانی کارگر (هیچ کار جدیدی در عرض دو ثانیه دریافت نمی شود).

  • بیایید با پرونده دریافت وظیفه شروع کنیم. ابتدا بررسی می کند که آیا یک کار واقعی دریافت شده است یا خیر. اگر ok است false، به این معنی است که taskQueue کانال بسته شده است در این حالت از حلقه خارج می شود. دلیل استفاده از برچسب این است که در داخل تودرتو است select داخل for-sect، یک منظم break بیانیه فقط از جریان خارج می شود select مسدود کرده و اجرای کد را در خارج از آن ادامه دهید select، از جمله تکرار بعدی از for حلقه برای خروج از بیرون for حلقه مستقیم، یک برچسب استفاده می شود.

    سپس، دیگری select بیانیه استفاده می شود. اگر کارگر بیکار وجود داشته باشد، وظیفه به آن کارگر محول می شود (p.workerQueue <- task). اگر کارگر بیکار وجود نداشته باشد، بررسی می کند که آیا تعداد فعلی کارگران در حال اجرا به حداکثر حد مجاز رسیده است یا خیر. اگر نه، یک گوروتین (کارگر) جدید شروع می شود، و workerCount و sync.WaitGroup با 1 افزایش می یابند dispatch روش مشکلات همزمانی ندارد، می توان آن را مستقیماً افزایش داد. اگر به حداکثر تعداد کارگران رسیده باشد، وظیفه فعلی در صف ذخیره می شود و تعداد کارهای انتظار ثبت می شود. از آنجا که p.waiting یک منبع مشترک است، عملیات اتمی برای اطمینان از ایمنی همزمان استفاده می شود.

  • مورد مربوط به زمان استراحت کارگر ابتدا بررسی می کند که آیا idle است true و تعداد کارگران بیشتر از 0 باشد. بنابراین در اولین تایم اوت کارگر کشته نمی شود. بجای، idle تنظیم شده است true، و تایمر تنظیم مجدد می شود. شایان ذکر است که p.killIdleWorker روش ممکن است لزوماً یک کارگر را بکشد. سعی می کند این کار را انجام دهد و نتیجه را برمی گرداند. اگر یک کارگر با موفقیت کشته شود، تعداد کارگران یک نفر کاهش می یابد.

    • حالا بیایید قسمتی را که قبلاً از آن صرف نظر کردیم، که ابتدای آن است، نگاه کنیم for حلقه:
  if p.waitingQueue.Len() != 0 {
      if !p.processWaitingQueue() {
          break Loop
      }
      continue
  }
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این بخش وظیفه مصرف وظایف کش شده را بر عهده دارد. را p.processWaitingQueue روش یا یک کار ذخیره شده را مصرف می کند یا یک کار ورودی را در حافظه پنهان ذخیره می کند. به طور مشابه، بررسی می کند که آیا taskQueue بسته شده است، و اگر چنین باشد، مستقیماً حلقه را می شکند و خارج می شود.

  • قسمت آخر کد پاکسازی خارج از آن است for حلقه:
  // If instructed to wait, then run tasks that are already queued.
  if p.wait {
      p.runQueuedTasks()
  }

  // Stop all remaining workers as they become ready.
  for workerCount > 0 {
      p.workerQueue <- nil
      workerCount--
  }
  wg.Wait()

  timeout.Stop()
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

را p.wait پرچم به عنوان یک نشانگر در طول استفاده می شود Stop عملیات، اما فعلا می توانیم از آن صرف نظر کنیم. کد زیر وظیفه توقف تمامی کارگران و توقف تایمر را بر عهده دارد.

کارگر

بله، WorkerPool با فراخوانی یک کارگر جدید ایجاد می کند go worker(task, p.workerQueue, &wg)، انتقال وظیفه برای اجرا، workerQueue و WaitGroup به عنوان پارامتر. این کار را هوشمندانه اجرا می کند اگر اینطور نباشد nil و سپس برای دریافت یک کار جدید تا زمانی که a nil وظیفه دریافت می شود که به حلقه پایان می دهد. WaitGroup با یک کاهش می یابد و نشان می دهد که کارگر اجرای خود را به پایان رسانده است. این یک پیاده سازی هوشمندانه است و من خودم هم به آن فکر نمی کردم!

// worker executes tasks and stops when it receives a nil task.
func worker(task func(), workerQueue chan func(), wg *sync.WaitGroup) {
    for task != nil {
        task()
        task = <-workerQueue
    }
    wg.Done()
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این به تجزیه و تحلیل کد منبع اصلی WorkerPool پایان می دهد. اکنون، می توانید Worker Pool خود را پیاده سازی کنید.

توسط خودم پیاده سازی شده است

ویولن یک استخر کارگری است که من نوشتم، با الهام از gammazero/workerpool. API و گزینه های غنی تری را ارائه می دهد. بی زحمت بهش ستاره بدی 🙂

شماره کارگر

WorkerNum یک رابط ارائه شده توسط VIOLIN برای بازیابی تعداد فعلی کارگران در حال اجرا است. بجای استفاده از workerCount به عنوان یک متغیر محلی در dispatch روش، VIOLIN تعداد کارگران را به عنوان ویژگی ساختار اصلی حفظ می کند. با این حال، برای اطمینان از ایمنی همزمان، از عملیات اتمی استفاده می شود که ممکن است منجر به سربار عملکرد شود.

type Violin struct {
    options *options

    mu   sync.Mutex
    once sync.Once

    workerNum      uint32
    taskNum        uint32
    waitingTaskNum uint32
    status         uint32

    workerChan   chan func()
    taskChan     chan func()
    waitingChan  chan func()
    dismissChan  chan struct{}
    pauseChan    chan struct{}
    shutdownChan chan struct{}
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

الگوی گزینه های کاربردی

Functional Options یک الگوی برنامه نویسی است که من شخصا از استفاده از آن لذت می برم. VIOLIN از الگوی گزینه‌های عملکردی استفاده می‌کند تا سه گزینه پیکربندی را با مقادیر پیش‌فرض مربوطه ارائه دهد، همانطور که در جدول زیر نشان داده شده است:

گزینه پیش فرض شرح
WithMaxWorkers 5 حداکثر تعداد کارگران را تنظیم کنید
WithWaitingQueueSize 64 اندازه صف انتظار را تنظیم کنید
WithWorkerIdleTimeout time.Second * 3 بازه زمانی تخریب شده کارگران بیکار را تنظیم کنید
  • در مقایسه با 2 ثانیه ثابت idleTimeout در Workerpool، VIOLIN به کاربران این امکان را می دهد که آن را مطابق با نیاز خود پیکربندی کنند.
  • علاوه بر این، VIOLIN ساختار داده صف کش وظیفه را با کانال ها جایگزین می کند، بنابراین کاربران باید اندازه کانال را تنظیم کنند یا از مقدار پیش فرض استفاده کنند. در آینده، ممکن است به اجرای صف برگردد و محدودیت اندازه صف کش را از بین ببرد.

وضعیت

ویولن حفظ می کند status زمینه در ساختار اصلی برای تسهیل کاربران و اجزای داخلی در دسترسی به وضعیت فعلی VIOLIN. چهار حالت ممکن وجود دارد:

const (
    _ uint32 = iota
    statusInitialized
    statusPlaying
    statusCleaning
    statusShutdown
)
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

  • statusInitialized: حالت اولیه
  • وضعیت در حال پخش: ویولن به طور عادی اجرا می شود
  • وضعیت تمیز کردن: بعد از تماس ShutdownWait، VIOLIN منتظر است تا کارهای باقی مانده در صف کش وظیفه تکمیل شود.
  • وضعیت خاموش شدن: ویولن خاموش شده است.

کاربران می توانند با استفاده از روش های ارائه شده وضعیت ویولن را تعیین کنند: IsPlaying، IsCleaning، و IsShutdown.

مشکل مواجه شد

مشکل زیر در هنگام نوشتن ویولن با آن مواجه شدم. بسیاری از این مسائل ناشی از پیچیدگی بیش از حد منطق و گم شدن در افکار خودم است که منجر به مشکلات مختلف همزمانی و گردش کار می شود. به عنوان مثال می توان به خطاهای “ارسال در کانال بسته” و مسابقه داده اشاره کرد.

if int(v.WorkerNum()) < v.MaxWorkerNum() {
    _ = atomic.AddUint32(&v.workerNum, 1)
    go v.recruit(wg)
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این منطق برای ایجاد یک کارگر جدید زمانی است که تعداد فعلی کارگران در حال اجرا کمتر از حداکثر مقدار تنظیم شده باشد. در نسخه اولیه کد، کدی که به تدریج افزایش می یابد workerNum از طریق عملیات اتمی در داخل قرار گرفت recruit تابع. این باعث شد که تعداد واقعی کارگران در حال اجرا چندین برابر از حداکثر مقدار بیشتر شود. بعد از مدتی جستجو متوجه شدم که موضوع به دلیل حکم اعدام بوده است.

راه حل این است که بلافاصله پس از بررسی شرطی مقدار را افزایش دهید. در غیر این صورت، چون حلقه سریعتر از شروع یک گوروتین و افزایش مقدار اتمی در داخل recruit تابع، چندین گوروتین را می توان برای یک بررسی شرطی ایجاد کرد.

نتیجه

که تمام محتوای این مقاله را به پایان می رساند. ما با رویکرد ساخت یک Worker Pool شروع کردیم، سپس یک پیاده‌سازی منبع باز یک Worker Pool را بررسی کردیم و در نهایت Worker Pool خودمان را پیاده‌سازی کردیم. امیدوارم این مطلب در درک و استفاده شما از Worker Pools مفید بوده باشد. در صورت وجود هر گونه اشتباه یا سوال، لطفاً نظرات خود را بیان کنید یا از طریق پیام خصوصی در ارتباط باشید.

فهرست مرجع

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا