اتصال RDB ها و موتورهای جستجو – فصل 3

فصل 3: تأیید جریان از postgresql به debezium به Kafka
در این فصل ، ما روند پایان به پایان گرفتن داده های تغییر از PostgreSQL را با Debezium و تحویل آن به Kafka آزمایش خواهیم کرد.
1. شروع محیط زیست
قبل از شروع این فصل ، اطمینان حاصل کنید که تمام خدمات لازم از طریق آهنگسازی Docker اجرا می شوند:
docker compose up -d
تأیید کنید که اجزای زیر در حال اجرا هستند:
- پس از
- آپاچی کافکا
- Debezium Connect (Kafka Connect)
- باغ وحش
دیدن docker-compose.yaml
در مخزن برای جزئیات پیکربندی خدمات.
2. تنظیم postgreSQL
ابتدا جدول را ایجاد می کنیم که Debezium نظارت خواهد کرد. موارد زیر را به اضافه کنید postgres/00-init.sql
:
-- Create Debezium user
CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;
-- Create target table
CREATE TABLE testtable (
id INTEGER PRIMARY KEY,
message VARCHAR(255)
);
INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');
-- Grant SELECT permission
GRANT SELECT ON testtable TO debezium;
-- Create publication (for pgoutput plugin)
CREATE PUBLICATION debezium_pub FOR TABLE testtable;
این اسکریپت اجرا می شود فقط یک بار در حین ایجاد ظروف اولیه.
برای اعمال تغییرات در اسکریپت ، باید حجم مداوم را حذف کرده و مجدداً راه اندازی کنید:
docker compose down --volumes
3. پیکربندی کانتینر PostgreSQL
برای فعال کردن CDC با استفاده از WAL (ورود به سیستم) ، موارد زیر را به موارد اضافه کنید docker-compose.yaml
پیکربندی:
command: >
postgres
-c wal_level=logical
-c max_replication_slots=4
-c max_wal_senders=4
برای راه اندازی مجدد postgresql بدون از دست دادن داده ها:
docker compose restart postgres
برای شروع مجدد SQL ، باید حجم را حذف کنید:
docker compose down --volumes
4. ثبت اتصال Debezium
پس از اجرای DeBezium Connect ، اتصال را با دستور زیر ثبت کنید (فرمت DeBezium 1.9):
curl -X POST "localhost:8083/connectors" \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz",
"database.dbname": "postgres",
"database.server.name": "dbserver1",
"topic.prefix": "dbserver1",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"slot.drop.on.stop": "true",
"table.include.list": "public.testtable",
"snapshot.mode": "initial",
"tombstones.on.delete": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}'
در مورد snapshot.mode
در snapshot.mode
گزینه کنترل می کند که آیا محتویات کامل پایگاه داده را در راه اندازی کانکتور بارگیری کنید.
-
initial
: تمام داده های موجود را یک بار بارگیری کنید ، سپس تغییرات را ضبط کنید (پیش فرض) -
never
: پرش بار اولیه ؛ فقط تغییرات بعدی را ضبط کنید
در این راهنما استفاده می کنیم initial
، بنابراین ردیف های موجود با کافکا ارسال می شوند op: r
(بخوانید).
5. بررسی وضعیت اتصال
وضعیت اتصال را بررسی کنید:
curl http://localhost:8083/connectors/postgres-connector/status
خروجی مورد انتظار:
{
"name": "postgres-connector",
"connector": {
"state": "RUNNING",
"worker_id": "10.4.1.81:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.4.1.81:8083"
}
],
"type": "source"
}
6. بررسی مباحث کافکا
مباحث کافکا را لیست کنید:
# Enter Kafka container
docker compose exec kafka bash
# List topics
kafka-topics --bootstrap-server localhost:9092 --list
خروجی مورد انتظار:
dbserver1.public.testtable
7. مشاهده رویدادهای CDC از کافکا
از موارد زیر برای مصرف رویدادها از موضوع استفاده کنید:
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic dbserver1.public.testtable \
--from-beginning
نمونه خروجی عکس فوری:
{
"before": null,
"after": {
"id": 1,
"message": "CDC test row"
},
"op": "r"
}
8. تأیید تغییرات داده های زنده
یک ردیف جدید در PostgreSQL اضافه کنید:
# Log into PostgreSQL
docker compose exec postgres psql -U postgres
# Insert new row
INSERT INTO testtable (id, message) VALUES (2, 'inserted row');
خروجی کافکا مورد انتظار:
{
"before": null,
"after": {
"id": 2,
"message": "inserted row"
},
"op": "c"
}
قالب رویداد Debezium JSON
DeBezium از قالب زیر برای پیام های CDC استفاده می کند:
op ارزش |
معنی |
---|---|
r |
عکس فوری خوانده شده (داده های اولیه) |
c |
درج (ایجاد) |
u |
بروزرسانی |
d |
حذف کردن |
به عنوان مثال ، JSON زیر به این معنی است که یک ردیف جدید درج شده است:
{
"before": null,
"after": {
"id": 2,
"message": "inserted row"
},
"op": "c"
}
عیب یابی
موضوعی که در کافکا ایجاد نشده است
اگر dbserver1.public.testtable
ظاهر نمی شود ، بررسی کنید:
- این که
debezium_pub
در PostgreSQL وجود دارد:
SELECT * FROM pg_publication;
docker compose down --volumes
docker compose up -d
- که سیاهههای مربوط به Kafka بدون خطا هستند:
docker compose logs connect
هیچ رویدادی از کافکا نمایش داده نمی شود
اگر حوادث CDC در کافکا قابل مشاهده نباشد:
- اطمینان حاصل کنید که کانکتور است
RUNNING
(به مرحله 5 مراجعه کنید) - اطمینان حاصل کنید که ردیف درج شده دارای یک شناسه منحصر به فرد است
- اطمینان حاصل کنید که از ابتدا مصرف می کنید:
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic dbserver1.public.testtable \
--from-beginning
در این فصل ، ما جریان داده های تغییر از postgreSQL به Kafka را با استفاده از Debezium تأیید کردیم. در فصل بعد ، ما از Flink برای پردازش این داده ها استفاده خواهیم کرد.
(به زودی: فصل 4 قسمت 1 – خروجی داده های CDC Kafka برای کنسول با فلینک)