چرا ما یک صف پیام MongoDB ساختیم و چرخ را دوباره اختراع کردیم
Summarize this content to 400 words in Persian Lang
هی👋
من مدز کویست هستم، بنیانگذار All Quiet. ما یک صف پیام خانگی را بر اساس MongoDB پیاده سازی کرده ایم و من اینجا هستم تا در مورد آن صحبت کنم:
چرا چرخ را دوباره اختراع کردیم
چگونه چرخ را دوباره اختراع کردیم
چرا به صف پیام نیاز داریم؟
All Quiet یک پلت فرم مدرن مدیریت حوادث است که شبیه به PagerDuty است. پلتفرم ما به ویژگی هایی مانند:
پس از ثبت نام کاربر، یک ایمیل با انتخاب دوگانه به صورت ناهمزمان ارسال می شود
ارسال ایمیل یادآوری 24 ساعت پس از ثبت نام
ارسال اعلانهای فشاری با Firebase Cloud Messaging (FCM)، که ممکن است به دلیل مشکلات شبکه یا بارگذاری با شکست مواجه شود. از آنجایی که اعلانهای فشار برای برنامه ما بسیار مهم هستند، در صورت بروز مشکل، باید دوباره ارسال آنها را امتحان کنیم.
پذیرش ایمیل از خارج از ادغام ما و پردازش آنها به حوادث. این فرآیند ممکن است شکست بخورد، بنابراین ما میخواستیم آن را جدا کنیم و هر بار ایمیل را در یک صف پردازش کنیم.
پشته فناوری ما
برای درک نیازهای خاص ما، مهم است که در مورد پشته فناوری خود اطلاعاتی کسب کنیم:
ما یک برنامه وب یکپارچه را بر اساس NET Core 7 اجرا می کنیم. برنامه .NET Core در یک ظرف Docker اجرا می شود.
چندین کانتینر را به صورت موازی اجرا می کنیم.
یک نمونه HAProxy درخواست های HTTP را به طور مساوی در هر کانتینر توزیع می کند و از راه اندازی بسیار در دسترس اطمینان می دهد.
ما از MongoDB به عنوان پایگاه داده زیربنایی خود استفاده می کنیم که در مناطق در دسترس تکرار می شود.
همه اجزای فوق توسط AWS روی ماشین های مجازی EC2 عمومی میزبانی می شوند.
چرا چرخ را دوباره اختراع کردیم
ما یک مکانیسم صف ساده را می خواستیم که بتواند در چندین فرآیند به طور همزمان اجرا شود و در عین حال تضمین کند که هر پیام فقط یک بار پردازش می شود.
ما به الگوی میخانه/فرعی نیاز نداشتیم.
هدف ما یک سیستم توزیع پیچیده مبتنی بر منبع یابی CQRS / رویداد نبود، زیرا، می دانید، اولین قانون سیستم های توزیع شده توزیع نکردن است.
ما میخواستیم با پیروی از فلسفه انتخاب “تکنولوژی خسته کننده” همه چیز را تا حد امکان ساده نگه داریم.
در نهایت، این در مورد به حداقل رساندن تعداد قطعات متحرک در زیرساخت شما است. هدف ما ایجاد ویژگیهای فوقالعاده برای مشتریان عالیمان است و حفظ مطمئن خدمات خود ضروری است. مدیریت یک سیستم پایگاه داده واحد برای دستیابی به بیش از پنج نهن زمان آپتایم به اندازه کافی چالش برانگیز است. پس چرا خود را با مدیریت یک خوشه اضافی HA RabbitMQ متحمل شوید؟
چرا فقط از AWS SQS استفاده نمی کنید؟
بله… راه حل های ابری مانند AWS SQS، Google Cloud Tasks یا Azure Queue Storage فوق العاده هستند! با این حال، آنها منجر به قفل شدن فروشنده می شدند. ما صرفاً آرزوی مستقل بودن و مقرون به صرفه بودن را داریم و در عین حال خدماتی مقیاسپذیر به مشتریان خود ارائه میکنیم.
صف پیام چیست؟
صف پیام سیستمی است که پیام ها را ذخیره می کند. تولیدکنندگان پیامها این پیامها را در صف ذخیره میکنند، که بعداً توسط مصرفکنندگان برای پردازش از صف خارج میشوند. این برای جدا کردن اجزاء بسیار سودمند است، به خصوص زمانی که پردازش پیام ها یک کار منابع فشرده است.
صف ما باید چه ویژگی هایی را نشان دهد؟
استفاده از MongoDB به عنوان ذخیره سازی داده ما
تضمین این که هر پیام فقط یک بار مصرف شود
به چندین مصرف کننده اجازه می دهد پیام ها را به طور همزمان پردازش کنند
اطمینان از اینکه اگر پردازش پیام با شکست مواجه شود، امکان تکرار وجود دارد
امکان برنامه ریزی مصرف پیام برای آینده
بدون نیاز به سفارش تضمینی
اطمینان از در دسترس بودن بالا
اطمینان از بادوام بودن پیامها و وضعیت آنها و مقاومت در برابر راهاندازی مجدد یا توقف طولانیمدت
MongoDB در طول سال ها به طور قابل توجهی تکامل یافته است و می تواند معیارهای ذکر شده در بالا را برآورده کند.
پیاده سازی
در بخشهای بعدی، من شما را از طریق پیادهسازی اختصاصی MongoDB صف پیام خود راهنمایی میکنم. در حالی که شما به یک کتابخانه کلاینت مناسب برای زبان برنامه نویسی ترجیحی خود مانند NodeJS، Go، یا C# در مورد All Quiet نیاز دارید، مفاهیمی که من به اشتراک خواهم گذاشت، پلتفرم آگنوستیک هستند.
صف ها
هر صفی که می خواهید استفاده کنید به عنوان یک مجموعه اختصاصی در پایگاه داده MongoDB شما نشان داده می شود.مدل پیام
در اینجا یک مثال از یک پیام پردازش شده است:
{
“_id” : NumberLong(638269014234217933),
“Statuses” : [
{
“Status” : “Processed”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.753+0000”),
“NextReevaluation” : null
},
{
“Status” : “Processing”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.572+0000”),
“NextReevaluation” : null
},
{
“Status” : “Enqueued”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.421+0000”),
“NextReevaluation” : null
}
],
“Payload” : {
“YourData” : “abc123”
}
}
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
بیایید به هر ویژگی پیام نگاه کنیم.
_id
را _id فیلد ویژگی شناسه منحصر به فرد متعارف MongoDB است. در اینجا، شامل یک NumberLong، نه یک ObjectId . نیاز داریم NumberLong بجای ObjectId زیرا:
در حالی که ObjectId مقادیر باید در طول زمان افزایش یابد، آنها لزوما یکنواخت نیستند. این به این دلیل است که آنها:
تنها حاوی یک ثانیه وضوح زمانی است، بنابراین مقادیر ObjectId ایجاد شده در همان ثانیه دارای ترتیب تضمینی نیستند و توسط کلاینتهایی که ممکن است ساعتهای سیستم متفاوتی داشته باشند، تولید میشوند.
در پیاده سازی سی شارپ خود، یک عدد ایجاد می کنیم Id با دقت میلی ثانیه و سفارش تضمینی بر اساس زمان درج. اگرچه ما به ترتیب پردازش دقیق در یک محیط چند مصرف کننده (مشابه RabbitMQ) نیاز نداریم، حفظ نظم FIFO هنگام کار فقط با یک مصرف کننده ضروری است. دستیابی به این امر با ObjectId قابل اجرا نیست اگر این برای شما حیاتی نیست، همچنان می توانید استفاده کنید ObjectId.
وضعیت ها
را Statuses ویژگی شامل یک آرایه حاوی تاریخچه پردازش پیام است. در شاخص 0، وضعیت فعلی را پیدا خواهید کرد که برای نمایه سازی بسیار مهم است.
شی وضعیت خود شامل سه ویژگی است:
Status: می تواند “در نوبت”، “پردازش”، “پردازش شده” یا “شکست خورده” باشد.
Timestamp: این نشان دهنده زمان فعلی است.
NextReevaluation: زمانی را ثبت می کند که ارزیابی بعدی باید انجام شود، که هم برای تلاش های مجدد و هم برای اجرای برنامه ریزی شده در آینده ضروری است.
ظرفیت ترابری
این ویژگی حاوی بار خاص پیام شما است.
در نوبت گذاشتن پیام
افزودن یک پیام یک عملیات درج ساده در مجموعه با وضعیت تنظیم شده است “Enqueued”.
برای پردازش فوری، تنظیم کنید NextReevaluation باطل شدن
برای پردازش آینده، تنظیم کنید NextReevaluation به یک مهر زمانی در آینده، زمانی که می خواهید پیام شما پردازش شود.
db.yourQueueCollection.insert({
“_id” : NumberLong(638269014234217933),
“Statuses” : [
{
“Status” : “Enqueued”,
“Timestamp” : ISODate(“2023-08-06T06:50:23.421+0000”),
“NextReevaluation” : null
}
],
“Payload” : {
“YourData” : “abc123”
}
});
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
جدا کردن یک پیام
Dequeuing کمی پیچیده تر است اما هنوز نسبتاً ساده است. این به شدت به قابلیت های همزمان خواندن و به روز رسانی اتمی MongoDB متکی است.
این ویژگی ضروری MongoDB تضمین می کند:
هر پیام فقط یک بار پردازش می شود.
چندین مصرف کننده می توانند پیام ها را به طور همزمان با خیال راحت پردازش کنند.
db.yourQueueCollection.findAndModify({
“query”: {
“$and”: [
{
“Statuses.0.Status”: “Enqueued”
},
{
“Statuses.0.NextReevaluation”: null
}
]
},
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processing”,
“Timestamp”: ISODate(“2023-08-06T06:50:23.800+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
بنابراین ما در حال خواندن یک پیام هستیم که در حالت است “Enqueued” و در عین حال با تنظیم وضعیت آن را اصلاح کنید “Processing” در موقعیت 0. از آنجایی که این عملیات اتمی است، تضمین می کند که پیام توسط مصرف کننده دیگری دریافت نخواهد شد.
علامت گذاری پیام به عنوان پردازش شده
هنگامی که پردازش پیام کامل شد، به روز رسانی وضعیت پیام به یک موضوع ساده است “Processed” با استفاده از پیام id.
db.yourQueueCollection.findAndModify({
“query”: {
“_id”: NumberLong(638269014234217933)
},
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processed”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
علامت گذاری یک پیام به عنوان ناموفق
اگر پردازش ناموفق بود، باید پیام را مطابق با آن علامت گذاری کنیم. اغلب، ممکن است بخواهید دوباره پردازش پیام را امتحان کنید. این را می توان با قرار دادن مجدد پیام به دست آورد. در بسیاری از سناریوها، بسته به ماهیت شکست پردازش، پردازش مجدد پیام پس از یک تاخیر خاص، مانند 10 ثانیه منطقی است.
db.yourQueueCollection.findAndModify({
“query”: {
“_id”: NumberLong(638269014234217933)
},
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Failed”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: ISODate(“2023-08-06T07:00:24.100+0000”)
}
],
“$position”: 0
}
}
}
});
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
حلقه dequeuing
ما مشخص کردهایم که چگونه میتوانیم به راحتی آیتمها را از «صف» خود، که در واقع صرفاً یک مجموعه MongoDB است، در صف قرار دهیم و از صف خارج کنیم. ما حتی میتوانیم با استفاده از این پیامها را برای آینده «زمانبندی» کنیم NextReevaluation رشته.
چیزی که کم است این است که چگونه مرتباً صف می کشیم. مصرف کنندگان باید آن را اجرا کنند findAndModify فرمان در نوعی حلقه یک رویکرد ساده ایجاد یک حلقه بی پایان است که در آن یک پیام را رد کرده و پردازش می کنیم. این روش ساده و موثر است. با این حال، فشار قابل توجهی بر پایگاه داده و شبکه وارد می کند.
یک جایگزین، معرفی یک تاخیر، به عنوان مثال، 100 میلی ثانیه، بین تکرارهای حلقه است. این امر بار را به میزان قابل توجهی کاهش می دهد، اما سرعت دک کردن را نیز کاهش می دهد.
راه حل مشکل همان چیزی است که MongoDB از آن به عنوان جریان تغییر یاد می کند.
MongoDB Change Streams
جریان های تغییر چیست؟ من نمی توانم آن را بهتر از بچه های MongoDB توضیح دهم:
جریانهای تغییر به برنامهها اجازه میدهد به تغییرات دادههای بلادرنگ دسترسی داشته باشند […]. برنامهها میتوانند از جریانهای تغییر برای اشتراک در همه تغییرات دادهها در یک مجموعه استفاده کنند […] و بلافاصله به آنها واکنش نشان می دهد.
عالی! کاری که ما می توانیم انجام دهیم این است که به اسناد جدید ایجاد شده در مجموعه صف خود گوش دهیم، که در واقع به معنای گوش دادن به پیام های جدید در نوبت است.
این کاملا ساده است:
const changeStream = db.yourQueueCollection.watch();
changeStream.on(‘insert’, changeEvent => {
// Dequeue the message
db.yourQueueCollection.findAndModify({
“query”: changeEvent.documentKey._id,
“update”: {
“$push”: {
“Statuses”: {
“$each”: [
{
“Status”: “Processing”,
“Timestamp”: ISODate(“2023-08-06T06:50:24.100+0000”),
“NextReevaluation”: null
}
],
“$position”: 0
}
}
}
});
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
پیام های برنامه ریزی شده و یتیم
با این حال، رویکرد جریان تغییر برای پیامهای برنامهریزیشده و یتیم کار نمیکند، زیرا واضح است که هیچ تغییری وجود ندارد که بتوانیم به آن گوش دهیم.
پیام های برنامه ریزی شده به سادگی در مجموعه با وضعیت قرار می گیرند “Enqueued” و الف “NextReevaluation” زمینه برای آینده تنظیم شده است.
پیام های یتیم پیام هایی هستند که در “Processing” وضعیت زمانی که فرآیند مصرف کننده آنها از بین رفت. آنها با وضعیت در مجموعه باقی می مانند “Processing” اما هیچ مصرف کننده ای هرگز وضعیت خود را تغییر نخواهد داد “Processed” یا “Failed”.
برای این موارد استفاده، باید به حلقه ساده خود برگردیم. با این حال، ما می توانیم از یک تاخیر نسبتاً سخاوتمندانه بین تکرارها استفاده کنیم.
پایگاه داده های “سنتی”، مانند MySQL، PostgreSQL، یا MongoDB (که به نظر من سنتی هستند)، امروزه فوق العاده قدرتمند هستند. اگر به درستی استفاده شود (مطمئن شوید که شاخص های شما بهینه شده اند!)، سریع هستند، به طرز چشمگیری مقیاس می شوند و در پلتفرم های میزبانی سنتی مقرون به صرفه هستند.
بسیاری از موارد استفاده را می توان تنها با استفاده از یک پایگاه داده و زبان برنامه نویسی دلخواه شما برطرف کرد. همیشه داشتن “ابزار مناسب برای کار مناسب” ضروری نیست، به این معنی که مجموعه متنوعی از ابزارها مانند Redis، Elasticsearch، RabbitMQ و غیره را نگهداری کنید.
اگرچه راه حل پیشنهادی ممکن است با عملکرد، به عنوان مثال، RabbitMQ مطابقت نداشته باشد، اما معمولاً کافی است و می تواند تا حدی باشد که موفقیت قابل توجهی را برای استارتاپ شما رقم بزند.
مهندسی نرم افزار در مورد پیمایش مبادلات است. مال خودت را عاقلانه انتخاب کن
هی👋
من مدز کویست هستم، بنیانگذار All Quiet. ما یک صف پیام خانگی را بر اساس MongoDB پیاده سازی کرده ایم و من اینجا هستم تا در مورد آن صحبت کنم:
- چرا چرخ را دوباره اختراع کردیم
- چگونه چرخ را دوباره اختراع کردیم
چرا به صف پیام نیاز داریم؟
All Quiet یک پلت فرم مدرن مدیریت حوادث است که شبیه به PagerDuty است. پلتفرم ما به ویژگی هایی مانند:
- پس از ثبت نام کاربر، یک ایمیل با انتخاب دوگانه به صورت ناهمزمان ارسال می شود
- ارسال ایمیل یادآوری 24 ساعت پس از ثبت نام
- ارسال اعلانهای فشاری با Firebase Cloud Messaging (FCM)، که ممکن است به دلیل مشکلات شبکه یا بارگذاری با شکست مواجه شود. از آنجایی که اعلانهای فشار برای برنامه ما بسیار مهم هستند، در صورت بروز مشکل، باید دوباره ارسال آنها را امتحان کنیم.
- پذیرش ایمیل از خارج از ادغام ما و پردازش آنها به حوادث. این فرآیند ممکن است شکست بخورد، بنابراین ما میخواستیم آن را جدا کنیم و هر بار ایمیل را در یک صف پردازش کنیم.
پشته فناوری ما
برای درک نیازهای خاص ما، مهم است که در مورد پشته فناوری خود اطلاعاتی کسب کنیم:
- ما یک برنامه وب یکپارچه را بر اساس NET Core 7 اجرا می کنیم. برنامه .NET Core در یک ظرف Docker اجرا می شود.
- چندین کانتینر را به صورت موازی اجرا می کنیم.
- یک نمونه HAProxy درخواست های HTTP را به طور مساوی در هر کانتینر توزیع می کند و از راه اندازی بسیار در دسترس اطمینان می دهد.
- ما از MongoDB به عنوان پایگاه داده زیربنایی خود استفاده می کنیم که در مناطق در دسترس تکرار می شود.
- همه اجزای فوق توسط AWS روی ماشین های مجازی EC2 عمومی میزبانی می شوند.
چرا چرخ را دوباره اختراع کردیم
- ما یک مکانیسم صف ساده را می خواستیم که بتواند در چندین فرآیند به طور همزمان اجرا شود و در عین حال تضمین کند که هر پیام فقط یک بار پردازش می شود.
- ما به الگوی میخانه/فرعی نیاز نداشتیم.
- هدف ما یک سیستم توزیع پیچیده مبتنی بر منبع یابی CQRS / رویداد نبود، زیرا، می دانید، اولین قانون سیستم های توزیع شده توزیع نکردن است.
- ما میخواستیم با پیروی از فلسفه انتخاب “تکنولوژی خسته کننده” همه چیز را تا حد امکان ساده نگه داریم.
در نهایت، این در مورد به حداقل رساندن تعداد قطعات متحرک در زیرساخت شما است. هدف ما ایجاد ویژگیهای فوقالعاده برای مشتریان عالیمان است و حفظ مطمئن خدمات خود ضروری است. مدیریت یک سیستم پایگاه داده واحد برای دستیابی به بیش از پنج نهن زمان آپتایم به اندازه کافی چالش برانگیز است. پس چرا خود را با مدیریت یک خوشه اضافی HA RabbitMQ متحمل شوید؟
چرا فقط از AWS SQS استفاده نمی کنید؟
بله… راه حل های ابری مانند AWS SQS، Google Cloud Tasks یا Azure Queue Storage فوق العاده هستند! با این حال، آنها منجر به قفل شدن فروشنده می شدند. ما صرفاً آرزوی مستقل بودن و مقرون به صرفه بودن را داریم و در عین حال خدماتی مقیاسپذیر به مشتریان خود ارائه میکنیم.
صف پیام چیست؟
صف پیام سیستمی است که پیام ها را ذخیره می کند. تولیدکنندگان پیامها این پیامها را در صف ذخیره میکنند، که بعداً توسط مصرفکنندگان برای پردازش از صف خارج میشوند. این برای جدا کردن اجزاء بسیار سودمند است، به خصوص زمانی که پردازش پیام ها یک کار منابع فشرده است.
صف ما باید چه ویژگی هایی را نشان دهد؟
- استفاده از MongoDB به عنوان ذخیره سازی داده ما
- تضمین این که هر پیام فقط یک بار مصرف شود
- به چندین مصرف کننده اجازه می دهد پیام ها را به طور همزمان پردازش کنند
- اطمینان از اینکه اگر پردازش پیام با شکست مواجه شود، امکان تکرار وجود دارد
- امکان برنامه ریزی مصرف پیام برای آینده
- بدون نیاز به سفارش تضمینی
- اطمینان از در دسترس بودن بالا
- اطمینان از بادوام بودن پیامها و وضعیت آنها و مقاومت در برابر راهاندازی مجدد یا توقف طولانیمدت
MongoDB در طول سال ها به طور قابل توجهی تکامل یافته است و می تواند معیارهای ذکر شده در بالا را برآورده کند.
پیاده سازی
در بخشهای بعدی، من شما را از طریق پیادهسازی اختصاصی MongoDB صف پیام خود راهنمایی میکنم. در حالی که شما به یک کتابخانه کلاینت مناسب برای زبان برنامه نویسی ترجیحی خود مانند NodeJS، Go، یا C# در مورد All Quiet نیاز دارید، مفاهیمی که من به اشتراک خواهم گذاشت، پلتفرم آگنوستیک هستند.
صف ها
هر صفی که می خواهید استفاده کنید به عنوان یک مجموعه اختصاصی در پایگاه داده MongoDB شما نشان داده می شود.
مدل پیام
در اینجا یک مثال از یک پیام پردازش شده است:
{
"_id" : NumberLong(638269014234217933),
"Statuses" : [
{
"Status" : "Processed",
"Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"),
"NextReevaluation" : null
},
{
"Status" : "Processing",
"Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"),
"NextReevaluation" : null
},
{
"Status" : "Enqueued",
"Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
"NextReevaluation" : null
}
],
"Payload" : {
"YourData" : "abc123"
}
}
بیایید به هر ویژگی پیام نگاه کنیم.
_id
را _id
فیلد ویژگی شناسه منحصر به فرد متعارف MongoDB است. در اینجا، شامل یک NumberLong
، نه یک ObjectId
. نیاز داریم NumberLong
بجای ObjectId
زیرا:
در حالی که ObjectId
مقادیر باید در طول زمان افزایش یابد، آنها لزوما یکنواخت نیستند. این به این دلیل است که آنها:
تنها حاوی یک ثانیه وضوح زمانی است، بنابراین مقادیر ObjectId ایجاد شده در همان ثانیه دارای ترتیب تضمینی نیستند و توسط کلاینتهایی که ممکن است ساعتهای سیستم متفاوتی داشته باشند، تولید میشوند.
در پیاده سازی سی شارپ خود، یک عدد ایجاد می کنیم Id
با دقت میلی ثانیه و سفارش تضمینی بر اساس زمان درج. اگرچه ما به ترتیب پردازش دقیق در یک محیط چند مصرف کننده (مشابه RabbitMQ) نیاز نداریم، حفظ نظم FIFO هنگام کار فقط با یک مصرف کننده ضروری است. دستیابی به این امر با ObjectId
قابل اجرا نیست اگر این برای شما حیاتی نیست، همچنان می توانید استفاده کنید ObjectId
.
وضعیت ها
را Statuses
ویژگی شامل یک آرایه حاوی تاریخچه پردازش پیام است. در شاخص 0
، وضعیت فعلی را پیدا خواهید کرد که برای نمایه سازی بسیار مهم است.
شی وضعیت خود شامل سه ویژگی است:
-
Status
: می تواند “در نوبت”، “پردازش”، “پردازش شده” یا “شکست خورده” باشد. -
Timestamp
: این نشان دهنده زمان فعلی است. -
NextReevaluation
: زمانی را ثبت می کند که ارزیابی بعدی باید انجام شود، که هم برای تلاش های مجدد و هم برای اجرای برنامه ریزی شده در آینده ضروری است.
ظرفیت ترابری
این ویژگی حاوی بار خاص پیام شما است.
در نوبت گذاشتن پیام
افزودن یک پیام یک عملیات درج ساده در مجموعه با وضعیت تنظیم شده است "Enqueued"
.
- برای پردازش فوری، تنظیم کنید
NextReevaluation
باطل شدن - برای پردازش آینده، تنظیم کنید
NextReevaluation
به یک مهر زمانی در آینده، زمانی که می خواهید پیام شما پردازش شود.
db.yourQueueCollection.insert({
"_id" : NumberLong(638269014234217933),
"Statuses" : [
{
"Status" : "Enqueued",
"Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
"NextReevaluation" : null
}
],
"Payload" : {
"YourData" : "abc123"
}
});
جدا کردن یک پیام
Dequeuing کمی پیچیده تر است اما هنوز نسبتاً ساده است. این به شدت به قابلیت های همزمان خواندن و به روز رسانی اتمی MongoDB متکی است.
این ویژگی ضروری MongoDB تضمین می کند:
- هر پیام فقط یک بار پردازش می شود.
- چندین مصرف کننده می توانند پیام ها را به طور همزمان با خیال راحت پردازش کنند.
db.yourQueueCollection.findAndModify({
"query": {
"$and": [
{
"Statuses.0.Status": "Enqueued"
},
{
"Statuses.0.NextReevaluation": null
}
]
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Processing",
"Timestamp": ISODate("2023-08-06T06:50:23.800+0000"),
"NextReevaluation": null
}
],
"$position": 0
}
}
}
});
بنابراین ما در حال خواندن یک پیام هستیم که در حالت است “Enqueued”
و در عین حال با تنظیم وضعیت آن را اصلاح کنید “Processing”
در موقعیت 0
. از آنجایی که این عملیات اتمی است، تضمین می کند که پیام توسط مصرف کننده دیگری دریافت نخواهد شد.
علامت گذاری پیام به عنوان پردازش شده
هنگامی که پردازش پیام کامل شد، به روز رسانی وضعیت پیام به یک موضوع ساده است "Processed"
با استفاده از پیام id
.
db.yourQueueCollection.findAndModify({
"query": {
"_id": NumberLong(638269014234217933)
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Processed",
"Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
"NextReevaluation": null
}
],
"$position": 0
}
}
}
});
علامت گذاری یک پیام به عنوان ناموفق
اگر پردازش ناموفق بود، باید پیام را مطابق با آن علامت گذاری کنیم. اغلب، ممکن است بخواهید دوباره پردازش پیام را امتحان کنید. این را می توان با قرار دادن مجدد پیام به دست آورد. در بسیاری از سناریوها، بسته به ماهیت شکست پردازش، پردازش مجدد پیام پس از یک تاخیر خاص، مانند 10 ثانیه منطقی است.
db.yourQueueCollection.findAndModify({
"query": {
"_id": NumberLong(638269014234217933)
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Failed",
"Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
"NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000")
}
],
"$position": 0
}
}
}
});
حلقه dequeuing
ما مشخص کردهایم که چگونه میتوانیم به راحتی آیتمها را از «صف» خود، که در واقع صرفاً یک مجموعه MongoDB است، در صف قرار دهیم و از صف خارج کنیم. ما حتی میتوانیم با استفاده از این پیامها را برای آینده «زمانبندی» کنیم NextReevaluation
رشته.
چیزی که کم است این است که چگونه مرتباً صف می کشیم. مصرف کنندگان باید آن را اجرا کنند findAndModify
فرمان در نوعی حلقه یک رویکرد ساده ایجاد یک حلقه بی پایان است که در آن یک پیام را رد کرده و پردازش می کنیم. این روش ساده و موثر است. با این حال، فشار قابل توجهی بر پایگاه داده و شبکه وارد می کند.
یک جایگزین، معرفی یک تاخیر، به عنوان مثال، 100 میلی ثانیه، بین تکرارهای حلقه است. این امر بار را به میزان قابل توجهی کاهش می دهد، اما سرعت دک کردن را نیز کاهش می دهد.
راه حل مشکل همان چیزی است که MongoDB از آن به عنوان جریان تغییر یاد می کند.
MongoDB Change Streams
جریان های تغییر چیست؟ من نمی توانم آن را بهتر از بچه های MongoDB توضیح دهم:
جریانهای تغییر به برنامهها اجازه میدهد به تغییرات دادههای بلادرنگ دسترسی داشته باشند […]. برنامهها میتوانند از جریانهای تغییر برای اشتراک در همه تغییرات دادهها در یک مجموعه استفاده کنند […] و بلافاصله به آنها واکنش نشان می دهد.
عالی! کاری که ما می توانیم انجام دهیم این است که به اسناد جدید ایجاد شده در مجموعه صف خود گوش دهیم، که در واقع به معنای گوش دادن به پیام های جدید در نوبت است.
این کاملا ساده است:
const changeStream = db.yourQueueCollection.watch();
changeStream.on('insert', changeEvent => {
// Dequeue the message
db.yourQueueCollection.findAndModify({
"query": changeEvent.documentKey._id,
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Processing",
"Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
"NextReevaluation": null
}
],
"$position": 0
}
}
}
});
پیام های برنامه ریزی شده و یتیم
با این حال، رویکرد جریان تغییر برای پیامهای برنامهریزیشده و یتیم کار نمیکند، زیرا واضح است که هیچ تغییری وجود ندارد که بتوانیم به آن گوش دهیم.
- پیام های برنامه ریزی شده به سادگی در مجموعه با وضعیت قرار می گیرند
"Enqueued"
و الف"NextReevaluation"
زمینه برای آینده تنظیم شده است. - پیام های یتیم پیام هایی هستند که در
"Processing"
وضعیت زمانی که فرآیند مصرف کننده آنها از بین رفت. آنها با وضعیت در مجموعه باقی می مانند"Processing"
اما هیچ مصرف کننده ای هرگز وضعیت خود را تغییر نخواهد داد"Processed"
یا"Failed"
.
برای این موارد استفاده، باید به حلقه ساده خود برگردیم. با این حال، ما می توانیم از یک تاخیر نسبتاً سخاوتمندانه بین تکرارها استفاده کنیم.
پایگاه داده های “سنتی”، مانند MySQL، PostgreSQL، یا MongoDB (که به نظر من سنتی هستند)، امروزه فوق العاده قدرتمند هستند. اگر به درستی استفاده شود (مطمئن شوید که شاخص های شما بهینه شده اند!)، سریع هستند، به طرز چشمگیری مقیاس می شوند و در پلتفرم های میزبانی سنتی مقرون به صرفه هستند.
بسیاری از موارد استفاده را می توان تنها با استفاده از یک پایگاه داده و زبان برنامه نویسی دلخواه شما برطرف کرد. همیشه داشتن “ابزار مناسب برای کار مناسب” ضروری نیست، به این معنی که مجموعه متنوعی از ابزارها مانند Redis، Elasticsearch، RabbitMQ و غیره را نگهداری کنید.
اگرچه راه حل پیشنهادی ممکن است با عملکرد، به عنوان مثال، RabbitMQ مطابقت نداشته باشد، اما معمولاً کافی است و می تواند تا حدی باشد که موفقیت قابل توجهی را برای استارتاپ شما رقم بزند.
مهندسی نرم افزار در مورد پیمایش مبادلات است. مال خودت را عاقلانه انتخاب کن