قلب تپنده SQS – از Heartbeats و Watchdogs

استفاده از SQS به عنوان یک صف برای بافر وظایف احتمالاً رایج ترین مورد استفاده برای این سرویس است. در صورتی که این وظایف دامنه وسیعی از مدت زمان پردازش داشته باشند، همه چیز می تواند مشکل ساز شود. امروز، من به شما نشان خواهم داد که چگونه یک مصرف کننده SQS را پیاده سازی کنید که از ضربان قلب استفاده می کند تا به صورت پویا مدت زمان دید را افزایش دهد تا زمان های مختلف پردازش را تطبیق دهد.
سرویس صف ساده یکی از قدیمی ترین سرویس های AWS است، بنابراین به احتمال زیاد با آن آشنایی دارید. این یکی از معدود خدماتی است که نام ساده آن هنوز دقیق است. SQS یک صف پیام ارائه می دهد که الگوی تولید کننده/مصرف کننده را پیاده سازی می کند. یک یا چند تولید کننده پیام ها را به صف اضافه می کنند و به طور ناهمزمان یک یا چند مصرف کننده آن پیام ها را می خوانند و به نحوی آنها را پردازش می کنند. پیام ها حداقل یک بار پردازش می شوند، به این معنی که مصرف کننده معمولاً دسترسی انحصاری به یک پیام دارد، اما این تضمین نمی شود.
در حالی که مصرف کننده روی یک پیام کار می کند، برای مدتی به نام مهلت دید از سایر مصرف کنندگان پنهان می شود. این وقفه دید حداکثر 12 ساعت دارد، اما میتوانید آن را روی هر مقداری که میخواهید بین چند ثانیه تا 12 ساعت تنظیم کنید. پس از انقضای مهلت دید، پیام برای پردازش در دسترس سایر مصرف کنندگان قرار می گیرد. این یک راه راحت برای رسیدگی به خرابیهای پردازش است که در آن مشتری قادر به پردازش یک پیام نیست. پس از انقضای مهلت زمانی، دیگری میتواند از جایی که قبلی متوقف شد ادامه دهد و دوباره امتحان کند.
یک کار یا صف کار یکی از رایج ترین موارد استفاده برای SQS است. تولیدکنندگان وظایفی را به یک صف اضافه می کنند و گروهی از مصرف کنندگان به صورت ناهمزمان روی آن وظایف کار می کنند. در صورتی که وظایف از نظر پیچیدگی و زمان لازم برای تکمیل آنها یکسان باشد، استفاده از یک بازه زمانی دید یکسان برای همه پیامها به خوبی کار میکند. با آگاهی از اینکه پردازش یک پیام بیش از 10 دقیقه طول نمی کشد، می توانید با اطمینان از 10 دقیقه به عنوان مهلت دید هنگام انجام این کار استفاده کنید. ReceiveMessage
تماس API برای واکشی پیام ها.
تنوع زیاد در مدت زمان پردازش این کار را مشکل ساز می کند. برخی از کارهای شما ممکن است در 30 ثانیه تکمیل شوند و برخی دیگر ممکن است بیش از یک ساعت یا چند ساعت طول بکشد. چه چیزی را به عنوان مهلت دید انتخاب می کنید؟ از آنجایی که از قبل نمیدانید صف چه پیامهایی را به شما بازمیگرداند، نمیتوانید براساس وظیفهای که دریافت میکنید این کار را به صورت پویا انجام دهید. رویکرد ساده لوحانه انتخاب حداکثر مدت زمان پردازش یک پیام است، همانطور که در مثال اول انجام دادیم.
با این حال، معاوضه هایی وجود دارد. فرض کنید پردازش یک پیام 30 ثانیه ای با شکست مواجه می شود. این پیام در پایان مهلت دید برای مصرف کننده بعدی در دسترس خواهد بود. در اینجا حدود 85 دقیقه طول می کشد تا دوباره پردازش پیام را امتحان کنید. اگر کسی منتظر آن کار باشد، این کمتر از حد ایده آل است. سناریوی دیگر این است که ما به دادههای تاریخی دسترسی نداشته باشیم، بنابراین نمیتوانیم حدس دقیقی در مورد مقدار مناسبی برای بازه زمانی دید داشته باشیم. چگونه می توانیم این وضعیت را بهبود بخشیم؟
SQS به ما این امکان را می دهد که پس از دریافت پیام با استفاده از آن، زمان نمایان بودن آن را تغییر دهیم ChangeMessageVisibility
تماس API. ما میتوانیم این را با ایدههای فرآیند Watchdog و ضربانهای قلب ترکیب کنیم تا رویکرد تطبیقیتری ایجاد کنیم که زمان تکرار کارهای ناموفق را کاهش میدهد. بیایید این مفاهیم را بررسی کنیم.
اکثر نهادهای غیر هوش مصنوعی که این را می خوانند باید با ضربان قلب آشنا باشند. آنها نشانه بسیار خوبی هستند که شما هنوز زنده هستید. در فناوری اطلاعات، ضربان قلب نوعی سیگنال است که ما به صورت دوره ای ارسال می کنیم تا نشان دهد که هنوز زنده هستیم و در حال لگد زدن هستیم. در زمینه SQS، اگر بدانیم که پردازش هنوز در حال انجام است، میتوانیم مدت زمان نمایش پیامهای خود را هر چند دقیقه افزایش دهیم. اگر نتوانیم بازه زمانی دید را تمدید کنیم، این نشان میدهد که فرآیند متوقف شده است، و پس از مدت کوتاهی، پیام برای پردازش مجدد توسط مصرفکننده دیگری در دسترس قرار میگیرد.
سگ های نگهبان نیز باید از دنیای غیر دیجیتالی آشنا باشند. آنها یک منطقه را زیر نظر می گیرند و اگر اتفاقی در حال رخ دادن باشد هشدار می دهند. گاهی اوقات آنها نقشهای فعالتر یا، بهتر است بگوییم، نقشهای گزندهتر را در پاسخ به فعالیتهای درک شده بر عهده میگیرند. میتوانیم از ایده سگ نگهبان نیز استفاده کنیم. ما فرآیندی خواهیم داشت که بررسی میکند که آیا کار هنوز در حال انجام است یا خیر و ضربانهای قلب دورهای را به SQS ارسال میکند تا نشان دهد که این مورد است.
ترکیب این ایدهها به ما امکان میدهد منطقی را طراحی کنیم که یک فرآیند برای دستیابی به این رفتار تطبیقی باید پیادهسازی کند. من از فلوچارت زیر برای نشان دادن آنچه در حال وقوع است استفاده خواهم کرد. یک حلقه داخلی و خارجی وجود دارد. حلقه بیرونی منتظر پیام های بیشتری است و حلقه داخلی پیام های فردی را پردازش می کند. برای هر پیام جدید، حلقه داخلی یک فرآیند ناهمزمان را شروع می کند که روی پیام کار می کند. این می تواند یک رشته یا فرآیند جداگانه باشد که کاری را که پیام درخواست می کند انجام می دهد. این حیاتی است که این موضوع در همان موضوعی که دیده بان ما نیست. در غیر این صورت، این کار نخواهد کرد.
پس از شروع پردازش، حلقه داخلی شروع می شود. اگر پردازش ناموفق باشد، پایان می یابد، بنابراین حلقه بیرونی می تواند یک پیام جدید دریافت کند. تا زمانی که کار در حال انجام است و شکست نخورده است، ما به طور دوره ای زمان نمایش را تا پایان کار تمدید می کنیم و پیام را از صف حذف می کنیم. اگر مشکلی پیش بیاید، به عنوان مثال، نمونه از بین برود، پیام نسبتاً سریع در دسترس مصرف کننده دیگری قرار می گیرد.
عکس ها ارزان هستند بیایید به کدهایی نگاه کنیم که می توانید در GitHub نیز پیدا کنید. برای اجرای خود به AWS SDK برای پایتون و یک صف استاندارد SQS نیاز دارید. ما با کدی شروع می کنیم که می تواند چند پیام ساختگی را به صف SQS ما ارسال کند.
def create_tasks_in_queue(queue_url, number_of_tasks_to_create) -> None:
"""
Sends number_of_tasks_to_create to the queue.
"""
sqs_res = boto3.resource("sqs")
queue = sqs_res.Queue(queue_url)
number_of_tasks_to_create = 3
queue.send_messages(
Entries=[
{"Id": str(n), "MessageBody": f"Task #{n}"}
for n in range(number_of_tasks_to_create)
]
)
LOGGER.info(
"📮 Sent %s messages to the queue '%s'", number_of_tasks_to_create, QUEUE_NAME
)
حلقه بیرونی در اجرا شده است main
تابع. ابتدا وظایف را به صف اضافه می کنیم و سپس سعی می کنیم پیام ها را بخوانیم. به جای حلقه بی نهایت از فلوچارت، تا زمانی که با موفقیت همان تعداد پیامی را که به صف فرستادیم پردازش کنیم، در حال اجرا هستیم.
def main():
#...
queue_url = boto3.client("sqs").get_queue_url(QueueName=QUEUE_NAME)["QueueUrl"]
LOGGER.debug("Queue-URL: %s", queue_url)
number_of_tasks_to_create = 3
create_tasks_in_queue(queue_url, number_of_tasks_to_create)
sqs_res = boto3.resource("sqs")
queue = sqs_res.Queue(queue_url)
messages_successfully_processed = 0
while messages_successfully_processed < number_of_tasks_to_create:
messages = queue.receive_messages(
MaxNumberOfMessages=1,
VisibilityTimeout=5,
WaitTimeSeconds=5,
)
if messages:
message = messages[0]
LOGGER.info("📋 Got message '%s' from the queue", message.body)
start_processing(message)
result = monitor_processing_progress(message, visibility_timeout=5)
messages_successfully_processed = (
messages_successfully_processed + 1
if result
else messages_successfully_processed
)
else:
LOGGER.info("Found no new messages...")
منطق نگهبان و ضربان قلب در این است monitor_processing_progress
تابع. من تابع را بهجای یک حلقه به صورت بازگشتی پیادهسازی کردم – بیشتر به این دلیل که میتوانم. یک حلقه در اینجا نیز مشکلی ندارد. شما می توانید به وضوح دو تصمیم حلقه داخلی را در اینجا ببینید processing_failed()
و processing_completed()
برای بررسی وضعیت پردازش فراخوانی می شوند. اگر قرار بود از این کد استفاده کنید، اینها توابعی هستند که باید خودتان پیاده سازی کنید.
def monitor_processing_progress(sqs_message, visibility_timeout: int) -> bool:
"""
Check if the message is still being processed or processing failed.
Provide the heartbeat to SQS if it's still processing.
"""
if processing_failed():
LOGGER.info("💔 Processing of %s failed, retrying later.", sqs_message.body)
return False
if processing_completed():
LOGGER.info("✅ Processing of %s complete!", sqs_message.body)
sqs_message.delete()
return True
LOGGER.info("💓 Processing of %s still in progress", sqs_message.body)
visibility_timeout += 5
sqs_message.change_visibility(VisibilityTimeout=visibility_timeout)
time.sleep(5)
return monitor_processing_progress(sqs_message, visibility_timeout)
در حال حاضر، این توابع مکانهایی هستند که 20 درصد مواقع وضعیت ناموفق را برمیگردانند و 50 درصد مواقع تکمیل آن کامل میشود. این فقط به این دلیل است که نسخه ی نمایشی جالب تر است.
def is_successful(chance_in_percent: int) -> bool:
"""Returns true with a chance of chance_in_percent when called."""
return random.choice(range(1, 101)) <= chance_in_percent
def processing_failed() -> bool:
"""
This is where you'd determine if processing the message
failed somehow, this could mean checking logs for errors,
checking if a process is still running, ...
"""
percent_chance_of_failure = 20
return is_successful(percent_chance_of_failure)
def processing_completed() -> bool:
"""
This is where your watchdog would check if the processing
is completed, this may mean checking for files/ status entries
in a database or whatever you come up with.
"""
percent_chance_of_success = 50
return is_successful(percent_chance_of_success)
اگر کد را اجرا کنیم، ممکن است خروجی مانند زیر را ببینیم:
📮 Sent 3 messages to the queue 'test-queue'
📋 Got message 'Task #0' from the queue
🎬 Starting to process 'Task #0'
💓 Processing of Task #0 still in progress
💓 Processing of Task #0 still in progress
✅ Processing of Task #0 complete!
📋 Got message 'Task #2' from the queue
🎬 Starting to process 'Task #2'
💔 Processing of Task #2 failed, retrying later.
📋 Got message 'Task #1' from the queue
🎬 Starting to process 'Task #1'
✅ Processing of Task #1 complete!
📋 Got message 'Task #2' from the queue
🎬 Starting to process 'Task #2'
✅ Processing of Task #2 complete!
ما سه پیام به صف ارسال کردیم، و ابتدا روی کار 0 کار کرد که تکمیل آن چندین ثانیه طول کشید، بنابراین دو ضربان قلب برای افزایش زمان دید ارسال شد. در مرحله بعد، SQS وظیفه 2 را برگرداند که ما نتوانستیم آن را پردازش کنیم، بنابراین به صف برگردانده شد. پس از آن، وظیفه 1 در دسترس است و بلافاصله با موفقیت پردازش می شود. در نهایت، Task 2 دوباره ظاهر می شود و با موفقیت انجام می شود.
من به شما نشان دادهام که چگونه میتوانید یک مصرفکننده SQS را پیادهسازی کنید که بهصورت پویا مدت زمان دید را بهروزرسانی میکند تا زمانهای مختلف پردازش را در نظر بگیرد. باید توجه داشته باشید که این حداکثر محدودیت را تغییر نمی دهد. فقط می تواند پردازش را تا 12 ساعت تمدید کند، سپس SQS با خطا پاسخ می دهد. در حالی که این یک رویکرد انطباقی منطقی است، اما کمی پیچیدگی را نیز به مصرف کننده شما اضافه می کند. اگر مدت زمان پردازش شما بسیار پایدار است و اهداف تأخیر پایینی ندارید، احتمالاً بهتر است به اجرای سادهتر پایبند باشید.
از خواندن این مطلب متشکریم. امیدوارم چیز مفیدی یاد گرفته باشید اگر میخواهید عمیقتر شوید، میتوانید کد را در GitHub بررسی کنید.
– موریس
عکس عنوان توسط Jair Lázaro در Unsplash