برنامه نویسی

اتصال RDB ها و موتورهای جستجو – فصل 3

فصل 3: تأیید جریان از postgresql به debezium به Kafka

در این فصل ، ما روند پایان به پایان گرفتن داده های تغییر از PostgreSQL را با Debezium و تحویل آن به Kafka آزمایش خواهیم کرد.


1. شروع محیط زیست

قبل از شروع این فصل ، اطمینان حاصل کنید که تمام خدمات لازم از طریق آهنگسازی Docker اجرا می شوند:

docker compose up -d
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

تأیید کنید که اجزای زیر در حال اجرا هستند:

  • پس از
  • آپاچی کافکا
  • Debezium Connect (Kafka Connect)
  • باغ وحش

دیدن docker-compose.yaml در مخزن برای جزئیات پیکربندی خدمات.


2. تنظیم postgreSQL

ابتدا جدول را ایجاد می کنیم که Debezium نظارت خواهد کرد. موارد زیر را به اضافه کنید postgres/00-init.sql:

-- Create Debezium user
CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;

-- Create target table
CREATE TABLE testtable (
  id INTEGER PRIMARY KEY,
  message VARCHAR(255)
);
INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');

-- Grant SELECT permission
GRANT SELECT ON testtable TO debezium;

-- Create publication (for pgoutput plugin)
CREATE PUBLICATION debezium_pub FOR TABLE testtable;
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

این اسکریپت اجرا می شود فقط یک بار در حین ایجاد ظروف اولیه.
برای اعمال تغییرات در اسکریپت ، باید حجم مداوم را حذف کرده و مجدداً راه اندازی کنید:

docker compose down --volumes
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


3. پیکربندی کانتینر PostgreSQL

برای فعال کردن CDC با استفاده از WAL (ورود به سیستم) ، موارد زیر را به موارد اضافه کنید docker-compose.yaml پیکربندی:

command: >
  postgres
  -c wal_level=logical
  -c max_replication_slots=4
  -c max_wal_senders=4
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

برای راه اندازی مجدد postgresql بدون از دست دادن داده ها:

docker compose restart postgres
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

برای شروع مجدد SQL ، باید حجم را حذف کنید:

docker compose down --volumes
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


4. ثبت اتصال Debezium

پس از اجرای DeBezium Connect ، اتصال را با دستور زیر ثبت کنید (فرمت DeBezium 1.9):

curl -X POST "localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.dbname": "postgres",
      "database.server.name": "dbserver1",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "slot.name": "debezium_slot",
      "slot.drop.on.stop": "true",
      "table.include.list": "public.testtable",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

در مورد snapshot.mode

در snapshot.mode گزینه کنترل می کند که آیا محتویات کامل پایگاه داده را در راه اندازی کانکتور بارگیری کنید.

  • initial: تمام داده های موجود را یک بار بارگیری کنید ، سپس تغییرات را ضبط کنید (پیش فرض)
  • never: پرش بار اولیه ؛ فقط تغییرات بعدی را ضبط کنید

در این راهنما استفاده می کنیم initial، بنابراین ردیف های موجود با کافکا ارسال می شوند op: r (بخوانید).


5. بررسی وضعیت اتصال

وضعیت اتصال را بررسی کنید:

curl http://localhost:8083/connectors/postgres-connector/status
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

خروجی مورد انتظار:

{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.4.1.81:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.4.1.81:8083"
    }
  ],
  "type": "source"
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


6. بررسی مباحث کافکا

مباحث کافکا را لیست کنید:

# Enter Kafka container
docker compose exec kafka bash

# List topics
kafka-topics --bootstrap-server localhost:9092 --list
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

خروجی مورد انتظار:

dbserver1.public.testtable
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


7. مشاهده رویدادهای CDC از کافکا

از موارد زیر برای مصرف رویدادها از موضوع استفاده کنید:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

نمونه خروجی عکس فوری:

{
  "before": null,
  "after": {
    "id": 1,
    "message": "CDC test row"
  },
  "op": "r"
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


8. تأیید تغییرات داده های زنده

یک ردیف جدید در PostgreSQL اضافه کنید:

# Log into PostgreSQL
docker compose exec postgres psql -U postgres

# Insert new row
INSERT INTO testtable (id, message) VALUES (2, 'inserted row');
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

خروجی کافکا مورد انتظار:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

قالب رویداد Debezium JSON

DeBezium از قالب زیر برای پیام های CDC استفاده می کند:

op ارزش معنی
r عکس فوری خوانده شده (داده های اولیه)
c درج (ایجاد)
u بروزرسانی
d حذف کردن

به عنوان مثال ، JSON زیر به این معنی است که یک ردیف جدید درج شده است:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


عیب یابی

موضوعی که در کافکا ایجاد نشده است

اگر dbserver1.public.testtable ظاهر نمی شود ، بررسی کنید:

  • این که debezium_pub در PostgreSQL وجود دارد:
SELECT * FROM pg_publication;
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

docker compose down --volumes
docker compose up -d
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

  • که سیاهههای مربوط به Kafka بدون خطا هستند:
docker compose logs connect
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

هیچ رویدادی از کافکا نمایش داده نمی شود

اگر حوادث CDC در کافکا قابل مشاهده نباشد:

  • اطمینان حاصل کنید که کانکتور است RUNNING (به مرحله 5 مراجعه کنید)
  • اطمینان حاصل کنید که ردیف درج شده دارای یک شناسه منحصر به فرد است
  • اطمینان حاصل کنید که از ابتدا مصرف می کنید:
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید


در این فصل ، ما جریان داده های تغییر از postgreSQL به Kafka را با استفاده از Debezium تأیید کردیم. در فصل بعد ، ما از Flink برای پردازش این داده ها استفاده خواهیم کرد.

(به زودی: فصل 4 قسمت 1 – خروجی داده های CDC Kafka برای کنسول با فلینک)

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا