برنامه نویسی

چرا ما یک صف پیام MongoDB ساختیم و چرخ را دوباره اختراع کردیم

Summarize this content to 400 words in Persian Lang
هی👋

من مدز کویست هستم، بنیانگذار All Quiet. ما یک صف پیام خانگی را بر اساس MongoDB پیاده سازی کرده ایم و من اینجا هستم تا در مورد آن صحبت کنم:

چرا چرخ را دوباره اختراع کردیم
چگونه چرخ را دوباره اختراع کردیم

چرا به صف پیام نیاز داریم؟

All Quiet یک پلت فرم مدرن مدیریت حوادث است که شبیه به PagerDuty است. پلتفرم ما به ویژگی هایی مانند:

پس از ثبت نام کاربر، یک ایمیل با انتخاب دوگانه به صورت ناهمزمان ارسال می شود
ارسال ایمیل یادآوری 24 ساعت پس از ثبت نام
ارسال اعلان‌های فشاری با Firebase Cloud Messaging (FCM)، که ممکن است به دلیل مشکلات شبکه یا بارگذاری با شکست مواجه شود. از آنجایی که اعلان‌های فشار برای برنامه ما بسیار مهم هستند، در صورت بروز مشکل، باید دوباره ارسال آن‌ها را امتحان کنیم.
پذیرش ایمیل از خارج از ادغام ما و پردازش آنها به حوادث. این فرآیند ممکن است شکست بخورد، بنابراین ما می‌خواستیم آن را جدا کنیم و هر بار ایمیل را در یک صف پردازش کنیم.

پشته فناوری ما

برای درک نیازهای خاص ما، مهم است که در مورد پشته فناوری خود اطلاعاتی کسب کنیم:

ما یک برنامه وب یکپارچه را بر اساس NET Core 7 اجرا می کنیم. برنامه .NET Core در یک ظرف Docker اجرا می شود.
چندین کانتینر را به صورت موازی اجرا می کنیم.
یک نمونه HAProxy درخواست های HTTP را به طور مساوی در هر کانتینر توزیع می کند و از راه اندازی بسیار در دسترس اطمینان می دهد.
ما از MongoDB به عنوان پایگاه داده زیربنایی خود استفاده می کنیم که در مناطق در دسترس تکرار می شود.
همه اجزای فوق توسط AWS روی ماشین های مجازی EC2 عمومی میزبانی می شوند.

چرا چرخ را دوباره اختراع کردیم

ما یک مکانیسم صف ساده را می خواستیم که بتواند در چندین فرآیند به طور همزمان اجرا شود و در عین حال تضمین کند که هر پیام فقط یک بار پردازش می شود.
ما به الگوی میخانه/فرعی نیاز نداشتیم.
هدف ما یک سیستم توزیع پیچیده مبتنی بر منبع یابی CQRS / رویداد نبود، زیرا، می دانید، اولین قانون سیستم های توزیع شده توزیع نکردن است.
ما می‌خواستیم با پیروی از فلسفه انتخاب “تکنولوژی خسته کننده” همه چیز را تا حد امکان ساده نگه داریم.

در نهایت، این در مورد به حداقل رساندن تعداد قطعات متحرک در زیرساخت شما است. هدف ما ایجاد ویژگی‌های فوق‌العاده برای مشتریان عالی‌مان است و حفظ مطمئن خدمات خود ضروری است. مدیریت یک سیستم پایگاه داده واحد برای دستیابی به بیش از پنج نهن زمان آپتایم به اندازه کافی چالش برانگیز است. پس چرا خود را با مدیریت یک خوشه اضافی HA RabbitMQ متحمل شوید؟

چرا فقط از AWS SQS استفاده نمی کنید؟

بله… راه حل های ابری مانند AWS SQS، Google Cloud Tasks یا Azure Queue Storage فوق العاده هستند! با این حال، آنها منجر به قفل شدن فروشنده می شدند. ما صرفاً آرزوی مستقل بودن و مقرون به صرفه بودن را داریم و در عین حال خدماتی مقیاس‌پذیر به مشتریان خود ارائه می‌کنیم.

صف پیام چیست؟

صف پیام سیستمی است که پیام ها را ذخیره می کند. تولیدکنندگان پیام‌ها این پیام‌ها را در صف ذخیره می‌کنند، که بعداً توسط مصرف‌کنندگان برای پردازش از صف خارج می‌شوند. این برای جدا کردن اجزاء بسیار سودمند است، به خصوص زمانی که پردازش پیام ها یک کار منابع فشرده است.

صف ما باید چه ویژگی هایی را نشان دهد؟

استفاده از MongoDB به عنوان ذخیره سازی داده ما
تضمین این که هر پیام فقط یک بار مصرف شود
به چندین مصرف کننده اجازه می دهد پیام ها را به طور همزمان پردازش کنند
اطمینان از اینکه اگر پردازش پیام با شکست مواجه شود، امکان تکرار وجود دارد
امکان برنامه ریزی مصرف پیام برای آینده
بدون نیاز به سفارش تضمینی
اطمینان از در دسترس بودن بالا
اطمینان از بادوام بودن پیام‌ها و وضعیت آنها و مقاومت در برابر راه‌اندازی مجدد یا توقف طولانی‌مدت

MongoDB در طول سال ها به طور قابل توجهی تکامل یافته است و می تواند معیارهای ذکر شده در بالا را برآورده کند.

پیاده سازی

در بخش‌های بعدی، من شما را از طریق پیاده‌سازی اختصاصی MongoDB صف پیام خود راهنمایی می‌کنم. در حالی که شما به یک کتابخانه کلاینت مناسب برای زبان برنامه نویسی ترجیحی خود مانند NodeJS، Go، یا C# در مورد All Quiet نیاز دارید، مفاهیمی که من به اشتراک خواهم گذاشت، پلتفرم آگنوستیک هستند.

صف ها

هر صفی که می خواهید استفاده کنید به عنوان یک مجموعه اختصاصی در پایگاه داده MongoDB شما نشان داده می شود.مدل پیام

در اینجا یک مثال از یک پیام پردازش شده است:

{
“_id” : NumberLong(638269014234217933),
“Statuses” : [
{
“Status” : “Processed”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.753+0000”),
“NextReevaluation” : null
},
{
“Status” : “Processing”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.572+0000”),
“NextReevaluation” : null
},
{
“Status” : “Enqueued”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.421+0000”),
“NextReevaluation” : null
}
],
“Payload” : {
“YourData” : “abc123”
}
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بیایید به هر ویژگی پیام نگاه کنیم.

_id

را _id فیلد ویژگی شناسه منحصر به فرد متعارف MongoDB است. در اینجا، شامل یک NumberLong، نه یک ObjectId . نیاز داریم NumberLong بجای ObjectId زیرا:

در حالی که ObjectId مقادیر باید در طول زمان افزایش یابد، آنها لزوما یکنواخت نیستند. این به این دلیل است که آنها:

تنها حاوی یک ثانیه وضوح زمانی است، بنابراین مقادیر ObjectId ایجاد شده در همان ثانیه دارای ترتیب تضمینی نیستند و توسط کلاینت‌هایی که ممکن است ساعت‌های سیستم متفاوتی داشته باشند، تولید می‌شوند.

در پیاده سازی سی شارپ خود، یک عدد ایجاد می کنیم Id با دقت میلی ثانیه و سفارش تضمینی بر اساس زمان درج. اگرچه ما به ترتیب پردازش دقیق در یک محیط چند مصرف کننده (مشابه RabbitMQ) نیاز نداریم، حفظ نظم FIFO هنگام کار فقط با یک مصرف کننده ضروری است. دستیابی به این امر با ObjectId قابل اجرا نیست اگر این برای شما حیاتی نیست، همچنان می توانید استفاده کنید ObjectId.

وضعیت ها

را Statuses ویژگی شامل یک آرایه حاوی تاریخچه پردازش پیام است. در شاخص 0، وضعیت فعلی را پیدا خواهید کرد که برای نمایه سازی بسیار مهم است.

شی وضعیت خود شامل سه ویژگی است:

Status: می تواند “در نوبت”، “پردازش”، “پردازش شده” یا “شکست خورده” باشد.

Timestamp: این نشان دهنده زمان فعلی است.

NextReevaluation: زمانی را ثبت می کند که ارزیابی بعدی باید انجام شود، که هم برای تلاش های مجدد و هم برای اجرای برنامه ریزی شده در آینده ضروری است.

ظرفیت ترابری

این ویژگی حاوی بار خاص پیام شما است.

در نوبت گذاشتن پیام

افزودن یک پیام یک عملیات درج ساده در مجموعه با وضعیت تنظیم شده است “Enqueued”.

برای پردازش فوری، تنظیم کنید NextReevaluation باطل شدن
برای پردازش آینده، تنظیم کنید NextReevaluation به یک مهر زمانی در آینده، زمانی که می خواهید پیام شما پردازش شود.

db.yourQueueCollection.insert({
“_id” : NumberLong(638269014234217933),
“Statuses” : [
{
“Status” : “Enqueued”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.421+0000”),
“NextReevaluation” : null
}
],
“Payload” : {
“YourData” : “abc123”
}
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

جدا کردن یک پیام

Dequeuing کمی پیچیده تر است اما هنوز نسبتاً ساده است. این به شدت به قابلیت های همزمان خواندن و به روز رسانی اتمی MongoDB متکی است.

این ویژگی ضروری MongoDB تضمین می کند:

هر پیام فقط یک بار پردازش می شود.
چندین مصرف کننده می توانند پیام ها را به طور همزمان با خیال راحت پردازش کنند.

db.yourQueueCollection.findAndModify({
“query”: {
“$and”: [
{
“Statuses.0.Status”: “Enqueued”
},
{
“Statuses.0.NextReevaluation”: null
}
] },
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processing”,
“Timestamp”: ISODate(“2023-08-06T06:50:23.800+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بنابراین ما در حال خواندن یک پیام هستیم که در حالت است “Enqueued” و در عین حال با تنظیم وضعیت آن را اصلاح کنید “Processing” در موقعیت 0. از آنجایی که این عملیات اتمی است، تضمین می کند که پیام توسط مصرف کننده دیگری دریافت نخواهد شد.

علامت گذاری پیام به عنوان پردازش شده

هنگامی که پردازش پیام کامل شد، به روز رسانی وضعیت پیام به یک موضوع ساده است “Processed” با استفاده از پیام id.

db.yourQueueCollection.findAndModify({
“query”: {
“_id”: NumberLong(638269014234217933)
},
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processed”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

علامت گذاری یک پیام به عنوان ناموفق

اگر پردازش ناموفق بود، باید پیام را مطابق با آن علامت گذاری کنیم. اغلب، ممکن است بخواهید دوباره پردازش پیام را امتحان کنید. این را می توان با قرار دادن مجدد پیام به دست آورد. در بسیاری از سناریوها، بسته به ماهیت شکست پردازش، پردازش مجدد پیام پس از یک تاخیر خاص، مانند 10 ثانیه منطقی است.

db.yourQueueCollection.findAndModify({
“query”: {
“_id”: NumberLong(638269014234217933)
},
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Failed”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: ISODate(“2023-08-06T07:00:24.100+0000”)
}
],
“$position”: 0
}
}
}
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

حلقه dequeuing

ما مشخص کرده‌ایم که چگونه می‌توانیم به راحتی آیتم‌ها را از «صف» خود، که در واقع صرفاً یک مجموعه MongoDB است، در صف قرار دهیم و از صف خارج کنیم. ما حتی می‌توانیم با استفاده از این پیام‌ها را برای آینده «زمان‌بندی» کنیم NextReevaluation رشته.

چیزی که کم است این است که چگونه مرتباً صف می کشیم. مصرف کنندگان باید آن را اجرا کنند findAndModify فرمان در نوعی حلقه یک رویکرد ساده ایجاد یک حلقه بی پایان است که در آن یک پیام را رد کرده و پردازش می کنیم. این روش ساده و موثر است. با این حال، فشار قابل توجهی بر پایگاه داده و شبکه وارد می کند.

یک جایگزین، معرفی یک تاخیر، به عنوان مثال، 100 میلی ثانیه، بین تکرارهای حلقه است. این امر بار را به میزان قابل توجهی کاهش می دهد، اما سرعت دک کردن را نیز کاهش می دهد.

راه حل مشکل همان چیزی است که MongoDB از آن به عنوان جریان تغییر یاد می کند.

MongoDB Change Streams

جریان های تغییر چیست؟ من نمی توانم آن را بهتر از بچه های MongoDB توضیح دهم:

جریان‌های تغییر به برنامه‌ها اجازه می‌دهد به تغییرات داده‌های بلادرنگ دسترسی داشته باشند […]. برنامه‌ها می‌توانند از جریان‌های تغییر برای اشتراک در همه تغییرات داده‌ها در یک مجموعه استفاده کنند […] و بلافاصله به آنها واکنش نشان می دهد.

عالی! کاری که ما می توانیم انجام دهیم این است که به اسناد جدید ایجاد شده در مجموعه صف خود گوش دهیم، که در واقع به معنای گوش دادن به پیام های جدید در نوبت است.

این کاملا ساده است:

const changeStream = db.yourQueueCollection.watch();
changeStream.on(‘insert’, changeEvent => {
// Dequeue the message
db.yourQueueCollection.findAndModify({
“query”: changeEvent.documentKey._id,
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processing”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پیام های برنامه ریزی شده و یتیم

با این حال، رویکرد جریان تغییر برای پیام‌های برنامه‌ریزی‌شده و یتیم کار نمی‌کند، زیرا واضح است که هیچ تغییری وجود ندارد که بتوانیم به آن گوش دهیم.

پیام های برنامه ریزی شده به سادگی در مجموعه با وضعیت قرار می گیرند “Enqueued” و الف “NextReevaluation” زمینه برای آینده تنظیم شده است.
پیام های یتیم پیام هایی هستند که در “Processing” وضعیت زمانی که فرآیند مصرف کننده آنها از بین رفت. آنها با وضعیت در مجموعه باقی می مانند “Processing” اما هیچ مصرف کننده ای هرگز وضعیت خود را تغییر نخواهد داد “Processed” یا “Failed”.

برای این موارد استفاده، باید به حلقه ساده خود برگردیم. با این حال، ما می توانیم از یک تاخیر نسبتاً سخاوتمندانه بین تکرارها استفاده کنیم.

پایگاه داده های “سنتی”، مانند MySQL، PostgreSQL، یا MongoDB (که به نظر من سنتی هستند)، امروزه فوق العاده قدرتمند هستند. اگر به درستی استفاده شود (مطمئن شوید که شاخص های شما بهینه شده اند!)، سریع هستند، به طرز چشمگیری مقیاس می شوند و در پلتفرم های میزبانی سنتی مقرون به صرفه هستند.

بسیاری از موارد استفاده را می توان تنها با استفاده از یک پایگاه داده و زبان برنامه نویسی دلخواه شما برطرف کرد. همیشه داشتن “ابزار مناسب برای کار مناسب” ضروری نیست، به این معنی که مجموعه متنوعی از ابزارها مانند Redis، Elasticsearch، RabbitMQ و غیره را نگهداری کنید.

اگرچه راه حل پیشنهادی ممکن است با عملکرد، به عنوان مثال، RabbitMQ مطابقت نداشته باشد، اما معمولاً کافی است و می تواند تا حدی باشد که موفقیت قابل توجهی را برای استارتاپ شما رقم بزند.

مهندسی نرم افزار در مورد پیمایش مبادلات است. مال خودت را عاقلانه انتخاب کن

هی👋

من مدز کویست هستم، بنیانگذار All Quiet. ما یک صف پیام خانگی را بر اساس MongoDB پیاده سازی کرده ایم و من اینجا هستم تا در مورد آن صحبت کنم:

  • چرا چرخ را دوباره اختراع کردیم
  • چگونه چرخ را دوباره اختراع کردیم

چرا به صف پیام نیاز داریم؟

All Quiet یک پلت فرم مدرن مدیریت حوادث است که شبیه به PagerDuty است. پلتفرم ما به ویژگی هایی مانند:

  • پس از ثبت نام کاربر، یک ایمیل با انتخاب دوگانه به صورت ناهمزمان ارسال می شود
  • ارسال ایمیل یادآوری 24 ساعت پس از ثبت نام
  • ارسال اعلان‌های فشاری با Firebase Cloud Messaging (FCM)، که ممکن است به دلیل مشکلات شبکه یا بارگذاری با شکست مواجه شود. از آنجایی که اعلان‌های فشار برای برنامه ما بسیار مهم هستند، در صورت بروز مشکل، باید دوباره ارسال آن‌ها را امتحان کنیم.
  • پذیرش ایمیل از خارج از ادغام ما و پردازش آنها به حوادث. این فرآیند ممکن است شکست بخورد، بنابراین ما می‌خواستیم آن را جدا کنیم و هر بار ایمیل را در یک صف پردازش کنیم.

توضیحات تصویر

پشته فناوری ما

برای درک نیازهای خاص ما، مهم است که در مورد پشته فناوری خود اطلاعاتی کسب کنیم:

  • ما یک برنامه وب یکپارچه را بر اساس NET Core 7 اجرا می کنیم. برنامه .NET Core در یک ظرف Docker اجرا می شود.
  • چندین کانتینر را به صورت موازی اجرا می کنیم.
  • یک نمونه HAProxy درخواست های HTTP را به طور مساوی در هر کانتینر توزیع می کند و از راه اندازی بسیار در دسترس اطمینان می دهد.
  • ما از MongoDB به عنوان پایگاه داده زیربنایی خود استفاده می کنیم که در مناطق در دسترس تکرار می شود.
  • همه اجزای فوق توسط AWS روی ماشین های مجازی EC2 عمومی میزبانی می شوند.

چرا چرخ را دوباره اختراع کردیم

  • ما یک مکانیسم صف ساده را می خواستیم که بتواند در چندین فرآیند به طور همزمان اجرا شود و در عین حال تضمین کند که هر پیام فقط یک بار پردازش می شود.
  • ما به الگوی میخانه/فرعی نیاز نداشتیم.
  • هدف ما یک سیستم توزیع پیچیده مبتنی بر منبع یابی CQRS / رویداد نبود، زیرا، می دانید، اولین قانون سیستم های توزیع شده توزیع نکردن است.
  • ما می‌خواستیم با پیروی از فلسفه انتخاب “تکنولوژی خسته کننده” همه چیز را تا حد امکان ساده نگه داریم.

در نهایت، این در مورد به حداقل رساندن تعداد قطعات متحرک در زیرساخت شما است. هدف ما ایجاد ویژگی‌های فوق‌العاده برای مشتریان عالی‌مان است و حفظ مطمئن خدمات خود ضروری است. مدیریت یک سیستم پایگاه داده واحد برای دستیابی به بیش از پنج نهن زمان آپتایم به اندازه کافی چالش برانگیز است. پس چرا خود را با مدیریت یک خوشه اضافی HA RabbitMQ متحمل شوید؟

چرا فقط از AWS SQS استفاده نمی کنید؟

بله… راه حل های ابری مانند AWS SQS، Google Cloud Tasks یا Azure Queue Storage فوق العاده هستند! با این حال، آنها منجر به قفل شدن فروشنده می شدند. ما صرفاً آرزوی مستقل بودن و مقرون به صرفه بودن را داریم و در عین حال خدماتی مقیاس‌پذیر به مشتریان خود ارائه می‌کنیم.

توضیحات تصویر

صف پیام چیست؟

صف پیام سیستمی است که پیام ها را ذخیره می کند. تولیدکنندگان پیام‌ها این پیام‌ها را در صف ذخیره می‌کنند، که بعداً توسط مصرف‌کنندگان برای پردازش از صف خارج می‌شوند. این برای جدا کردن اجزاء بسیار سودمند است، به خصوص زمانی که پردازش پیام ها یک کار منابع فشرده است.

صف ما باید چه ویژگی هایی را نشان دهد؟

  • استفاده از MongoDB به عنوان ذخیره سازی داده ما
  • تضمین این که هر پیام فقط یک بار مصرف شود
  • به چندین مصرف کننده اجازه می دهد پیام ها را به طور همزمان پردازش کنند
  • اطمینان از اینکه اگر پردازش پیام با شکست مواجه شود، امکان تکرار وجود دارد
  • امکان برنامه ریزی مصرف پیام برای آینده
  • بدون نیاز به سفارش تضمینی
  • اطمینان از در دسترس بودن بالا
  • اطمینان از بادوام بودن پیام‌ها و وضعیت آنها و مقاومت در برابر راه‌اندازی مجدد یا توقف طولانی‌مدت

MongoDB در طول سال ها به طور قابل توجهی تکامل یافته است و می تواند معیارهای ذکر شده در بالا را برآورده کند.

پیاده سازی

در بخش‌های بعدی، من شما را از طریق پیاده‌سازی اختصاصی MongoDB صف پیام خود راهنمایی می‌کنم. در حالی که شما به یک کتابخانه کلاینت مناسب برای زبان برنامه نویسی ترجیحی خود مانند NodeJS، Go، یا C# در مورد All Quiet نیاز دارید، مفاهیمی که من به اشتراک خواهم گذاشت، پلتفرم آگنوستیک هستند.

صف ها

هر صفی که می خواهید استفاده کنید به عنوان یک مجموعه اختصاصی در پایگاه داده MongoDB شما نشان داده می شود.
مدل پیام

در اینجا یک مثال از یک پیام پردازش شده است:

{
    "_id" : NumberLong(638269014234217933),
    "Statuses" : [
        {
            "Status" : "Processed",
            "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"),
            "NextReevaluation" : null
        },
        {
            "Status" : "Processing",
            "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"),
            "NextReevaluation" : null
        },
        {
            "Status" : "Enqueued",
            "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
            "NextReevaluation" : null
        }
    ],
    "Payload" : {
        "YourData" : "abc123"
    }
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بیایید به هر ویژگی پیام نگاه کنیم.

_id

را _id فیلد ویژگی شناسه منحصر به فرد متعارف MongoDB است. در اینجا، شامل یک NumberLong، نه یک ObjectId . نیاز داریم NumberLong بجای ObjectId زیرا:

در حالی که ObjectId مقادیر باید در طول زمان افزایش یابد، آنها لزوما یکنواخت نیستند. این به این دلیل است که آنها:

تنها حاوی یک ثانیه وضوح زمانی است، بنابراین مقادیر ObjectId ایجاد شده در همان ثانیه دارای ترتیب تضمینی نیستند و توسط کلاینت‌هایی که ممکن است ساعت‌های سیستم متفاوتی داشته باشند، تولید می‌شوند.

در پیاده سازی سی شارپ خود، یک عدد ایجاد می کنیم Id با دقت میلی ثانیه و سفارش تضمینی بر اساس زمان درج. اگرچه ما به ترتیب پردازش دقیق در یک محیط چند مصرف کننده (مشابه RabbitMQ) نیاز نداریم، حفظ نظم FIFO هنگام کار فقط با یک مصرف کننده ضروری است. دستیابی به این امر با ObjectId قابل اجرا نیست اگر این برای شما حیاتی نیست، همچنان می توانید استفاده کنید ObjectId.

وضعیت ها

را Statuses ویژگی شامل یک آرایه حاوی تاریخچه پردازش پیام است. در شاخص 0، وضعیت فعلی را پیدا خواهید کرد که برای نمایه سازی بسیار مهم است.

شی وضعیت خود شامل سه ویژگی است:

  • Status: می تواند “در نوبت”، “پردازش”، “پردازش شده” یا “شکست خورده” باشد.
  • Timestamp: این نشان دهنده زمان فعلی است.
  • NextReevaluation: زمانی را ثبت می کند که ارزیابی بعدی باید انجام شود، که هم برای تلاش های مجدد و هم برای اجرای برنامه ریزی شده در آینده ضروری است.

ظرفیت ترابری

این ویژگی حاوی بار خاص پیام شما است.

در نوبت گذاشتن پیام

افزودن یک پیام یک عملیات درج ساده در مجموعه با وضعیت تنظیم شده است "Enqueued".

  • برای پردازش فوری، تنظیم کنید NextReevaluation باطل شدن
  • برای پردازش آینده، تنظیم کنید NextReevaluation به یک مهر زمانی در آینده، زمانی که می خواهید پیام شما پردازش شود.
db.yourQueueCollection.insert({
    "_id" : NumberLong(638269014234217933),
    "Statuses" : [
        {
            "Status" : "Enqueued",
            "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
            "NextReevaluation" : null
        }
    ],
    "Payload" : {
        "YourData" : "abc123"
    }
});
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

جدا کردن یک پیام

Dequeuing کمی پیچیده تر است اما هنوز نسبتاً ساده است. این به شدت به قابلیت های همزمان خواندن و به روز رسانی اتمی MongoDB متکی است.

این ویژگی ضروری MongoDB تضمین می کند:

  • هر پیام فقط یک بار پردازش می شود.
  • چندین مصرف کننده می توانند پیام ها را به طور همزمان با خیال راحت پردازش کنند.
db.yourQueueCollection.findAndModify({
   "query": {
      "$and": [
         {
            "Statuses.0.Status": "Enqueued"
         },
         {
            "Statuses.0.NextReevaluation": null
         }
      ]
   },
   "update": {
      "$push": {
         "Statuses": {
            "$each": [
               {
                  "Status": "Processing",
                  "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"),
                  "NextReevaluation": null
               }
            ],
            "$position": 0
         }
      }
   }
});
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بنابراین ما در حال خواندن یک پیام هستیم که در حالت است “Enqueued” و در عین حال با تنظیم وضعیت آن را اصلاح کنید “Processing” در موقعیت 0. از آنجایی که این عملیات اتمی است، تضمین می کند که پیام توسط مصرف کننده دیگری دریافت نخواهد شد.

علامت گذاری پیام به عنوان پردازش شده

هنگامی که پردازش پیام کامل شد، به روز رسانی وضعیت پیام به یک موضوع ساده است "Processed" با استفاده از پیام id.

db.yourQueueCollection.findAndModify({
   "query": {
     "_id": NumberLong(638269014234217933)
   },
   "update": {
      "$push": {
         "Statuses": {
            "$each": [
               {
                  "Status": "Processed",
                  "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                  "NextReevaluation": null
               }
            ],
            "$position": 0
         }
      }
   }
});
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

علامت گذاری یک پیام به عنوان ناموفق

اگر پردازش ناموفق بود، باید پیام را مطابق با آن علامت گذاری کنیم. اغلب، ممکن است بخواهید دوباره پردازش پیام را امتحان کنید. این را می توان با قرار دادن مجدد پیام به دست آورد. در بسیاری از سناریوها، بسته به ماهیت شکست پردازش، پردازش مجدد پیام پس از یک تاخیر خاص، مانند 10 ثانیه منطقی است.

db.yourQueueCollection.findAndModify({
   "query": {
     "_id": NumberLong(638269014234217933)
   },
   "update": {
      "$push": {
         "Statuses": {
            "$each": [
               {
                  "Status": "Failed",
                  "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                  "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000")
               }
            ],
            "$position": 0
         }
      }
   }
});

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

حلقه dequeuing

ما مشخص کرده‌ایم که چگونه می‌توانیم به راحتی آیتم‌ها را از «صف» خود، که در واقع صرفاً یک مجموعه MongoDB است، در صف قرار دهیم و از صف خارج کنیم. ما حتی می‌توانیم با استفاده از این پیام‌ها را برای آینده «زمان‌بندی» کنیم NextReevaluation رشته.

چیزی که کم است این است که چگونه مرتباً صف می کشیم. مصرف کنندگان باید آن را اجرا کنند findAndModify فرمان در نوعی حلقه یک رویکرد ساده ایجاد یک حلقه بی پایان است که در آن یک پیام را رد کرده و پردازش می کنیم. این روش ساده و موثر است. با این حال، فشار قابل توجهی بر پایگاه داده و شبکه وارد می کند.

یک جایگزین، معرفی یک تاخیر، به عنوان مثال، 100 میلی ثانیه، بین تکرارهای حلقه است. این امر بار را به میزان قابل توجهی کاهش می دهد، اما سرعت دک کردن را نیز کاهش می دهد.

راه حل مشکل همان چیزی است که MongoDB از آن به عنوان جریان تغییر یاد می کند.

MongoDB Change Streams

جریان های تغییر چیست؟ من نمی توانم آن را بهتر از بچه های MongoDB توضیح دهم:

جریان‌های تغییر به برنامه‌ها اجازه می‌دهد به تغییرات داده‌های بلادرنگ دسترسی داشته باشند […]. برنامه‌ها می‌توانند از جریان‌های تغییر برای اشتراک در همه تغییرات داده‌ها در یک مجموعه استفاده کنند […] و بلافاصله به آنها واکنش نشان می دهد.

عالی! کاری که ما می توانیم انجام دهیم این است که به اسناد جدید ایجاد شده در مجموعه صف خود گوش دهیم، که در واقع به معنای گوش دادن به پیام های جدید در نوبت است.

این کاملا ساده است:

const changeStream = db.yourQueueCollection.watch();
changeStream.on('insert', changeEvent => {
  // Dequeue the message
  db.yourQueueCollection.findAndModify({
    "query": changeEvent.documentKey._id,
    "update": {
      "$push": {
         "Statuses": {
            "$each": [
               {
                  "Status": "Processing",
                  "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                  "NextReevaluation": null
               }
            ],
            "$position": 0
         }
      }
   }
});
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پیام های برنامه ریزی شده و یتیم

با این حال، رویکرد جریان تغییر برای پیام‌های برنامه‌ریزی‌شده و یتیم کار نمی‌کند، زیرا واضح است که هیچ تغییری وجود ندارد که بتوانیم به آن گوش دهیم.

  • پیام های برنامه ریزی شده به سادگی در مجموعه با وضعیت قرار می گیرند "Enqueued" و الف "NextReevaluation" زمینه برای آینده تنظیم شده است.
  • پیام های یتیم پیام هایی هستند که در "Processing" وضعیت زمانی که فرآیند مصرف کننده آنها از بین رفت. آنها با وضعیت در مجموعه باقی می مانند "Processing" اما هیچ مصرف کننده ای هرگز وضعیت خود را تغییر نخواهد داد "Processed" یا "Failed".

برای این موارد استفاده، باید به حلقه ساده خود برگردیم. با این حال، ما می توانیم از یک تاخیر نسبتاً سخاوتمندانه بین تکرارها استفاده کنیم.

پایگاه داده های “سنتی”، مانند MySQL، PostgreSQL، یا MongoDB (که به نظر من سنتی هستند)، امروزه فوق العاده قدرتمند هستند. اگر به درستی استفاده شود (مطمئن شوید که شاخص های شما بهینه شده اند!)، سریع هستند، به طرز چشمگیری مقیاس می شوند و در پلتفرم های میزبانی سنتی مقرون به صرفه هستند.

بسیاری از موارد استفاده را می توان تنها با استفاده از یک پایگاه داده و زبان برنامه نویسی دلخواه شما برطرف کرد. همیشه داشتن “ابزار مناسب برای کار مناسب” ضروری نیست، به این معنی که مجموعه متنوعی از ابزارها مانند Redis، Elasticsearch، RabbitMQ و غیره را نگهداری کنید.

اگرچه راه حل پیشنهادی ممکن است با عملکرد، به عنوان مثال، RabbitMQ مطابقت نداشته باشد، اما معمولاً کافی است و می تواند تا حدی باشد که موفقیت قابل توجهی را برای استارتاپ شما رقم بزند.

مهندسی نرم افزار در مورد پیمایش مبادلات است. مال خودت را عاقلانه انتخاب کن

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا