نحوه جلوگیری از شرایط مسابقه در سیستم Node.js با استفاده از Mutexes و RabbitMQ

Summarize this content to 400 words in Persian Lang
در سیستم های توزیع شده، اطمینان از سازگاری داده ها در چندین فرآیند همزمان بسیار مهم است. هنگام برخورد با عملیات حساس مانند برداشت حساب، شرایط مسابقه می تواند منجر به خرابی داده ها یا اختلافات مالی شود. این مقاله نحوه استفاده از mutexes (قفلهای حذف متقابل) با RabbitMQ و Node.js را برای مدیریت ایمن عملیات همزمان و در عین حال مدیریت کارآمد حافظه توضیح میدهد.
مشکل
سناریویی را تصور کنید که در آن چندین کارگر RabbitMQ درخواست های برداشت از یک حساب را به طور همزمان پردازش می کنند. بدون همگام سازی مناسب، دو کارگر ممکن است موجودی اولیه یکسانی را بخوانند، مبالغ متفاوتی را کم کنند و سپس حساب را به روز کنند که منجر به موجودی نادرست شود.
مثال شرایط مسابقه:
موجودی اولیه: 1000 دلار
کارگر A موجودی را می خواند: 1000 دلار
کارگر B موجودی را می خواند: 1000 دلار
کارگر A 300 دلار را کم می کند و موجودی را به روز می کند: 700 دلار
کارگر B 200 دلار را کم می کند و موجودی را به روز می کند: 800 دلار (رونویسی به روز رسانی کارگر A)
موجودی نهایی: 800 دلار (باید 500 دلار بود).
راه حل: استفاده از Mutexes
یک mutex تضمین می کند که فقط یک کارگر می تواند در هر زمان عملیات روی یک حساب خاص را پردازش کند. در اینجا نحوه پیاده سازی راه حل مبتنی بر mutex در سیستم Node.js مبتنی بر RabbitMQ آمده است.
پیاده سازی گام به گام
پیش نیازها
بسته های لازم را نصب کنید:
npm install async-mutex amqplib
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
async-mutex: یک mutex با کاربری آسان برای عملیات ناهمزمان ارائه می دهد.
amqplib: تعامل با RabbitMQ را فعال می کند.
پیاده سازی کد
در زیر اجرای کامل، از جمله مدیریت حافظه قوی برای پاکسازی mutex آورده شده است.
الف مدیریت موتکس
ما از نقشه برای حفظ mutexes در هر حساب استفاده می کنیم. Mutexe ها در صورت درخواست ایجاد می شوند و زمانی که دیگر مورد نیاز نیستند حذف می شوند.
ب بهینه سازی حافظه
برای جلوگیری از نفخ حافظه، ما اجرا می کنیم:
Idle Timeouts: پس از 5 دقیقه mutexeها را برای حساب های غیرفعال به طور خودکار حذف کنید.
پاکسازی دوره ای: یک فرآیند پس زمینه تضمین می کند که mutexeهای قدیمی هر 1 دقیقه حذف می شوند.
ج پیاده سازی کامل
const { Mutex } = require(‘async-mutex’);
const amqp = require(‘amqplib’);
// Mutex for account operations with automatic cleanup
const accountMutexes = new Map(); // Store one mutex per account
const accountTimeouts = new Map(); // Store timeout references for cleanup
const CLEANUP_INTERVAL_MS = 60000; // 1-minute cleanup interval
const IDLE_TIMEOUT_MS = 300000; // 5-minute idle timeout per account
// Function to get or create a mutex for a specific account
function getAccountMutex(accountId) {
if (!accountMutexes.has(accountId)) {
const mutex = new Mutex();
accountMutexes.set(accountId, mutex);
resetAccountTimeout(accountId); // Start idle timeout cleanup
}
return accountMutexes.get(accountId);
}
// Function to reset idle timeout for an account
function resetAccountTimeout(accountId) {
if (accountTimeouts.has(accountId)) {
clearTimeout(accountTimeouts.get(accountId));
}
const timeout = setTimeout(() => {
accountMutexes.delete(accountId);
accountTimeouts.delete(accountId);
console.log(`Mutex for account ${accountId} removed due to inactivity.`);
}, IDLE_TIMEOUT_MS);
accountTimeouts.set(accountId, timeout);
}
// Periodic cleanup process
function startPeriodicCleanup() {
setInterval(() => {
accountTimeouts.forEach((_, accountId) => {
if (!accountMutexes.has(accountId)) {
accountTimeouts.delete(accountId);
}
});
}, CLEANUP_INTERVAL_MS);
console.log(`Periodic cleanup started: checking every ${CLEANUP_INTERVAL_MS / 1000} seconds.`);
}
// Simulated database of accounts
const accounts = {
“123”: { balance: 1000 },
“456”: { balance: 2000 },
};
// Process withdrawal
async function processWithdrawal(accountId, amount) {
const mutex = getAccountMutex(accountId);
const release = await mutex.acquire();
try {
console.log(`Processing withdrawal for account ${accountId}`);
const account = accounts[accountId];
if (!account) {
throw new Error(‘Account not found’);
}
if (account.balance < amount) {
throw new Error(‘Insufficient funds’);
}
account.balance -= amount;
console.log(`Withdrawal successful! New balance for account ${accountId}: ${account.balance}`);
} catch (error) {
console.error(`Error processing withdrawal for account ${accountId}:`, error.message);
} finally {
release();
resetAccountTimeout(accountId);
}
}
// RabbitMQ message handler
async function handleMessage(message) {
const { accountId, amount } = JSON.parse(message.content.toString());
await processWithdrawal(accountId, amount);
}
// Connect to RabbitMQ and consume messages
(async () => {
const connection = await amqp.connect(‘amqp://localhost’);
const channel = await connection.createChannel();
const queueName = ‘withdrawals’;
await channel.assertQueue(queueName, { durable: true });
console.log(`Waiting for messages in queue: ${queueName}`);
channel.consume(queueName, async (msg) => {
if (msg) {
await handleMessage(msg);
channel.ack(msg); // Acknowledge message after processing
}
});
startPeriodicCleanup(); // Start periodic cleanup
})();
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
چگونه کار می کند
Mutex ویژه حساب:
هر حساب دارای mutex (accountMutexes) خود است که امکان همزمانی امن را برای حساب های مختلف فراهم می کند.
Mutexeها در صورت دسترسی برای اولین بار به صورت پویا ایجاد می شوند.
بخش بحرانی:
تابع processWithdrawal mutex را قفل می کند تا اطمینان حاصل شود که فقط یک کارگر می تواند موجودی حساب را تغییر دهد.
مدیریت حافظه:
Idle Timeout: mutexes را پس از 5 دقیقه عدم فعالیت حذف می کند.
پاکسازی دورهای: یک فرآیند پسزمینه هر دقیقه اجرا میشود تا mutexeهای قدیمی یا بیارجاع را پاک کند.
مزایا
پیشگیری از شرایط مسابقه:
اطمینان حاصل می کند که فقط یک کارگر در یک زمان برداشت های حساب معین را پردازش می کند.
مدیریت کارآمد حافظه:
به طور خودکار mutexe ها را برای حساب های غیر فعال حذف می کند و از نفخ حافظه جلوگیری می کند.
توان عملیاتی بالا:
پردازش همزمان حسابهای مختلف بیتأثیر است و مقیاسپذیری سیستم را حفظ میکند.
مدیریت خطاهای قوی:
مدیریت صحیح خطاهای حساب و آزادسازی قفل در بلوک نهایی تضمین می کند که سیستم ثابت می ماند.
خروجی نمونه
پیام های صف ورودی:
{ “accountId”: “123”, “amount”: 100 }
{ “accountId”: “456”, “amount”: 200 }
{ “accountId”: “123”, “amount”: 300 }
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
خروجی کنسول:
Waiting for messages in queue: withdrawals
Periodic cleanup started: checking every 60 seconds.
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 900
Processing withdrawal for account 456
Withdrawal successful! New balance for account 456: 1800
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 600
Mutex for account 123 removed due to inactivity.
Mutex for account 456 removed due to inactivity.
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
نتیجه گیری
با ترکیب mutexeها با RabbitMQ، می توانید با خیال راحت عملیات همزمان در سیستم های Node.js را مدیریت کنید.
اضافه کردن وقفه های بیکار و پاکسازی دوره ای مدیریت کارآمد حافظه را تضمین می کند و این راه حل را برای موارد استفاده در دنیای واقعی مقیاس پذیر و قوی می کند.
در سیستم های توزیع شده، اطمینان از سازگاری داده ها در چندین فرآیند همزمان بسیار مهم است. هنگام برخورد با عملیات حساس مانند برداشت حساب، شرایط مسابقه می تواند منجر به خرابی داده ها یا اختلافات مالی شود. این مقاله نحوه استفاده از mutexes (قفلهای حذف متقابل) با RabbitMQ و Node.js را برای مدیریت ایمن عملیات همزمان و در عین حال مدیریت کارآمد حافظه توضیح میدهد.
مشکل
سناریویی را تصور کنید که در آن چندین کارگر RabbitMQ درخواست های برداشت از یک حساب را به طور همزمان پردازش می کنند. بدون همگام سازی مناسب، دو کارگر ممکن است موجودی اولیه یکسانی را بخوانند، مبالغ متفاوتی را کم کنند و سپس حساب را به روز کنند که منجر به موجودی نادرست شود.
مثال شرایط مسابقه:
- موجودی اولیه: 1000 دلار
- کارگر A موجودی را می خواند: 1000 دلار
- کارگر B موجودی را می خواند: 1000 دلار
- کارگر A 300 دلار را کم می کند و موجودی را به روز می کند: 700 دلار
- کارگر B 200 دلار را کم می کند و موجودی را به روز می کند: 800 دلار (رونویسی به روز رسانی کارگر A)
موجودی نهایی: 800 دلار (باید 500 دلار بود).
راه حل: استفاده از Mutexes
یک mutex تضمین می کند که فقط یک کارگر می تواند در هر زمان عملیات روی یک حساب خاص را پردازش کند. در اینجا نحوه پیاده سازی راه حل مبتنی بر mutex در سیستم Node.js مبتنی بر RabbitMQ آمده است.
پیاده سازی گام به گام
- پیش نیازها
بسته های لازم را نصب کنید:
npm install async-mutex amqplib
- async-mutex: یک mutex با کاربری آسان برای عملیات ناهمزمان ارائه می دهد.
- amqplib: تعامل با RabbitMQ را فعال می کند.
- پیاده سازی کد
در زیر اجرای کامل، از جمله مدیریت حافظه قوی برای پاکسازی mutex آورده شده است.
الف مدیریت موتکس
ما از نقشه برای حفظ mutexes در هر حساب استفاده می کنیم. Mutexe ها در صورت درخواست ایجاد می شوند و زمانی که دیگر مورد نیاز نیستند حذف می شوند.
ب بهینه سازی حافظه
برای جلوگیری از نفخ حافظه، ما اجرا می کنیم:
- Idle Timeouts: پس از 5 دقیقه mutexeها را برای حساب های غیرفعال به طور خودکار حذف کنید.
- پاکسازی دوره ای: یک فرآیند پس زمینه تضمین می کند که mutexeهای قدیمی هر 1 دقیقه حذف می شوند.
ج پیاده سازی کامل
const { Mutex } = require('async-mutex');
const amqp = require('amqplib');
// Mutex for account operations with automatic cleanup
const accountMutexes = new Map(); // Store one mutex per account
const accountTimeouts = new Map(); // Store timeout references for cleanup
const CLEANUP_INTERVAL_MS = 60000; // 1-minute cleanup interval
const IDLE_TIMEOUT_MS = 300000; // 5-minute idle timeout per account
// Function to get or create a mutex for a specific account
function getAccountMutex(accountId) {
if (!accountMutexes.has(accountId)) {
const mutex = new Mutex();
accountMutexes.set(accountId, mutex);
resetAccountTimeout(accountId); // Start idle timeout cleanup
}
return accountMutexes.get(accountId);
}
// Function to reset idle timeout for an account
function resetAccountTimeout(accountId) {
if (accountTimeouts.has(accountId)) {
clearTimeout(accountTimeouts.get(accountId));
}
const timeout = setTimeout(() => {
accountMutexes.delete(accountId);
accountTimeouts.delete(accountId);
console.log(`Mutex for account ${accountId} removed due to inactivity.`);
}, IDLE_TIMEOUT_MS);
accountTimeouts.set(accountId, timeout);
}
// Periodic cleanup process
function startPeriodicCleanup() {
setInterval(() => {
accountTimeouts.forEach((_, accountId) => {
if (!accountMutexes.has(accountId)) {
accountTimeouts.delete(accountId);
}
});
}, CLEANUP_INTERVAL_MS);
console.log(`Periodic cleanup started: checking every ${CLEANUP_INTERVAL_MS / 1000} seconds.`);
}
// Simulated database of accounts
const accounts = {
"123": { balance: 1000 },
"456": { balance: 2000 },
};
// Process withdrawal
async function processWithdrawal(accountId, amount) {
const mutex = getAccountMutex(accountId);
const release = await mutex.acquire();
try {
console.log(`Processing withdrawal for account ${accountId}`);
const account = accounts[accountId];
if (!account) {
throw new Error('Account not found');
}
if (account.balance < amount) {
throw new Error('Insufficient funds');
}
account.balance -= amount;
console.log(`Withdrawal successful! New balance for account ${accountId}: ${account.balance}`);
} catch (error) {
console.error(`Error processing withdrawal for account ${accountId}:`, error.message);
} finally {
release();
resetAccountTimeout(accountId);
}
}
// RabbitMQ message handler
async function handleMessage(message) {
const { accountId, amount } = JSON.parse(message.content.toString());
await processWithdrawal(accountId, amount);
}
// Connect to RabbitMQ and consume messages
(async () => {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queueName = 'withdrawals';
await channel.assertQueue(queueName, { durable: true });
console.log(`Waiting for messages in queue: ${queueName}`);
channel.consume(queueName, async (msg) => {
if (msg) {
await handleMessage(msg);
channel.ack(msg); // Acknowledge message after processing
}
});
startPeriodicCleanup(); // Start periodic cleanup
})();
چگونه کار می کند
- Mutex ویژه حساب:
- هر حساب دارای mutex (accountMutexes) خود است که امکان همزمانی امن را برای حساب های مختلف فراهم می کند.
- Mutexeها در صورت دسترسی برای اولین بار به صورت پویا ایجاد می شوند.
- بخش بحرانی:
- تابع processWithdrawal mutex را قفل می کند تا اطمینان حاصل شود که فقط یک کارگر می تواند موجودی حساب را تغییر دهد.
- مدیریت حافظه:
- Idle Timeout: mutexes را پس از 5 دقیقه عدم فعالیت حذف می کند.
- پاکسازی دورهای: یک فرآیند پسزمینه هر دقیقه اجرا میشود تا mutexeهای قدیمی یا بیارجاع را پاک کند.
مزایا
- پیشگیری از شرایط مسابقه:
- اطمینان حاصل می کند که فقط یک کارگر در یک زمان برداشت های حساب معین را پردازش می کند.
- مدیریت کارآمد حافظه:
- به طور خودکار mutexe ها را برای حساب های غیر فعال حذف می کند و از نفخ حافظه جلوگیری می کند.
- توان عملیاتی بالا:
- پردازش همزمان حسابهای مختلف بیتأثیر است و مقیاسپذیری سیستم را حفظ میکند.
- مدیریت خطاهای قوی:
- مدیریت صحیح خطاهای حساب و آزادسازی قفل در بلوک نهایی تضمین می کند که سیستم ثابت می ماند.
خروجی نمونه
پیام های صف ورودی:
{ "accountId": "123", "amount": 100 }
{ "accountId": "456", "amount": 200 }
{ "accountId": "123", "amount": 300 }
خروجی کنسول:
Waiting for messages in queue: withdrawals
Periodic cleanup started: checking every 60 seconds.
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 900
Processing withdrawal for account 456
Withdrawal successful! New balance for account 456: 1800
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 600
Mutex for account 123 removed due to inactivity.
Mutex for account 456 removed due to inactivity.
نتیجه گیری
با ترکیب mutexeها با RabbitMQ، می توانید با خیال راحت عملیات همزمان در سیستم های Node.js را مدیریت کنید.
اضافه کردن وقفه های بیکار و پاکسازی دوره ای مدیریت کارآمد حافظه را تضمین می کند و این راه حل را برای موارد استفاده در دنیای واقعی مقیاس پذیر و قوی می کند.