برنامه نویسی

قلب تپنده SQS – از Heartbeats و Watchdogs

استفاده از SQS به عنوان یک صف برای بافر وظایف احتمالاً رایج ترین مورد استفاده برای این سرویس است. در صورتی که این وظایف دامنه وسیعی از مدت زمان پردازش داشته باشند، همه چیز می تواند مشکل ساز شود. امروز، من به شما نشان خواهم داد که چگونه یک مصرف کننده SQS را پیاده سازی کنید که از ضربان قلب استفاده می کند تا به صورت پویا مدت زمان دید را افزایش دهد تا زمان های مختلف پردازش را تطبیق دهد.

سرویس صف ساده یکی از قدیمی ترین سرویس های AWS است، بنابراین به احتمال زیاد با آن آشنایی دارید. این یکی از معدود خدماتی است که نام ساده آن هنوز دقیق است. SQS یک صف پیام ارائه می دهد که الگوی تولید کننده/مصرف کننده را پیاده سازی می کند. یک یا چند تولید کننده پیام ها را به صف اضافه می کنند و به طور ناهمزمان یک یا چند مصرف کننده آن پیام ها را می خوانند و به نحوی آنها را پردازش می کنند. پیام ها حداقل یک بار پردازش می شوند، به این معنی که مصرف کننده معمولاً دسترسی انحصاری به یک پیام دارد، اما این تضمین نمی شود.

در حالی که مصرف کننده روی یک پیام کار می کند، برای مدتی به نام مهلت دید از سایر مصرف کنندگان پنهان می شود. این وقفه دید حداکثر 12 ساعت دارد، اما می‌توانید آن را روی هر مقداری که می‌خواهید بین چند ثانیه تا 12 ساعت تنظیم کنید. پس از انقضای مهلت دید، پیام برای پردازش در دسترس سایر مصرف کنندگان قرار می گیرد. این یک راه راحت برای رسیدگی به خرابی‌های پردازش است که در آن مشتری قادر به پردازش یک پیام نیست. پس از انقضای مهلت زمانی، دیگری می‌تواند از جایی که قبلی متوقف شد ادامه دهد و دوباره امتحان کند.

یک کار یا صف کار یکی از رایج ترین موارد استفاده برای SQS است. تولیدکنندگان وظایفی را به یک صف اضافه می کنند و گروهی از مصرف کنندگان به صورت ناهمزمان روی آن وظایف کار می کنند. در صورتی که وظایف از نظر پیچیدگی و زمان لازم برای تکمیل آنها یکسان باشد، استفاده از یک بازه زمانی دید یکسان برای همه پیام‌ها به خوبی کار می‌کند. با آگاهی از اینکه پردازش یک پیام بیش از 10 دقیقه طول نمی کشد، می توانید با اطمینان از 10 دقیقه به عنوان مهلت دید هنگام انجام این کار استفاده کنید. ReceiveMessage تماس API برای واکشی پیام ها.

زمان دید با توجه به واریانس کم

تنوع زیاد در مدت زمان پردازش این کار را مشکل ساز می کند. برخی از کارهای شما ممکن است در 30 ثانیه تکمیل شوند و برخی دیگر ممکن است بیش از یک ساعت یا چند ساعت طول بکشد. چه چیزی را به عنوان مهلت دید انتخاب می کنید؟ از آنجایی که از قبل نمی‌دانید صف چه پیام‌هایی را به شما بازمی‌گرداند، نمی‌توانید براساس وظیفه‌ای که دریافت می‌کنید این کار را به صورت پویا انجام دهید. رویکرد ساده لوحانه انتخاب حداکثر مدت زمان پردازش یک پیام است، همانطور که در مثال اول انجام دادیم.

زمان دید با توجه به واریانس بالا

با این حال، معاوضه هایی وجود دارد. فرض کنید پردازش یک پیام 30 ثانیه ای با شکست مواجه می شود. این پیام در پایان مهلت دید برای مصرف کننده بعدی در دسترس خواهد بود. در اینجا حدود 85 دقیقه طول می کشد تا دوباره پردازش پیام را امتحان کنید. اگر کسی منتظر آن کار باشد، این کمتر از حد ایده آل است. سناریوی دیگر این است که ما به داده‌های تاریخی دسترسی نداشته باشیم، بنابراین نمی‌توانیم حدس دقیقی در مورد مقدار مناسبی برای بازه زمانی دید داشته باشیم. چگونه می توانیم این وضعیت را بهبود بخشیم؟

SQS به ما این امکان را می دهد که پس از دریافت پیام با استفاده از آن، زمان نمایان بودن آن را تغییر دهیم ChangeMessageVisibility تماس API. ما می‌توانیم این را با ایده‌های فرآیند Watchdog و ضربان‌های قلب ترکیب کنیم تا رویکرد تطبیقی‌تری ایجاد کنیم که زمان تکرار کارهای ناموفق را کاهش می‌دهد. بیایید این مفاهیم را بررسی کنیم.

اکثر نهادهای غیر هوش مصنوعی که این را می خوانند باید با ضربان قلب آشنا باشند. آنها نشانه بسیار خوبی هستند که شما هنوز زنده هستید. در فناوری اطلاعات، ضربان قلب نوعی سیگنال است که ما به صورت دوره ای ارسال می کنیم تا نشان دهد که هنوز زنده هستیم و در حال لگد زدن هستیم. در زمینه SQS، اگر بدانیم که پردازش هنوز در حال انجام است، می‌توانیم مدت زمان نمایش پیام‌های خود را هر چند دقیقه افزایش دهیم. اگر نتوانیم بازه زمانی دید را تمدید کنیم، این نشان می‌دهد که فرآیند متوقف شده است، و پس از مدت کوتاهی، پیام برای پردازش مجدد توسط مصرف‌کننده دیگری در دسترس قرار می‌گیرد.

سگ های نگهبان نیز باید از دنیای غیر دیجیتالی آشنا باشند. آنها یک منطقه را زیر نظر می گیرند و اگر اتفاقی در حال رخ دادن باشد هشدار می دهند. گاهی اوقات آن‌ها نقش‌های فعال‌تر یا، بهتر است بگوییم، نقش‌های گزنده‌تر را در پاسخ به فعالیت‌های درک شده بر عهده می‌گیرند. می‌توانیم از ایده سگ نگهبان نیز استفاده کنیم. ما فرآیندی خواهیم داشت که بررسی می‌کند که آیا کار هنوز در حال انجام است یا خیر و ضربان‌های قلب دوره‌ای را به SQS ارسال می‌کند تا نشان دهد که این مورد است.

ترکیب این ایده‌ها به ما امکان می‌دهد منطقی را طراحی کنیم که یک فرآیند برای دستیابی به این رفتار تطبیقی ​​باید پیاده‌سازی کند. من از فلوچارت زیر برای نشان دادن آنچه در حال وقوع است استفاده خواهم کرد. یک حلقه داخلی و خارجی وجود دارد. حلقه بیرونی منتظر پیام های بیشتری است و حلقه داخلی پیام های فردی را پردازش می کند. برای هر پیام جدید، حلقه داخلی یک فرآیند ناهمزمان را شروع می کند که روی پیام کار می کند. این می تواند یک رشته یا فرآیند جداگانه باشد که کاری را که پیام درخواست می کند انجام می دهد. این حیاتی است که این موضوع در همان موضوعی که دیده بان ما نیست. در غیر این صورت، این کار نخواهد کرد.

پس از شروع پردازش، حلقه داخلی شروع می شود. اگر پردازش ناموفق باشد، پایان می یابد، بنابراین حلقه بیرونی می تواند یک پیام جدید دریافت کند. تا زمانی که کار در حال انجام است و شکست نخورده است، ما به طور دوره ای زمان نمایش را تا پایان کار تمدید می کنیم و پیام را از صف حذف می کنیم. اگر مشکلی پیش بیاید، به عنوان مثال، نمونه از بین برود، پیام نسبتاً سریع در دسترس مصرف کننده دیگری قرار می گیرد.

فلوچارت

عکس ها ارزان هستند بیایید به کدهایی نگاه کنیم که می توانید در GitHub نیز پیدا کنید. برای اجرای خود به AWS SDK برای پایتون و یک صف استاندارد SQS نیاز دارید. ما با کدی شروع می کنیم که می تواند چند پیام ساختگی را به صف SQS ما ارسال کند.

def create_tasks_in_queue(queue_url, number_of_tasks_to_create) -> None:
    """
    Sends number_of_tasks_to_create to the queue.
    """

    sqs_res = boto3.resource("sqs")

    queue = sqs_res.Queue(queue_url)

    number_of_tasks_to_create = 3
    queue.send_messages(
        Entries=[
            {"Id": str(n), "MessageBody": f"Task #{n}"}
            for n in range(number_of_tasks_to_create)
        ]
    )
    LOGGER.info(
        "📮 Sent %s messages to the queue '%s'", number_of_tasks_to_create, QUEUE_NAME
    )
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

حلقه بیرونی در اجرا شده است main تابع. ابتدا وظایف را به صف اضافه می کنیم و سپس سعی می کنیم پیام ها را بخوانیم. به جای حلقه بی نهایت از فلوچارت، تا زمانی که با موفقیت همان تعداد پیامی را که به صف فرستادیم پردازش کنیم، در حال اجرا هستیم.

def main():
    #...

    queue_url = boto3.client("sqs").get_queue_url(QueueName=QUEUE_NAME)["QueueUrl"]
    LOGGER.debug("Queue-URL: %s", queue_url)

    number_of_tasks_to_create = 3
    create_tasks_in_queue(queue_url, number_of_tasks_to_create)

    sqs_res = boto3.resource("sqs")
    queue = sqs_res.Queue(queue_url)

    messages_successfully_processed = 0

    while messages_successfully_processed < number_of_tasks_to_create:

        messages = queue.receive_messages(
            MaxNumberOfMessages=1,
            VisibilityTimeout=5,
            WaitTimeSeconds=5,
        )

        if messages:
            message = messages[0]

            LOGGER.info("📋 Got message '%s' from the queue", message.body)
            start_processing(message)

            result = monitor_processing_progress(message, visibility_timeout=5)
            messages_successfully_processed = (
                messages_successfully_processed + 1
                if result
                else messages_successfully_processed
            )
        else:
            LOGGER.info("Found no new messages...")
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

منطق نگهبان و ضربان قلب در این است monitor_processing_progress تابع. من تابع را به‌جای یک حلقه به صورت بازگشتی پیاده‌سازی کردم – بیشتر به این دلیل که می‌توانم. یک حلقه در اینجا نیز مشکلی ندارد. شما می توانید به وضوح دو تصمیم حلقه داخلی را در اینجا ببینید processing_failed() و processing_completed() برای بررسی وضعیت پردازش فراخوانی می شوند. اگر قرار بود از این کد استفاده کنید، اینها توابعی هستند که باید خودتان پیاده سازی کنید.

def monitor_processing_progress(sqs_message, visibility_timeout: int) -> bool:
    """
    Check if the message is still being processed or processing failed.
    Provide the heartbeat to SQS if it's still processing.
    """

    if processing_failed():
        LOGGER.info("💔 Processing of %s failed, retrying later.", sqs_message.body)
        return False

    if processing_completed():

        LOGGER.info("✅ Processing of %s complete!", sqs_message.body)
        sqs_message.delete()
        return True

    LOGGER.info("💓 Processing of %s still in progress", sqs_message.body)
    visibility_timeout += 5
    sqs_message.change_visibility(VisibilityTimeout=visibility_timeout)

    time.sleep(5)
    return monitor_processing_progress(sqs_message, visibility_timeout)
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در حال حاضر، این توابع مکان‌هایی هستند که 20 درصد مواقع وضعیت ناموفق را برمی‌گردانند و 50 درصد مواقع تکمیل آن کامل می‌شود. این فقط به این دلیل است که نسخه ی نمایشی جالب تر است.

def is_successful(chance_in_percent: int) -> bool:
    """Returns true with a chance of chance_in_percent when called."""

    return random.choice(range(1, 101)) <= chance_in_percent


def processing_failed() -> bool:
    """
    This is where you'd determine if processing the message
    failed somehow, this could mean checking logs for errors,
    checking if a process is still running, ...
    """

    percent_chance_of_failure = 20
    return is_successful(percent_chance_of_failure)


def processing_completed() -> bool:
    """
    This is where your watchdog would check if the processing
    is completed, this may mean checking for files/ status entries
    in a database or whatever you come up with.
    """

    percent_chance_of_success = 50
    return is_successful(percent_chance_of_success)
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اگر کد را اجرا کنیم، ممکن است خروجی مانند زیر را ببینیم:

📮 Sent 3 messages to the queue 'test-queue'
📋 Got message 'Task #0' from the queue
🎬 Starting to process 'Task #0'
💓 Processing of Task #0 still in progress
💓 Processing of Task #0 still in progress
✅ Processing of Task #0 complete!
📋 Got message 'Task #2' from the queue
🎬 Starting to process 'Task #2'
💔 Processing of Task #2 failed, retrying later.
📋 Got message 'Task #1' from the queue
🎬 Starting to process 'Task #1'
✅ Processing of Task #1 complete!
📋 Got message 'Task #2' from the queue
🎬 Starting to process 'Task #2'
✅ Processing of Task #2 complete!
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما سه پیام به صف ارسال کردیم، و ابتدا روی کار 0 کار کرد که تکمیل آن چندین ثانیه طول کشید، بنابراین دو ضربان قلب برای افزایش زمان دید ارسال شد. در مرحله بعد، SQS وظیفه 2 را برگرداند که ما نتوانستیم آن را پردازش کنیم، بنابراین به صف برگردانده شد. پس از آن، وظیفه 1 در دسترس است و بلافاصله با موفقیت پردازش می شود. در نهایت، Task 2 دوباره ظاهر می شود و با موفقیت انجام می شود.

من به شما نشان داده‌ام که چگونه می‌توانید یک مصرف‌کننده SQS را پیاده‌سازی کنید که به‌صورت پویا مدت زمان دید را به‌روزرسانی می‌کند تا زمان‌های مختلف پردازش را در نظر بگیرد. باید توجه داشته باشید که این حداکثر محدودیت را تغییر نمی دهد. فقط می تواند پردازش را تا 12 ساعت تمدید کند، سپس SQS با خطا پاسخ می دهد. در حالی که این یک رویکرد انطباقی منطقی است، اما کمی پیچیدگی را نیز به مصرف کننده شما اضافه می کند. اگر مدت زمان پردازش شما بسیار پایدار است و اهداف تأخیر پایینی ندارید، احتمالاً بهتر است به اجرای ساده‌تر پایبند باشید.

از خواندن این مطلب متشکریم. امیدوارم چیز مفیدی یاد گرفته باشید اگر می‌خواهید عمیق‌تر شوید، می‌توانید کد را در GitHub بررسی کنید.

– موریس


عکس عنوان توسط Jair Lázaro در Unsplash

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا