برنامه نویسی

Node.js Walkthrough: ساخت یک برنامه ساده رویداد محور با کافکا

Summarize this content to 400 words in Persian Lang
آیا تا به حال فکر کرده اید که چگونه برخی از برنامه های مورد علاقه شما به روز رسانی های بلادرنگ را مدیریت می کنند؟ نمرات ورزشی زنده، تیک تیک های بازار سهام، یا حتی اعلان های رسانه های اجتماعی – همه آنها برای پردازش فوری داده ها به معماری رویداد محور (EDA) متکی هستند. EDA مانند یک مکالمه است که در آن هر قطعه اطلاعات جدیدی باعث واکنش فوری می شود. این چیزی است که یک برنامه کاربردی را تعاملی تر و پاسخگوتر می کند.

در این راهنما، ما شما را از طریق ایجاد یک برنامه کاربردی رویداد محور ساده با استفاده از آپاچی کافکا در Heroku راهنمایی می کنیم. پوشش خواهیم داد:

راه اندازی یک خوشه کافکا در هروکو
ساخت یک برنامه Node.js که رویدادها را تولید و مصرف می کند
استقرار برنامه شما در Heroku

آپاچی کافکا یک ابزار قدرتمند برای ساخت سیستم های EDA است. این یک پلت فرم منبع باز است که برای مدیریت فیدهای داده در زمان واقعی طراحی شده است. Apache Kafka on Heroku یک افزونه Heroku است که کافکا را به عنوان یک سرویس ارائه می دهد. Heroku استقرار و مدیریت برنامه ها را بسیار آسان می کند و من اخیراً از آن بیشتر در پروژه های خود استفاده کرده ام. هنگامی که می خواهید یک برنامه رویداد محور را اجرا کنید، ترکیب Kafka با Heroku فرآیند راه اندازی را ساده می کند.

در پایان این راهنما، یک برنامه در حال اجرا خواهید داشت که قدرت EDA را با آپاچی کافکا در هروکو نشان می دهد. بیا شروع کنیم!

شروع شدن

قبل از اینکه وارد کد شویم، اجازه دهید به سرعت برخی از مفاهیم اصلی را مرور کنیم. هنگامی که این موارد را درک کردید، دنبال کردن آنها آسان تر خواهد بود.

مناسبت ها بخش‌هایی از داده‌ها هستند که نشان‌دهنده‌ی رخدادهایی در سیستم هستند، مانند خواندن دما از یک سنسور.
موضوعات دسته ها یا کانال هایی هستند که رویدادها در آنها منتشر می شود. آنها را به عنوان موضوعاتی که در یک خبرنامه مشترک می شوید در نظر بگیرید.
تهیه کنندگان موجودیت هایی هستند که رویدادها را ایجاد و به موضوعات ارسال می کنند. در برنامه آزمایشی EDA ما، تولیدکنندگان ما مجموعه ای از سنسورهای آب و هوا خواهند بود.
مصرف کنندگان نهادهایی هستند که رویدادها را از موضوعات می خوانند و پردازش می کنند. برنامه ما یک مصرف کننده خواهد داشت که به رویدادهای داده های آب و هوا گوش می دهد و آنها را ثبت می کند.

معرفی اپلیکیشن ما

ما یک برنامه Node.js با استفاده از کتابخانه KafkaJS خواهیم ساخت. در اینجا یک نمای کلی از نحوه عملکرد برنامه ما آورده شده است:

حسگرهای آب و هوای ما (تولیدکنندگان) به صورت دوره ای داده هایی مانند دما، رطوبت و فشار هوا را تولید می کنند و این رویدادها را به آپاچی کافکا ارسال می کنند. برای اهداف آزمایشی، داده ها به صورت تصادفی تولید می شوند.
ما یک مصرف کننده خواهیم داشت که به موضوعات گوش می دهد. هنگامی که یک رویداد جدید دریافت می شود، داده ها را در یک گزارش می نویسد.
ما کل تنظیمات را در Heroku مستقر می‌کنیم و از گزارش‌های Heroku برای نظارت بر رویدادها هنگام وقوع استفاده می‌کنیم.

پیش نیازها

قبل از شروع، مطمئن شوید که موارد زیر را دارید:

یک حساب Heroku: اگر ندارید، در Heroku ثبت نام کنید.
Heroku CLI: Heroku CLI را دانلود و نصب کنید.
Node.js برای توسعه روی ماشین محلی شما نصب شده است. در دستگاه من از Node (v.20.9.0) و npm (10.4.0) استفاده می کنم.

پایگاه کد برای کل این پروژه در این مخزن GitHub موجود است. با خیال راحت کد را شبیه سازی کنید و در سراسر این پست دنبال کنید.

اکنون که اصول اولیه را پوشش دادیم، بیایید خوشه کافکا خود را در Heroku راه اندازی کنیم و شروع به ساختن کنیم.

راه اندازی یک خوشه کافکا در هروکو

بیایید همه چیز را در هروکو تنظیم کنیم. این یک فرآیند بسیار سریع و آسان است.

مرحله 1: از طریق Heroku CLI وارد شوید

~/project$ heroku login

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 2: یک برنامه Heroku ایجاد کنید

~/project$ heroku create weather-eda

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

(من نام برنامه خود را Heroku گذاشته ام weather-eda، اما می توانید یک نام منحصر به فرد برای برنامه خود انتخاب کنید.)

مرحله 3: آپاچی کافکا را در افزونه Heroku اضافه کنید

~/project$ heroku addons:create heroku-kafka:basic-0

Creating heroku-kafka:basic-0 on ⬢ weather-eda… ~$0.139/hour (max $100/month)
The cluster should be available in a few minutes.
Run `heroku kafka:wait` to wait until the cluster is ready.
You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka
kafka-adjacent-07560 is being created in the background. The app will restart when complete…
Use heroku addons:info kafka-adjacent-07560 to check creation progress
Use heroku addons:docs heroku-kafka to view documentation

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

می‌توانید اطلاعات بیشتری درباره افزونه Apache Kafka در Heroku در اینجا بیابید. برای نسخه ی نمایشی ما، سطح پایه 0 را به افزونه اضافه می کنم. هزینه افزودنی 0.139 دلار در ساعت است. همانطور که در حال ساخت این برنامه آزمایشی بودم، کمتر از یک ساعت از افزونه استفاده کردم و سپس آن را چرخاندم.

چند دقیقه طول می‌کشد تا هیروکو کافکا را برای شما آماده کند. خیلی زود، این چیزی است که خواهید دید:

~/project$ heroku addons:info kafka-adjacent-07560

=== kafka-adjacent-07560
Attachments: weather-eda::KAFKA
Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time)
Max Price: $100/month
Owning app: weather-eda
Plan: heroku-kafka:basic-0
Price: ~$0.139/hour
State: created

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 4: اعتبارنامه و تنظیمات کافکا را دریافت کنید

با چرخاندن خوشه کافکا، باید اعتبارنامه ها و سایر تنظیمات را دریافت کنیم. Heroku چندین متغیر پیکربندی را برای برنامه ما ایجاد می کند و آنها را با اطلاعاتی از خوشه کافکا که به تازگی ایجاد شده است پر می کند. با اجرای موارد زیر می توانیم همه این کانفیگ ها را ببینیم:

~/project$ heroku config
=== weather-eda Config Vars

KAFKA_CLIENT_CERT: —–BEGIN CERTIFICATE—–
MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h

—–END CERTIFICATE—–

KAFKA_CLIENT_CERT_KEY: —–BEGIN RSA PRIVATE KEY—–
MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3

—–END RSA PRIVATE KEY—–

KAFKA_PREFIX: columbia-68051.
KAFKA_TRUSTED_CERT: —–BEGIN CERTIFICATE—–
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h

F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4=
—–END CERTIFICATE—–

KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096…kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همانطور که می بینید، ما چندین متغیر پیکربندی داریم. ما یک فایل در پوشه ریشه پروژه خود می خواهیم به نام .env با تمام این مقادیر var config. برای این کار کافیست دستور زیر را اجرا کنیم:

~/project$ heroku config –shell > .env

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما .env فایل به شکل زیر است:

KAFKA_CLIENT_CERT=”—–BEGIN CERTIFICATE—–

—–END CERTIFICATE—–”
KAFKA_CLIENT_CERT_KEY=”—–BEGIN RSA PRIVATE KEY—–

—–END RSA PRIVATE KEY—–”
KAFKA_PREFIX=”columbia-68051.”
KAFKA_TRUSTED_CERT=”—–BEGIN CERTIFICATE—–

—–END CERTIFICATE—–”
KAFKA_URL=”kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096…kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096″

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همچنین، ما حتماً .env را به فایل .gitignore خود اضافه می کنیم. ما نمی خواهیم این داده های حساس را به مخزن خود اختصاص دهیم.

مرحله 5: پلاگین کافکا را در Heroku CLI نصب کنید

Heroku CLI با دستورات مربوط به کافکا به طور مستقیم ارائه نمی شود. از آنجایی که ما از کافکا استفاده می کنیم، باید افزونه CLI را نصب کنیم.

~/project$ heroku plugins:install heroku-kafka
Installing plugin heroku-kafka… installed v2.12.0

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون، ما می توانیم خوشه کافکا خود را از CLI مدیریت کنیم.

~/project$ heroku kafka:info
=== KAFKA_URL
Plan: heroku-kafka:basic-0
Status: available
Version: 2.8.2
Created: 2024-05-27T18:44:38.023+00:00
Topics: [··········] 0 / 40 topics, see heroku kafka:topics
Prefix: columbia-68051.
Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor)
Messages: 0 messages/s
Traffic: 0 bytes/s in / 0 bytes/s out
Data Size: [··········] 0 bytes / 4.00 GB (0.00%)
Add-on: kafka-adjacent-07560

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40)

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 6: تعامل با خوشه را آزمایش کنید

درست به عنوان یک بررسی عقلانی، بیایید با خوشه کافکا خود بازی کنیم. ما با ایجاد یک موضوع شروع می کنیم.

~/project$ heroku kafka:topics:create test-topic-01
Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560… done
Use `heroku kafka:topics:info test-topic-01` to monitor your topic.
Your topic is using the prefix columbia-68051..

~/project$ heroku kafka:topics:info test-topic-01
▸ topic test-topic-01 is not available yet

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در عرض یک دقیقه یا بیشتر، موضوع ما در دسترس می شود.

~/project$ heroku kafka:topics:info test-topic-01
=== kafka-adjacent-07560 :: test-topic-01

Topic Prefix: columbia-68051.
Producers: 0 messages/second (0 bytes/second) total
Consumers: 0 bytes/second total
Partitions: 8 partitions
Replication Factor: 3
Compaction: Compaction is disabled for test-topic-01
Retention: 24 hours

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در مرحله بعد، در این پنجره ترمینال، ما به عنوان یک مصرف کننده عمل می کنیم و با دنبال کردن آن به این موضوع گوش می دهیم.

~/project$ heroku kafka:topics:tail test-topic-01

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

از اینجا، ترمینال به سادگی منتظر هر رویدادی است که در مورد موضوع منتشر شده است.

در یک پنجره ترمینال جداگانه، ما به عنوان یک تولید کننده عمل می کنیم و برخی از پیام ها را به موضوع منتشر می کنیم.

~/project$ heroku kafka:topics:write test-topic-01 “hello world!”

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در پنجره ترمینال مصرف کننده ما، این چیزی است که می بینیم:

~/project$ heroku kafka:topics:tail test-topic-01
test-topic-01 0 0 12 hello world!

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

عالی! ما با موفقیت یک رویداد را برای یک موضوع در خوشه کافکا خود تولید و مصرف کرده ایم. ما آماده هستیم تا به برنامه Node.js خود برویم. بیایید این مبحث تست را نابود کنیم تا زمین بازی خود را مرتب نگه داریم.

~/project$ heroku kafka:topics:destroy test-topic-01
▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda
▸ To proceed, type weather-eda or re-run this command with –confirm weather-eda

> weather-eda
Deleting topic test-topic-01… done
Your topic has been marked for deletion, and will be removed from the cluster shortly

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40).

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 7: کافکا را برای برنامه ما آماده کنید

برای آماده شدن برای برنامه کاربردی ما برای استفاده از کافکا، باید دو چیز ایجاد کنیم: یک موضوع و یک گروه مصرف کننده.

بیایید موضوعی را ایجاد کنیم که برنامه ما از آن استفاده خواهد کرد.

~/project$ heroku kafka:topics:create weather-data

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در مرحله بعد، گروه مصرف کننده ای را ایجاد می کنیم که مصرف کننده برنامه ما بخشی از آن خواهد بود:

~/project$ heroku kafka:consumer-groups:create weather-consumers

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما آماده ساختن اپلیکیشن Node.js خود هستیم!

برنامه را بسازید

بیایید یک پروژه جدید را راه اندازی کنیم و وابستگی های خود را نصب کنیم.

~/project$ npm init -y
~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پروژه ما دو فرآیند در حال اجرا خواهد داشت:

consumer.js، که در موضوع مشترک است و هر رویدادی را که منتشر می شود ثبت می کند.
producer.js، که هر چند ثانیه یکبار برخی از داده های آب و هوای تصادفی را در این موضوع منتشر می کند.

هر دوی این فرآیندها برای اتصال به خوشه کافکا باید از KafkaJS استفاده کنند، بنابراین ما کد خود را مدولار می کنیم تا آن را قابل استفاده مجدد کنیم.

کار با مشتری کافکا

در پروژه src پوشه، یک فایل به نام ایجاد می کنیم kafka.js. به نظر می رسد این است:

const { Kafka } = require(‘kafkajs’);

const BROKER_URLS = process.env.KAFKA_URL.split(‘,’).map(uri => uri.replace(‘kafka+ssl://’,” ))
const TOPIC = `${process.env.KAFKA_PREFIX}weather-data`
const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers`

const kafka = new Kafka({
clientId: ‘weather-eda-app-nodejs-client’,
brokers: BROKER_URLS,
ssl: {
rejectUnauthorized: false,
ca: process.env.KAFKA_TRUSTED_CERT,
key: process.env.KAFKA_CLIENT_CERT_KEY,
cert: process.env.KAFKA_CLIENT_CERT,
},
})

const producer = async () => {
const p = kafka.producer()
await p.connect()
return p;
}

const consumer = async () => {
const c = kafka.consumer({
groupId: CONSUMER_GROUP,
sessionTimeout: 30000
})
await c.connect()
await c.subscribe({ topics: [TOPIC] });
return c;
}

module.exports = {
producer,
consumer,
topic: TOPIC,
groupId: CONSUMER_GROUP
};

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این فایل با ایجاد یک کلاینت جدید کافکا شروع می کنیم. این به URL هایی برای کارگزاران کافکا نیاز دارد که ما قادر به تجزیه آن ها هستیم KAFKA_URL متغیر در ما .env فایل (که در اصل از فراخوانی پیکربندی heroku به دست آمد). برای احراز هویت تلاش اتصال، باید ارائه دهیم KAFKA_TRUSTED_CERT، KAFKA_CLIENT_CERT_KEY، و KAFKA_CLIENT_CERT.

سپس، از مشتری کافکا خود، a را ایجاد می کنیم producer و الف consumer، مطمئن شوید که مشتری خود را مشترک می کنیم weather-data موضوع.

توضیح در مورد پیشوند کافکا

توجه کنید kafka.js که ما آماده می کنیم KAFKA_PREFIX به موضوع ما و نام گروه مصرف کننده. ما از پلن Basic 0 برای آپاچی کافکا در Heroku استفاده می کنیم که یک طرح کافکا چند مستاجر است. یعنی ما با a کار می کنیم KAFKA_PREFIX. با وجود اینکه ما موضوع خود را نامگذاری کردیم weather-data و گروه مصرف کننده ما weather-consumers، نام واقعی آنها در خوشه کافکا چند مستاجر ما باید دارای این باشد KAFKA_PREFIX به آنها تعلق گرفت (برای اطمینان از منحصر به فرد بودن آنها).

بنابراین، از نظر فنی، برای نسخه آزمایشی ما، نام اصلی موضوع است columbia-68051.weather-data، نه weather-data. (به همین ترتیب برای نام گروه مصرف کننده.)

فرآیند تولید کننده

اکنون، بیایید فرآیند پس زمینه خود را ایجاد کنیم که به عنوان تولید کننده سنسور آب و هوا عمل می کند. در پوشه ریشه پروژه ما یک فایل به نام داریم producer.js. به نظر می رسد این است:

require(‘dotenv’).config();
const kafka = require(‘./src/kafka.js’);
const { faker } = require(‘@faker-js/faker’);

const SENSORS = [‘sensor01′,’sensor02′,’sensor03′,’sensor04′,’sensor05’];
const MAX_DELAY_MS = 20000;
const READINGS = [‘temperature’,’humidity’,’barometric_pressure’];
const MAX_TEMP = 130;
const MIN_PRESSURE = 2910;
const PRESSURE_RANGE = 160;

const getRandom = (arr) => arr[faker.number.int(arr.length – 1)];

const getRandomReading = {
temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100),
humidity: () => faker.number.int(100) / 100,
barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100
};

const sleep = (ms) => {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
};

(async () => {
const producer = await kafka.producer()

while(true) {
const sensor = getRandom(SENSORS)
const reading = getRandom(READINGS)
const value = getRandomReading[reading]()
const data = { reading, value }
await producer.send({
topic: kafka.topic,
messages: [{
key: sensor,
value: JSON.stringify(data)
}] })
await sleep(faker.number.int(MAX_DELAY_MS))
}
})()

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بسیاری از کدهای موجود در فایل مربوط به تولید مقادیر تصادفی است. بخش های مهم را برجسته می کنم:

ما داشتن پنج سنسور مختلف آب و هوا را شبیه سازی می کنیم. نام آنها در یافت می شود SENSORS.
یک حسگر مقداری را برای یکی از سه قرائت ممکن منتشر می کند (منتشر می کند): temperature، humidity، یا barometric_pressure. را getRandomReading شی برای هر یک از این خوانش ها تابعی برای تولید یک مقدار متناظر منطقی دارد.
کل فرآیند به صورت یک اجرا می شود async تابع با بی نهایت while حلقه

در داخل while حلقه، ما:

a را انتخاب کنید sensor به صورت تصادفی.
a را انتخاب کنید reading به صورت تصادفی.
تصادفی ایجاد کنید value برای آن خواندن
زنگ زدن producer.send برای انتشار این داده ها به موضوع. را sensor به عنوان key برای رویداد، در حالی که reading و value پیام رویداد را تشکیل خواهد داد.
سپس، تا 20 ثانیه قبل از تکرار بعدی حلقه صبر می کنیم.

فرآیند مصرف کننده

فرآیند پس زمینه در consumer.js بسیار ساده تر است.

require(‘dotenv’).config();
const logger = require(‘./src/logger.js’);
const kafka = require(‘./src/kafka.js’);

(async () => {
const consumer = await kafka.consumer()
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const sensorId = message.key.toString()
const messageObj = JSON.parse(message.value.toString())
const logMessage = { sensorId }
logMessage[messageObj.reading] = messageObj.value
logger.info(logMessage)
}
})
})()

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما consumer قبلاً مشترک است weather-data موضوع. زنگ میزنیم consumer.run، و سپس یک کنترل کننده برای eachMessage. هر زمان که کافکا به او اطلاع می دهد consumer از یک پیام، پیام را ثبت می کند. این تمام چیزی است که در آن وجود دارد.

فرآیندها و Procfile

در package.json فایل، باید چند تا اضافه کنیم scripts که فرآیندهای پس زمینه تولید کننده و مصرف کننده ما را راه اندازی می کند. اکنون فایل باید شامل موارد زیر باشد:


“scripts”: {
“start”: “echo ‘do nothing'”,
“start:consumer”: “node consumer.js”,
“start:producer”: “node producer.js”
},

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

موارد مهم هستند start:consumer و start:producer. اما ما نگه می داریم start در فایل ما (حتی اگر هیچ چیز معنی‌داری انجام نمی‌دهد) زیرا سازنده Heroku انتظار دارد که آنجا باشد.

بعد، یک را ایجاد می کنیم Procfile که به Heroku می گوید چگونه کارگران مختلفی را که برای برنامه Heroku خود نیاز داریم راه اندازی کنیم. در پوشه ریشه پروژه ما، Procfile باید شبیه این باشد:

consumer_worker: npm run start:consumer
producer_worker: npm run start:producer

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

خیلی ساده، درست است؟ ما یک کارگر فرآیند پس زمینه را فراخوانی خواهیم داشت consumer_workerو دیگری زنگ زد producer_worker. متوجه خواهید شد که ما a نداریم web کارگر، چیزی است که معمولاً در آن می بینید Procfile برای یک برنامه وب برای برنامه Heroku ما فقط به دو کارگر پس زمینه نیاز داریم. ما نیاز نداریم web.

استقرار و آزمایش برنامه

با آن، تمام کد ما تنظیم می شود. ما تمام کد خود را به مخزن متعهد کرده ایم و آماده استقرار هستیم.

~/project$ git push heroku main

remote: —–> Build succeeded!

remote: —–> Compressing…
remote: Done: 48.6M
remote: —–> Launching…

remote: Verifying deploy… done

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پس از استقرار، می خواهیم مطمئن شویم که داینوهای خود را به درستی مقیاس بندی کرده ایم. ما برای یک فرآیند وب نیازی به dyno نداریم، اما برای هر دو به یکی نیاز داریم consumer_worker و producer_worker. دستور زیر را اجرا می کنیم تا این فرآیندها را بر اساس نیاز خود تنظیم کنیم.

~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1
Scaling dynos… done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در حال حاضر، همه چیز باید راه اندازی و اجرا شود. پشت صحنه، ما producer_worker باید به خوشه کافکا متصل شود و سپس شروع به انتشار داده های حسگر آب و هوا هر چند ثانیه یکبار کند. سپس، ما consumer_worker باید به خوشه کافکا متصل شود و هر پیامی را که از موضوعی که در آن مشترک است دریافت می کند، ثبت کند.

تا ببینیم ما چیست consumer_worker در حال انجام است، می‌توانیم در لاگ‌های Heroku خود نگاه کنیم.

~/project$ heroku logs –tail

heroku[producer_worker.1]: Starting process with command `npm run start:producer`
heroku[producer_worker.1]: State changed from starting to up
app[producer_worker.1]:
app[producer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:producer
app[producer_worker.1]: > node producer.js
app[producer_worker.1]:

heroku[consumer_worker.1]: Starting process with command `npm run start:consumer`
heroku[consumer_worker.1]: State changed from starting to up
app[consumer_worker.1]:
app[consumer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:consumer
app[consumer_worker.1]: > node consumer.js
app[consumer_worker.1]:
app[consumer_worker.1]: {“level”:”INFO”,”timestamp”:”2024-05-28T02:31:20.660Z”,”logger”:”kafkajs”,”message”:”[Consumer] Starting”,”groupId”:”columbia-68051.weather-consumers”}
app[consumer_worker.1]: {“level”:”INFO”,”timestamp”:”2024-05-28T02:31:23.702Z”,”logger”:”kafkajs”,”message”:”[ConsumerGroup] Consumer has joined the group”,”groupId”:”columbia-68051.weather-consumers”,”memberId”:”weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4″,”leaderId”:”weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4″,”isLeader”:true,”memberAssignment”:{“columbia-68051.test-topic-1″:[0,1,2,3,4,5,6,7]},”groupProtocol”:”RoundRobinAssigner”,”duration”:3041}
app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {“sensorId”:”sensor01″,”temperature”:87.84}
app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {“sensorId”:”sensor01″,”humidity”:0.3}
app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {“sensorId”:”sensor03″,”temperature”:22.11}
app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {“sensorId”:”sensor01″,”barometric_pressure”:29.71}
app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {“sensorId”:”sensor05″,”barometric_pressure”:29.55}
app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {“sensorId”:”sensor04″,”temperature”:90.58}
app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {“sensorId”:”sensor02″,”barometric_pressure”:29.25}
app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {“sensorId”:”sensor04″,”humidity”:0.1}
app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {“sensorId”:”sensor01″,”humidity”:0.34}
app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {“sensorId”:”sensor02″,”humidity”:0.61}
app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {“sensorId”:”sensor01″,”barometric_pressure”:30.36}
app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {“sensorId”:”sensor03″,”temperature”:104.55}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

کار می کند! ما می دانیم که تولید کننده ما به طور دوره ای پیام هایی را برای کافکا منتشر می کند زیرا مصرف کننده ما آنها را دریافت می کند و سپس آنها را ثبت می کند.

البته، در یک برنامه بزرگتر EDA، هر سنسور یک تولید کننده است. آنها ممکن است در چندین موضوع برای اهداف مختلف منتشر کنند، یا ممکن است همه آنها در یک موضوع منتشر کنند. و مصرف کننده شما می تواند در چندین موضوع مشترک شود. همچنین، در برنامه آزمایشی ما، مصرف کننده ما به سادگی مقدار زیادی را منتشر کرد eachMessage; اما در یک برنامه EDA، یک مصرف کننده ممکن است با تماس با یک API شخص ثالث، ارسال یک اعلان پیامکی، یا پرس و جو از پایگاه داده پاسخ دهد.

اکنون که درک اولیه ای از رویدادها، موضوعات، تولیدکنندگان و مصرف کنندگان دارید و می دانید که چگونه با کافکا کار کنید، می توانید شروع به طراحی و ساخت برنامه های کاربردی EDA خود کنید تا موارد استفاده تجاری پیچیده تر را برآورده کنید.

نتیجه

EDA بسیار قدرتمند است – شما می توانید سیستم های خود را جدا کنید در حالی که از ویژگی های کلیدی مانند مقیاس پذیری آسان و پردازش داده ها در زمان واقعی لذت می برید. برای EDA، کافکا یک ابزار کلیدی است که به شما کمک می‌کند تا به راحتی جریان‌های داده‌ای با توان عملیاتی بالا را مدیریت کنید. استفاده از آپاچی کافکا در Heroku به شما کمک می کند تا سریع شروع کنید. از آنجایی که این یک سرویس مدیریت شده است، لازم نیست نگران بخش های پیچیده مدیریت خوشه کافکا باشید. شما فقط می توانید بر روی ساخت برنامه های خود تمرکز کنید.

از اینجا، زمان آزمایش و نمونه سازی شما فرا رسیده است. موارد استفاده را شناسایی کنید که به خوبی با EDA مطابقت دارند. شیرجه بزنید، آن را روی Heroku آزمایش کنید و چیزی شگفت انگیز بسازید. کد نویسی مبارک!

آیا تا به حال فکر کرده اید که چگونه برخی از برنامه های مورد علاقه شما به روز رسانی های بلادرنگ را مدیریت می کنند؟ نمرات ورزشی زنده، تیک تیک های بازار سهام، یا حتی اعلان های رسانه های اجتماعی – همه آنها برای پردازش فوری داده ها به معماری رویداد محور (EDA) متکی هستند. EDA مانند یک مکالمه است که در آن هر قطعه اطلاعات جدیدی باعث واکنش فوری می شود. این چیزی است که یک برنامه کاربردی را تعاملی تر و پاسخگوتر می کند.

در این راهنما، ما شما را از طریق ایجاد یک برنامه کاربردی رویداد محور ساده با استفاده از آپاچی کافکا در Heroku راهنمایی می کنیم. پوشش خواهیم داد:

  • راه اندازی یک خوشه کافکا در هروکو

  • ساخت یک برنامه Node.js که رویدادها را تولید و مصرف می کند

  • استقرار برنامه شما در Heroku

آپاچی کافکا یک ابزار قدرتمند برای ساخت سیستم های EDA است. این یک پلت فرم منبع باز است که برای مدیریت فیدهای داده در زمان واقعی طراحی شده است. Apache Kafka on Heroku یک افزونه Heroku است که کافکا را به عنوان یک سرویس ارائه می دهد. Heroku استقرار و مدیریت برنامه ها را بسیار آسان می کند و من اخیراً از آن بیشتر در پروژه های خود استفاده کرده ام. هنگامی که می خواهید یک برنامه رویداد محور را اجرا کنید، ترکیب Kafka با Heroku فرآیند راه اندازی را ساده می کند.

در پایان این راهنما، یک برنامه در حال اجرا خواهید داشت که قدرت EDA را با آپاچی کافکا در هروکو نشان می دهد. بیا شروع کنیم!

شروع شدن

قبل از اینکه وارد کد شویم، اجازه دهید به سرعت برخی از مفاهیم اصلی را مرور کنیم. هنگامی که این موارد را درک کردید، دنبال کردن آنها آسان تر خواهد بود.

  • مناسبت ها بخش‌هایی از داده‌ها هستند که نشان‌دهنده‌ی رخدادهایی در سیستم هستند، مانند خواندن دما از یک سنسور.

  • موضوعات دسته ها یا کانال هایی هستند که رویدادها در آنها منتشر می شود. آنها را به عنوان موضوعاتی که در یک خبرنامه مشترک می شوید در نظر بگیرید.

  • تهیه کنندگان موجودیت هایی هستند که رویدادها را ایجاد و به موضوعات ارسال می کنند. در برنامه آزمایشی EDA ما، تولیدکنندگان ما مجموعه ای از سنسورهای آب و هوا خواهند بود.

  • مصرف کنندگان نهادهایی هستند که رویدادها را از موضوعات می خوانند و پردازش می کنند. برنامه ما یک مصرف کننده خواهد داشت که به رویدادهای داده های آب و هوا گوش می دهد و آنها را ثبت می کند.

معرفی اپلیکیشن ما

ما یک برنامه Node.js با استفاده از کتابخانه KafkaJS خواهیم ساخت. در اینجا یک نمای کلی از نحوه عملکرد برنامه ما آورده شده است:

  1. حسگرهای آب و هوای ما (تولیدکنندگان) به صورت دوره ای داده هایی مانند دما، رطوبت و فشار هوا را تولید می کنند و این رویدادها را به آپاچی کافکا ارسال می کنند. برای اهداف آزمایشی، داده ها به صورت تصادفی تولید می شوند.

  2. ما یک مصرف کننده خواهیم داشت که به موضوعات گوش می دهد. هنگامی که یک رویداد جدید دریافت می شود، داده ها را در یک گزارش می نویسد.

  3. ما کل تنظیمات را در Heroku مستقر می‌کنیم و از گزارش‌های Heroku برای نظارت بر رویدادها هنگام وقوع استفاده می‌کنیم.

پیش نیازها

قبل از شروع، مطمئن شوید که موارد زیر را دارید:

  • یک حساب Heroku: اگر ندارید، در Heroku ثبت نام کنید.

  • Heroku CLI: Heroku CLI را دانلود و نصب کنید.

  • Node.js برای توسعه روی ماشین محلی شما نصب شده است. در دستگاه من از Node (v.20.9.0) و npm (10.4.0) استفاده می کنم.

پایگاه کد برای کل این پروژه در این مخزن GitHub موجود است. با خیال راحت کد را شبیه سازی کنید و در سراسر این پست دنبال کنید.

اکنون که اصول اولیه را پوشش دادیم، بیایید خوشه کافکا خود را در Heroku راه اندازی کنیم و شروع به ساختن کنیم.

راه اندازی یک خوشه کافکا در هروکو

بیایید همه چیز را در هروکو تنظیم کنیم. این یک فرآیند بسیار سریع و آسان است.

مرحله 1: از طریق Heroku CLI وارد شوید

~/project$ heroku login
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 2: یک برنامه Heroku ایجاد کنید

~/project$ heroku create weather-eda
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

(من نام برنامه خود را Heroku گذاشته ام weather-eda، اما می توانید یک نام منحصر به فرد برای برنامه خود انتخاب کنید.)

مرحله 3: آپاچی کافکا را در افزونه Heroku اضافه کنید

~/project$ heroku addons:create heroku-kafka:basic-0

Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month)
The cluster should be available in a few minutes.
Run `heroku kafka:wait` to wait until the cluster is ready.
You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka
kafka-adjacent-07560 is being created in the background. The app will restart when complete...
Use heroku addons:info kafka-adjacent-07560 to check creation progress
Use heroku addons:docs heroku-kafka to view documentation
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

می‌توانید اطلاعات بیشتری درباره افزونه Apache Kafka در Heroku در اینجا بیابید. برای نسخه ی نمایشی ما، سطح پایه 0 را به افزونه اضافه می کنم. هزینه افزودنی 0.139 دلار در ساعت است. همانطور که در حال ساخت این برنامه آزمایشی بودم، کمتر از یک ساعت از افزونه استفاده کردم و سپس آن را چرخاندم.

چند دقیقه طول می‌کشد تا هیروکو کافکا را برای شما آماده کند. خیلی زود، این چیزی است که خواهید دید:

~/project$ heroku addons:info kafka-adjacent-07560

=== kafka-adjacent-07560
Attachments:  weather-eda::KAFKA
Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time)
Max Price:    $100/month
Owning app:   weather-eda
Plan:         heroku-kafka:basic-0
Price:        ~$0.139/hour
State:        created
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 4: اعتبارنامه و تنظیمات کافکا را دریافت کنید

با چرخاندن خوشه کافکا، باید اعتبارنامه ها و سایر تنظیمات را دریافت کنیم. Heroku چندین متغیر پیکربندی را برای برنامه ما ایجاد می کند و آنها را با اطلاعاتی از خوشه کافکا که به تازگی ایجاد شده است پر می کند. با اجرای موارد زیر می توانیم همه این کانفیگ ها را ببینیم:

~/project$ heroku config
=== weather-eda Config Vars

KAFKA_CLIENT_CERT:     -----BEGIN CERTIFICATE-----
MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
...
-----END CERTIFICATE-----

KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3
...
-----END RSA PRIVATE KEY-----

KAFKA_PREFIX:          columbia-68051.
KAFKA_TRUSTED_CERT:    -----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h
...
F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4=
-----END CERTIFICATE-----

KAFKA_URL:             kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همانطور که می بینید، ما چندین متغیر پیکربندی داریم. ما یک فایل در پوشه ریشه پروژه خود می خواهیم به نام .env با تمام این مقادیر var config. برای این کار کافیست دستور زیر را اجرا کنیم:

~/project$ heroku config --shell > .env
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما .env فایل به شکل زیر است:

KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----"
KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----"
KAFKA_PREFIX="columbia-68051."
KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----"
KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

همچنین، ما حتماً .env را به فایل .gitignore خود اضافه می کنیم. ما نمی خواهیم این داده های حساس را به مخزن خود اختصاص دهیم.

مرحله 5: پلاگین کافکا را در Heroku CLI نصب کنید

Heroku CLI با دستورات مربوط به کافکا به طور مستقیم ارائه نمی شود. از آنجایی که ما از کافکا استفاده می کنیم، باید افزونه CLI را نصب کنیم.

~/project$ heroku plugins:install heroku-kafka
Installing plugin heroku-kafka... installed v2.12.0
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون، ما می توانیم خوشه کافکا خود را از CLI مدیریت کنیم.

~/project$ heroku kafka:info
=== KAFKA_URL
Plan:       heroku-kafka:basic-0
Status:     available
Version:    2.8.2
Created:    2024-05-27T18:44:38.023+00:00
Topics:     [··········] 0 / 40 topics, see heroku kafka:topics
Prefix:     columbia-68051.
Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor)
Messages:   0 messages/s
Traffic:    0 bytes/s in / 0 bytes/s out
Data Size:  [··········] 0 bytes / 4.00 GB (0.00%)
Add-on:     kafka-adjacent-07560

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40)
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 6: تعامل با خوشه را آزمایش کنید

درست به عنوان یک بررسی عقلانی، بیایید با خوشه کافکا خود بازی کنیم. ما با ایجاد یک موضوع شروع می کنیم.

~/project$ heroku kafka:topics:create test-topic-01
Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done
Use `heroku kafka:topics:info test-topic-01` to monitor your topic.
Your topic is using the prefix columbia-68051..

~/project$ heroku kafka:topics:info test-topic-01
 ▸    topic test-topic-01 is not available yet
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در عرض یک دقیقه یا بیشتر، موضوع ما در دسترس می شود.

~/project$ heroku kafka:topics:info test-topic-01
=== kafka-adjacent-07560 :: test-topic-01

Topic Prefix:       columbia-68051.
Producers:          0 messages/second (0 bytes/second) total
Consumers:          0 bytes/second total
Partitions:         8 partitions
Replication Factor: 3
Compaction:         Compaction is disabled for test-topic-01
Retention:          24 hours
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در مرحله بعد، در این پنجره ترمینال، ما به عنوان یک مصرف کننده عمل می کنیم و با دنبال کردن آن به این موضوع گوش می دهیم.

~/project$ heroku kafka:topics:tail test-topic-01
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

از اینجا، ترمینال به سادگی منتظر هر رویدادی است که در مورد موضوع منتشر شده است.

در یک پنجره ترمینال جداگانه، ما به عنوان یک تولید کننده عمل می کنیم و برخی از پیام ها را به موضوع منتشر می کنیم.

~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در پنجره ترمینال مصرف کننده ما، این چیزی است که می بینیم:

~/project$ heroku kafka:topics:tail test-topic-01
test-topic-01 0 0 12 hello world!
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

عالی! ما با موفقیت یک رویداد را برای یک موضوع در خوشه کافکا خود تولید و مصرف کرده ایم. ما آماده هستیم تا به برنامه Node.js خود برویم. بیایید این مبحث تست را نابود کنیم تا زمین بازی خود را مرتب نگه داریم.

~/project$ heroku kafka:topics:destroy test-topic-01
 ▸    This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda
 ▸    To proceed, type weather-eda or re-run this command with --confirm weather-eda

> weather-eda
Deleting topic test-topic-01... done
Your topic has been marked for deletion, and will be removed from the cluster shortly

~/project$ heroku kafka:topics
=== Kafka Topics on KAFKA_URL

No topics found on this Kafka cluster.
Use heroku kafka:topics:create to create a topic (limit 40).
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

مرحله 7: کافکا را برای برنامه ما آماده کنید

برای آماده شدن برای برنامه کاربردی ما برای استفاده از کافکا، باید دو چیز ایجاد کنیم: یک موضوع و یک گروه مصرف کننده.

بیایید موضوعی را ایجاد کنیم که برنامه ما از آن استفاده خواهد کرد.

~/project$ heroku kafka:topics:create weather-data
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در مرحله بعد، گروه مصرف کننده ای را ایجاد می کنیم که مصرف کننده برنامه ما بخشی از آن خواهد بود:

~/project$ heroku kafka:consumer-groups:create weather-consumers
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما آماده ساختن اپلیکیشن Node.js خود هستیم!

برنامه را بسازید

بیایید یک پروژه جدید را راه اندازی کنیم و وابستگی های خود را نصب کنیم.

~/project$ npm init -y
~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پروژه ما دو فرآیند در حال اجرا خواهد داشت:

  1. consumer.js، که در موضوع مشترک است و هر رویدادی را که منتشر می شود ثبت می کند.

  2. producer.js، که هر چند ثانیه یکبار برخی از داده های آب و هوای تصادفی را در این موضوع منتشر می کند.

هر دوی این فرآیندها برای اتصال به خوشه کافکا باید از KafkaJS استفاده کنند، بنابراین ما کد خود را مدولار می کنیم تا آن را قابل استفاده مجدد کنیم.

کار با مشتری کافکا

در پروژه src پوشه، یک فایل به نام ایجاد می کنیم kafka.js. به نظر می رسد این است:

const { Kafka } = require('kafkajs');

const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' ))
const TOPIC = `${process.env.KAFKA_PREFIX}weather-data`
const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers`

const kafka = new Kafka({
  clientId: 'weather-eda-app-nodejs-client',
  brokers: BROKER_URLS,
  ssl: {
     rejectUnauthorized: false,
     ca: process.env.KAFKA_TRUSTED_CERT,
     key: process.env.KAFKA_CLIENT_CERT_KEY,
     cert: process.env.KAFKA_CLIENT_CERT,
  },
})

const producer = async () => {
  const p = kafka.producer()
  await p.connect()
  return p;
}

const consumer = async () => {
  const c = kafka.consumer({
    groupId: CONSUMER_GROUP,
    sessionTimeout: 30000
  })
  await c.connect()
  await c.subscribe({ topics: [TOPIC] });
  return c;
}

module.exports = {
  producer,
  consumer,
  topic: TOPIC,
  groupId: CONSUMER_GROUP
};
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این فایل با ایجاد یک کلاینت جدید کافکا شروع می کنیم. این به URL هایی برای کارگزاران کافکا نیاز دارد که ما قادر به تجزیه آن ها هستیم KAFKA_URL متغیر در ما .env فایل (که در اصل از فراخوانی پیکربندی heroku به دست آمد). برای احراز هویت تلاش اتصال، باید ارائه دهیم KAFKA_TRUSTED_CERT، KAFKA_CLIENT_CERT_KEY، و KAFKA_CLIENT_CERT.

سپس، از مشتری کافکا خود، a را ایجاد می کنیم producer و الف consumer، مطمئن شوید که مشتری خود را مشترک می کنیم weather-data موضوع.

توضیح در مورد پیشوند کافکا

توجه کنید kafka.js که ما آماده می کنیم KAFKA_PREFIX به موضوع ما و نام گروه مصرف کننده. ما از پلن Basic 0 برای آپاچی کافکا در Heroku استفاده می کنیم که یک طرح کافکا چند مستاجر است. یعنی ما با a کار می کنیم KAFKA_PREFIX. با وجود اینکه ما موضوع خود را نامگذاری کردیم weather-data و گروه مصرف کننده ما weather-consumers، نام واقعی آنها در خوشه کافکا چند مستاجر ما باید دارای این باشد KAFKA_PREFIX به آنها تعلق گرفت (برای اطمینان از منحصر به فرد بودن آنها).

بنابراین، از نظر فنی، برای نسخه آزمایشی ما، نام اصلی موضوع است columbia-68051.weather-data، نه weather-data. (به همین ترتیب برای نام گروه مصرف کننده.)

فرآیند تولید کننده

اکنون، بیایید فرآیند پس زمینه خود را ایجاد کنیم که به عنوان تولید کننده سنسور آب و هوا عمل می کند. در پوشه ریشه پروژه ما یک فایل به نام داریم producer.js. به نظر می رسد این است:

require('dotenv').config();
const kafka = require('./src/kafka.js');
const { faker } = require('@faker-js/faker');

const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05'];
const MAX_DELAY_MS = 20000;
const READINGS = ['temperature','humidity','barometric_pressure'];
const MAX_TEMP = 130;
const MIN_PRESSURE = 2910;
const PRESSURE_RANGE = 160;

const getRandom = (arr) => arr[faker.number.int(arr.length - 1)];

const getRandomReading = {
  temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100),
  humidity: () => faker.number.int(100) / 100,
  barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100
};

const sleep = (ms) => {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
};

(async () => {
  const producer = await kafka.producer()

  while(true) {
    const sensor = getRandom(SENSORS)
    const reading = getRandom(READINGS)
    const value = getRandomReading[reading]()
    const data = { reading, value }
    await producer.send({
      topic: kafka.topic,
      messages: [{
        key: sensor,
        value: JSON.stringify(data)
      }]
    })
    await sleep(faker.number.int(MAX_DELAY_MS))
  }
})()
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بسیاری از کدهای موجود در فایل مربوط به تولید مقادیر تصادفی است. بخش های مهم را برجسته می کنم:

  • ما داشتن پنج سنسور مختلف آب و هوا را شبیه سازی می کنیم. نام آنها در یافت می شود SENSORS.

  • یک حسگر مقداری را برای یکی از سه قرائت ممکن منتشر می کند (منتشر می کند): temperature، humidity، یا barometric_pressure. را getRandomReading شی برای هر یک از این خوانش ها تابعی برای تولید یک مقدار متناظر منطقی دارد.

  • کل فرآیند به صورت یک اجرا می شود async تابع با بی نهایت while حلقه

در داخل while حلقه، ما:

  • a را انتخاب کنید sensor به صورت تصادفی.

  • a را انتخاب کنید reading به صورت تصادفی.

  • تصادفی ایجاد کنید value برای آن خواندن

  • زنگ زدن producer.send برای انتشار این داده ها به موضوع. را sensor به عنوان key برای رویداد، در حالی که reading و value پیام رویداد را تشکیل خواهد داد.

  • سپس، تا 20 ثانیه قبل از تکرار بعدی حلقه صبر می کنیم.

فرآیند مصرف کننده

فرآیند پس زمینه در consumer.js بسیار ساده تر است.

require('dotenv').config();
const logger = require('./src/logger.js');
const kafka = require('./src/kafka.js');

(async () => {
  const consumer = await kafka.consumer()
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const sensorId = message.key.toString()
      const messageObj = JSON.parse(message.value.toString())
      const logMessage = { sensorId }
      logMessage[messageObj.reading] = messageObj.value
      logger.info(logMessage)
    }
  })
})()
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما consumer قبلاً مشترک است weather-data موضوع. زنگ میزنیم consumer.run، و سپس یک کنترل کننده برای eachMessage. هر زمان که کافکا به او اطلاع می دهد consumer از یک پیام، پیام را ثبت می کند. این تمام چیزی است که در آن وجود دارد.

فرآیندها و Procfile

در package.json فایل، باید چند تا اضافه کنیم scripts که فرآیندهای پس زمینه تولید کننده و مصرف کننده ما را راه اندازی می کند. اکنون فایل باید شامل موارد زیر باشد:

...
  "scripts": {
    "start": "echo 'do nothing'",
    "start:consumer": "node consumer.js",
    "start:producer": "node producer.js"
  },
...
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

موارد مهم هستند start:consumer و start:producer. اما ما نگه می داریم start در فایل ما (حتی اگر هیچ چیز معنی‌داری انجام نمی‌دهد) زیرا سازنده Heroku انتظار دارد که آنجا باشد.

بعد، یک را ایجاد می کنیم Procfile که به Heroku می گوید چگونه کارگران مختلفی را که برای برنامه Heroku خود نیاز داریم راه اندازی کنیم. در پوشه ریشه پروژه ما، Procfile باید شبیه این باشد:

consumer_worker: npm run start:consumer
producer_worker: npm run start:producer
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

خیلی ساده، درست است؟ ما یک کارگر فرآیند پس زمینه را فراخوانی خواهیم داشت consumer_workerو دیگری زنگ زد producer_worker. متوجه خواهید شد که ما a نداریم web کارگر، چیزی است که معمولاً در آن می بینید Procfile برای یک برنامه وب برای برنامه Heroku ما فقط به دو کارگر پس زمینه نیاز داریم. ما نیاز نداریم web.

استقرار و آزمایش برنامه

با آن، تمام کد ما تنظیم می شود. ما تمام کد خود را به مخزن متعهد کرده ایم و آماده استقرار هستیم.

~/project$ git push heroku main
…
remote: -----> Build succeeded!
…
remote: -----> Compressing...
remote:        Done: 48.6M
remote: -----> Launching...
…
remote: Verifying deploy... done
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پس از استقرار، می خواهیم مطمئن شویم که داینوهای خود را به درستی مقیاس بندی کرده ایم. ما برای یک فرآیند وب نیازی به dyno نداریم، اما برای هر دو به یکی نیاز داریم consumer_worker و producer_worker. دستور زیر را اجرا می کنیم تا این فرآیندها را بر اساس نیاز خود تنظیم کنیم.

~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1
Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در حال حاضر، همه چیز باید راه اندازی و اجرا شود. پشت صحنه، ما producer_worker باید به خوشه کافکا متصل شود و سپس شروع به انتشار داده های حسگر آب و هوا هر چند ثانیه یکبار کند. سپس، ما consumer_worker باید به خوشه کافکا متصل شود و هر پیامی را که از موضوعی که در آن مشترک است دریافت می کند، ثبت کند.

تا ببینیم ما چیست consumer_worker در حال انجام است، می‌توانیم در لاگ‌های Heroku خود نگاه کنیم.

~/project$ heroku logs --tail
…
heroku[producer_worker.1]: Starting process with command `npm run start:producer`
heroku[producer_worker.1]: State changed from starting to up
app[producer_worker.1]: 
app[producer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:producer
app[producer_worker.1]: > node producer.js
app[producer_worker.1]: 


…

heroku[consumer_worker.1]: Starting process with command `npm run start:consumer`
heroku[consumer_worker.1]: State changed from starting to up
app[consumer_worker.1]: 
app[consumer_worker.1]: > weather-eda-kafka-heroku-node@1.0.0 start:consumer
app[consumer_worker.1]: > node consumer.js
app[consumer_worker.1]: 
app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"}
app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041}
app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84}
app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3}
app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11}
app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71}
app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55}
app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58}
app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25}
app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1}
app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34}
app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61}
app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36}
app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

کار می کند! ما می دانیم که تولید کننده ما به طور دوره ای پیام هایی را برای کافکا منتشر می کند زیرا مصرف کننده ما آنها را دریافت می کند و سپس آنها را ثبت می کند.

البته، در یک برنامه بزرگتر EDA، هر سنسور یک تولید کننده است. آنها ممکن است در چندین موضوع برای اهداف مختلف منتشر کنند، یا ممکن است همه آنها در یک موضوع منتشر کنند. و مصرف کننده شما می تواند در چندین موضوع مشترک شود. همچنین، در برنامه آزمایشی ما، مصرف کننده ما به سادگی مقدار زیادی را منتشر کرد eachMessage; اما در یک برنامه EDA، یک مصرف کننده ممکن است با تماس با یک API شخص ثالث، ارسال یک اعلان پیامکی، یا پرس و جو از پایگاه داده پاسخ دهد.

اکنون که درک اولیه ای از رویدادها، موضوعات، تولیدکنندگان و مصرف کنندگان دارید و می دانید که چگونه با کافکا کار کنید، می توانید شروع به طراحی و ساخت برنامه های کاربردی EDA خود کنید تا موارد استفاده تجاری پیچیده تر را برآورده کنید.

نتیجه

EDA بسیار قدرتمند است – شما می توانید سیستم های خود را جدا کنید در حالی که از ویژگی های کلیدی مانند مقیاس پذیری آسان و پردازش داده ها در زمان واقعی لذت می برید. برای EDA، کافکا یک ابزار کلیدی است که به شما کمک می‌کند تا به راحتی جریان‌های داده‌ای با توان عملیاتی بالا را مدیریت کنید. استفاده از آپاچی کافکا در Heroku به شما کمک می کند تا سریع شروع کنید. از آنجایی که این یک سرویس مدیریت شده است، لازم نیست نگران بخش های پیچیده مدیریت خوشه کافکا باشید. شما فقط می توانید بر روی ساخت برنامه های خود تمرکز کنید.

از اینجا، زمان آزمایش و نمونه سازی شما فرا رسیده است. موارد استفاده را شناسایی کنید که به خوبی با EDA مطابقت دارند. شیرجه بزنید، آن را روی Heroku آزمایش کنید و چیزی شگفت انگیز بسازید. کد نویسی مبارک!

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا