ردیابی تغییرات حساب کاربری در زمان واقعی: پیاده سازی Debezium و ClickHouse

Summarize this content to 400 words in Persian Lang
معرفی
در برنامههای کاربردی تجارت الکترونیک مدرن، ردیابی تغییرات دادههای مهم مانند انواع حسابهای کاربری در زمان واقعی برای هوش تجاری و رعایت مقررات ضروری است. پایگاه های داده سنتی اغلب ارزش های قدیمی را بازنویسی می کنند و تحلیل تاریخی را به چالش می کشند. Enter Change Data Capture (CDC)، تکنیکی که هر تغییری را در دادههای شما ثبت و ذخیره میکند و یک مسیر حسابرسی جامع و تجزیه و تحلیل بلادرنگ را ممکن میسازد.
نمای کلی سناریو
تصور کنید یک پایگاه داده PostgreSQL را برای یک پلتفرم تجارت الکترونیکی مدیریت می کنید که در آن انواع حساب های کاربری (برنز، نقره ای، طلایی) اغلب تغییر می کنند. با این حال، PostgreSQL فقط آخرین مقادیر را حفظ می کند، و تجزیه و تحلیل وضعیت های گذشته حساب های کاربری را دشوار می کند. برای رفع این مشکل، از Debezium و ClickHouse برای ردیابی و ذخیره تمام تغییرات نوع حساب با مُهر زمانی استفاده می کنیم.
تنظیم محیط
دریافت پروژه از Git
برای شروع، مخزن postgres-cdc-clickhouse GitHub را شبیه سازی کنید و محیط را با استفاده از Docker Compose راه اندازی کنید:
git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
ایجاد جدول PostgreSQL
ابتدا جدول کاربران را در PostgreSQL برای ذخیره داده های کاربر و پیگیری تغییرات تعریف کنید:
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
account_type VARCHAR(20) NOT NULL,
updated_at TIMESTAMP DEFAULT timezone(‘UTC’, CURRENT_TIMESTAMP),
created_at TIMESTAMP DEFAULT timezone(‘UTC’, CURRENT_TIMESTAMP)
);
ALTER TABLE public.users REPLICA IDENTITY FULL;
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
سپس درج داده ها:
INSERT INTO users (username, account_type) VALUES
(‘user1’, ‘Bronze’),
(‘user2’, ‘Silver’),
(‘user3’, ‘Gold’);
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
و نتیجه به این صورت است:
راه اندازی کانکتور Debezium
کانکتور Debezium PostgreSQL را در Kafka Connect پیکربندی کنید تا تغییرات جدول کاربران را ثبت کند:
curl –location –request POST ‘http://localhost:8083/connectors’ \
–header ‘Content-Type: application/json’ \
–data-raw ‘{
“name”: “raw_data.shop-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“database.dbname”: “postgres”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.shop”,
“database.hostname”: “postgres”,
“database.password”: “postgres”,
“database.port”: “5432”,
“database.server.name”: “shop”,
“database.user”: “postgres”,
“name”: “raw_data.shop-connector”,
“plugin.name”: “pgoutput”,
“table.include.list”: “public.users”,
“tasks.max”: “1”,
“topic.creation.default.cleanup.policy”: “delete”,
“topic.creation.default.partitions”: “1”,
“topic.creation.default.replication.factor”: “1”,
“topic.creation.default.retention.ms”: “604800000”,
“topic.creation.enable”: “true”,
“topic.prefix”: “raw_data.shop”,
“database.history.skip.unparseable.ddl”: “true”,
“key.converter”: ” org.apache.kafka.connect.json.JsonConverter”,
“value.converter”: ” org.apache.kafka.connect.json.JsonConverter”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“snapshot.mode”: “initial”
}
}’
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
نتیجه را در Redpanda Console ببینید:برای مشاهده موضوعات ایجاد شده توسط کانکتور Debezium به http://localhost:9080/topics بروید.
ردیابی تغییرات در زمان واقعی
اکنون، بیایید یک بهروزرسانی را در جدول کاربران شبیهسازی کنیم تا تغییرات منعکسشده در موضوعات کافکا و ClickHouse ما را ببینیم:
— Update user with ID 1 to change the account type to ‘Gold’
UPDATE users
SET account_type = ‘Gold’, updated_at = timezone(‘UTC’, CURRENT_TIMESTAMP)
WHERE user_id = 1;
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
هنگامی که این به روز رسانی ساخته شد، Debezium تغییرات را ثبت می کند و یک رویداد در موضوع کافکا ایجاد می کند. این رویداد حاوی مقادیر قبلی و جدید فیلد account_type به همراه نوع عملیات (u برای بهروزرسانی) است.
نتیجه را در Redpanda Console ببینید:
ذخیره تغییرات در ClickHouse
از ClickHouse برای ذخیره همه تغییرات در یک جدول اختصاصی و نمای واقعی استفاده کنید:
پایگاه داده و جدول ClickHouse ایجاد کنید
CREATE DATABASE shop;
CREATE TABLE shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
جدول موتور کافکا را در ClickHouse تنظیم کنید
CREATE TABLE kafka_shop.kafka__account_type_switch (
`jsonString` String
) ENGINE = Kafka
SETTINGS kafka_broker_list = ‘broker:29092’,
kafka_topic_list = ‘raw_data.shop.public.users’,
kafka_group_name = ‘raw_date_clickhouse’,
kafka_format = ‘JSONAsString’;
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
نمای متریال شده ایجاد کنید
CREATE MATERIALIZED VIEW kafka_shop.consumer__account_type_switch TO shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) AS
SELECT
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘user_id’, ‘Nullable(UInt32)’) AS user_id,
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘username’, ‘Nullable(String)’) AS username,
JSONExtract(JSONExtract(jsonString, ‘before’, ‘Nullable(String)’), ‘account_type’, ‘Nullable(String)’) AS before_account_type,
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘account_type’, ‘Nullable(String)’) AS after_account_type,
toDateTime(JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘updated_at’, ‘Nullable(UInt64)’) / 1000000) AS updated_at,
toDateTime(JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘created_at’, ‘Nullable(UInt64)’) / 1000000) AS created_at
FROM kafka_shop.kafka__account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, ‘op’, ‘Nullable(String)’) = ‘u’);
وارد حالت تمام صفحه شوید
از حالت تمام صفحه خارج شوید
داده ها را در ClickHouse تأیید کنید
نتیجه
پیادهسازی CDC با Debezium و ClickHouse پلتفرم تجارت الکترونیک شما را قادر میسازد تا تاریخچه کامل تغییر نوع حساب کاربری را حفظ کند. این راهاندازی حسابرسی دقیق، انطباق با مقررات، و تجزیه و تحلیل پیشرفته را قدرت میدهد و بینشهای ارزشمندی را در مورد رفتار کاربر و روندهای تجاری ارائه میدهد.
با دنبال کردن این مراحل، میتوانید شیوههای مدیریت دادههای خود را تقویت کنید و از ردیابی تغییرات در زمان واقعی برای هدایت تصمیمگیری آگاهانه در کسبوکار خود استفاده کنید.
معرفی
در برنامههای کاربردی تجارت الکترونیک مدرن، ردیابی تغییرات دادههای مهم مانند انواع حسابهای کاربری در زمان واقعی برای هوش تجاری و رعایت مقررات ضروری است. پایگاه های داده سنتی اغلب ارزش های قدیمی را بازنویسی می کنند و تحلیل تاریخی را به چالش می کشند. Enter Change Data Capture (CDC)، تکنیکی که هر تغییری را در دادههای شما ثبت و ذخیره میکند و یک مسیر حسابرسی جامع و تجزیه و تحلیل بلادرنگ را ممکن میسازد.
نمای کلی سناریو
تصور کنید یک پایگاه داده PostgreSQL را برای یک پلتفرم تجارت الکترونیکی مدیریت می کنید که در آن انواع حساب های کاربری (برنز، نقره ای، طلایی) اغلب تغییر می کنند. با این حال، PostgreSQL فقط آخرین مقادیر را حفظ می کند، و تجزیه و تحلیل وضعیت های گذشته حساب های کاربری را دشوار می کند. برای رفع این مشکل، از Debezium و ClickHouse برای ردیابی و ذخیره تمام تغییرات نوع حساب با مُهر زمانی استفاده می کنیم.
تنظیم محیط
دریافت پروژه از Git
برای شروع، مخزن postgres-cdc-clickhouse GitHub را شبیه سازی کنید و محیط را با استفاده از Docker Compose راه اندازی کنید:
git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up
ایجاد جدول PostgreSQL
ابتدا جدول کاربران را در PostgreSQL برای ذخیره داده های کاربر و پیگیری تغییرات تعریف کنید:
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
account_type VARCHAR(20) NOT NULL,
updated_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP),
created_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP)
);
ALTER TABLE public.users REPLICA IDENTITY FULL;
سپس درج داده ها:
INSERT INTO users (username, account_type) VALUES
('user1', 'Bronze'),
('user2', 'Silver'),
('user3', 'Gold');
و نتیجه به این صورت است:
راه اندازی کانکتور Debezium
کانکتور Debezium PostgreSQL را در Kafka Connect پیکربندی کنید تا تغییرات جدول کاربران را ثبت کند:
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "raw_data.shop-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "postgres",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.shop",
"database.hostname": "postgres",
"database.password": "postgres",
"database.port": "5432",
"database.server.name": "shop",
"database.user": "postgres",
"name": "raw_data.shop-connector",
"plugin.name": "pgoutput",
"table.include.list": "public.users",
"tasks.max": "1",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.partitions": "1",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.retention.ms": "604800000",
"topic.creation.enable": "true",
"topic.prefix": "raw_data.shop",
"database.history.skip.unparseable.ddl": "true",
"key.converter": " org.apache.kafka.connect.json.JsonConverter",
"value.converter": " org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"snapshot.mode": "initial"
}
}'
نتیجه را در Redpanda Console ببینید:
برای مشاهده موضوعات ایجاد شده توسط کانکتور Debezium به http://localhost:9080/topics بروید.
ردیابی تغییرات در زمان واقعی
اکنون، بیایید یک بهروزرسانی را در جدول کاربران شبیهسازی کنیم تا تغییرات منعکسشده در موضوعات کافکا و ClickHouse ما را ببینیم:
-- Update user with ID 1 to change the account type to 'Gold'
UPDATE users
SET account_type = 'Gold', updated_at = timezone('UTC', CURRENT_TIMESTAMP)
WHERE user_id = 1;
هنگامی که این به روز رسانی ساخته شد، Debezium تغییرات را ثبت می کند و یک رویداد در موضوع کافکا ایجاد می کند. این رویداد حاوی مقادیر قبلی و جدید فیلد account_type به همراه نوع عملیات (u برای بهروزرسانی) است.
نتیجه را در Redpanda Console ببینید:
ذخیره تغییرات در ClickHouse
از ClickHouse برای ذخیره همه تغییرات در یک جدول اختصاصی و نمای واقعی استفاده کنید:
پایگاه داده و جدول ClickHouse ایجاد کنید
CREATE DATABASE shop;
CREATE TABLE shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;
جدول موتور کافکا را در ClickHouse تنظیم کنید
CREATE TABLE kafka_shop.kafka__account_type_switch (
`jsonString` String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:29092',
kafka_topic_list = 'raw_data.shop.public.users',
kafka_group_name = 'raw_date_clickhouse',
kafka_format = 'JSONAsString';
نمای متریال شده ایجاد کنید
CREATE MATERIALIZED VIEW kafka_shop.consumer__account_type_switch TO shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) AS
SELECT
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'user_id', 'Nullable(UInt32)') AS user_id,
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'username', 'Nullable(String)') AS username,
JSONExtract(JSONExtract(jsonString, 'before', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS before_account_type,
JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS after_account_type,
toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'updated_at', 'Nullable(UInt64)') / 1000000) AS updated_at,
toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'created_at', 'Nullable(UInt64)') / 1000000) AS created_at
FROM kafka_shop.kafka__account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, 'op', 'Nullable(String)') = 'u');
داده ها را در ClickHouse تأیید کنید
نتیجه
پیادهسازی CDC با Debezium و ClickHouse پلتفرم تجارت الکترونیک شما را قادر میسازد تا تاریخچه کامل تغییر نوع حساب کاربری را حفظ کند. این راهاندازی حسابرسی دقیق، انطباق با مقررات، و تجزیه و تحلیل پیشرفته را قدرت میدهد و بینشهای ارزشمندی را در مورد رفتار کاربر و روندهای تجاری ارائه میدهد.
با دنبال کردن این مراحل، میتوانید شیوههای مدیریت دادههای خود را تقویت کنید و از ردیابی تغییرات در زمان واقعی برای هدایت تصمیمگیری آگاهانه در کسبوکار خود استفاده کنید.