برنامه نویسی

ردیابی تغییرات حساب کاربری در زمان واقعی: پیاده سازی Debezium و ClickHouse

Summarize this content to 400 words in Persian Lang

معرفی

در برنامه‌های کاربردی تجارت الکترونیک مدرن، ردیابی تغییرات داده‌های مهم مانند انواع حساب‌های کاربری در زمان واقعی برای هوش تجاری و رعایت مقررات ضروری است. پایگاه های داده سنتی اغلب ارزش های قدیمی را بازنویسی می کنند و تحلیل تاریخی را به چالش می کشند. Enter Change Data Capture (CDC)، تکنیکی که هر تغییری را در داده‌های شما ثبت و ذخیره می‌کند و یک مسیر حسابرسی جامع و تجزیه و تحلیل بلادرنگ را ممکن می‌سازد.

نمای کلی سناریو

تصور کنید یک پایگاه داده PostgreSQL را برای یک پلتفرم تجارت الکترونیکی مدیریت می کنید که در آن انواع حساب های کاربری (برنز، نقره ای، طلایی) اغلب تغییر می کنند. با این حال، PostgreSQL فقط آخرین مقادیر را حفظ می کند، و تجزیه و تحلیل وضعیت های گذشته حساب های کاربری را دشوار می کند. برای رفع این مشکل، از Debezium و ClickHouse برای ردیابی و ذخیره تمام تغییرات نوع حساب با مُهر زمانی استفاده می کنیم.

تنظیم محیط

دریافت پروژه از Git

برای شروع، مخزن postgres-cdc-clickhouse GitHub را شبیه سازی کنید و محیط را با استفاده از Docker Compose راه اندازی کنید:

git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ایجاد جدول PostgreSQL

ابتدا جدول کاربران را در PostgreSQL برای ذخیره داده های کاربر و پیگیری تغییرات تعریف کنید:

CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
account_type VARCHAR(20) NOT NULL,
updated_at TIMESTAMP DEFAULT timezone(‘UTC’, CURRENT_TIMESTAMP),
created_at TIMESTAMP DEFAULT timezone(‘UTC’, CURRENT_TIMESTAMP)
);

ALTER TABLE public.users REPLICA IDENTITY FULL;

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

سپس درج داده ها:

INSERT INTO users (username, account_type) VALUES
(‘user1’, ‘Bronze’),
(‘user2’, ‘Silver’),
(‘user3’, ‘Gold’);

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و نتیجه به این صورت است:

راه اندازی کانکتور Debezium

کانکتور Debezium PostgreSQL را در Kafka Connect پیکربندی کنید تا تغییرات جدول کاربران را ثبت کند:

curl –location –request POST ‘http://localhost:8083/connectors’ \
–header ‘Content-Type: application/json’ \
–data-raw ‘{
“name”: “raw_data.shop-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“database.dbname”: “postgres”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.shop”,
“database.hostname”: “postgres”,
“database.password”: “postgres”,
“database.port”: “5432”,
“database.server.name”: “shop”,
“database.user”: “postgres”,
“name”: “raw_data.shop-connector”,
“plugin.name”: “pgoutput”,
“table.include.list”: “public.users”,
“tasks.max”: “1”,
“topic.creation.default.cleanup.policy”: “delete”,
“topic.creation.default.partitions”: “1”,
“topic.creation.default.replication.factor”: “1”,
“topic.creation.default.retention.ms”: “604800000”,
“topic.creation.enable”: “true”,
“topic.prefix”: “raw_data.shop”,
“database.history.skip.unparseable.ddl”: “true”,
“key.converter”: ” org.apache.kafka.connect.json.JsonConverter”,
“value.converter”: ” org.apache.kafka.connect.json.JsonConverter”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“snapshot.mode”: “initial”
}
}’

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

نتیجه را در Redpanda Console ببینید:برای مشاهده موضوعات ایجاد شده توسط کانکتور Debezium به http://localhost:9080/topics بروید.

ردیابی تغییرات در زمان واقعی

اکنون، بیایید یک به‌روزرسانی را در جدول کاربران شبیه‌سازی کنیم تا تغییرات منعکس‌شده در موضوعات کافکا و ClickHouse ما را ببینیم:

— Update user with ID 1 to change the account type to ‘Gold’
UPDATE users
SET account_type = ‘Gold’, updated_at = timezone(‘UTC’, CURRENT_TIMESTAMP)
WHERE user_id = 1;

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

هنگامی که این به روز رسانی ساخته شد، Debezium تغییرات را ثبت می کند و یک رویداد در موضوع کافکا ایجاد می کند. این رویداد حاوی مقادیر قبلی و جدید فیلد account_type به همراه نوع عملیات (u برای به‌روزرسانی) است.

نتیجه را در Redpanda Console ببینید:

ذخیره تغییرات در ClickHouse

از ClickHouse برای ذخیره همه تغییرات در یک جدول اختصاصی و نمای واقعی استفاده کنید:

پایگاه داده و جدول ClickHouse ایجاد کنید

CREATE DATABASE shop;

CREATE TABLE shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

جدول موتور کافکا را در ClickHouse تنظیم کنید

CREATE TABLE kafka_shop.kafka__account_type_switch (
`jsonString` String
) ENGINE = Kafka
SETTINGS kafka_broker_list = ‘broker:29092’,
kafka_topic_list = ‘raw_data.shop.public.users’,
kafka_group_name = ‘raw_date_clickhouse’,
kafka_format = ‘JSONAsString’;

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

نمای متریال شده ایجاد کنید

CREATE MATERIALIZED VIEW kafka_shop.consumer__account_type_switch TO shop.account_type_switch (
user_id UInt32,
username String,
before_account_type String,
after_account_type String,
updated_at DateTime,
created_at DateTime
) AS
SELECT
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘user_id’, ‘Nullable(UInt32)’) AS user_id,
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘username’, ‘Nullable(String)’) AS username,
JSONExtract(JSONExtract(jsonString, ‘before’, ‘Nullable(String)’), ‘account_type’, ‘Nullable(String)’) AS before_account_type,
JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘account_type’, ‘Nullable(String)’) AS after_account_type,
toDateTime(JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘updated_at’, ‘Nullable(UInt64)’) / 1000000) AS updated_at,
toDateTime(JSONExtract(JSONExtract(jsonString, ‘after’, ‘Nullable(String)’), ‘created_at’, ‘Nullable(UInt64)’) / 1000000) AS created_at
FROM kafka_shop.kafka__account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, ‘op’, ‘Nullable(String)’) = ‘u’);

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

داده ها را در ClickHouse تأیید کنید

نتیجه

پیاده‌سازی CDC با Debezium و ClickHouse پلتفرم تجارت الکترونیک شما را قادر می‌سازد تا تاریخچه کامل تغییر نوع حساب کاربری را حفظ کند. این راه‌اندازی حسابرسی دقیق، انطباق با مقررات، و تجزیه و تحلیل پیشرفته را قدرت می‌دهد و بینش‌های ارزشمندی را در مورد رفتار کاربر و روندهای تجاری ارائه می‌دهد.

با دنبال کردن این مراحل، می‌توانید شیوه‌های مدیریت داده‌های خود را تقویت کنید و از ردیابی تغییرات در زمان واقعی برای هدایت تصمیم‌گیری آگاهانه در کسب‌وکار خود استفاده کنید.

معرفی

در برنامه‌های کاربردی تجارت الکترونیک مدرن، ردیابی تغییرات داده‌های مهم مانند انواع حساب‌های کاربری در زمان واقعی برای هوش تجاری و رعایت مقررات ضروری است. پایگاه های داده سنتی اغلب ارزش های قدیمی را بازنویسی می کنند و تحلیل تاریخی را به چالش می کشند. Enter Change Data Capture (CDC)، تکنیکی که هر تغییری را در داده‌های شما ثبت و ذخیره می‌کند و یک مسیر حسابرسی جامع و تجزیه و تحلیل بلادرنگ را ممکن می‌سازد.

نمای کلی سناریو

تصور کنید یک پایگاه داده PostgreSQL را برای یک پلتفرم تجارت الکترونیکی مدیریت می کنید که در آن انواع حساب های کاربری (برنز، نقره ای، طلایی) اغلب تغییر می کنند. با این حال، PostgreSQL فقط آخرین مقادیر را حفظ می کند، و تجزیه و تحلیل وضعیت های گذشته حساب های کاربری را دشوار می کند. برای رفع این مشکل، از Debezium و ClickHouse برای ردیابی و ذخیره تمام تغییرات نوع حساب با مُهر زمانی استفاده می کنیم.

تنظیم محیط

دریافت پروژه از Git

برای شروع، مخزن postgres-cdc-clickhouse GitHub را شبیه سازی کنید و محیط را با استفاده از Docker Compose راه اندازی کنید:

git clone https://github.com/ranjbaryshahab/postgres-cdc-clickhouse
cd postgres-cdc-clickhouse
docker-compose up
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ایجاد جدول PostgreSQL

ابتدا جدول کاربران را در PostgreSQL برای ذخیره داده های کاربر و پیگیری تغییرات تعریف کنید:

CREATE TABLE users (
    user_id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    account_type VARCHAR(20) NOT NULL,
    updated_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP),
    created_at TIMESTAMP DEFAULT timezone('UTC', CURRENT_TIMESTAMP)
);

ALTER TABLE public.users REPLICA IDENTITY FULL;
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

سپس درج داده ها:

INSERT INTO users (username, account_type) VALUES
('user1', 'Bronze'),
('user2', 'Silver'),
('user3', 'Gold');
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و نتیجه به این صورت است:

نتیجه Postgres

راه اندازی کانکتور Debezium

کانکتور Debezium PostgreSQL را در Kafka Connect پیکربندی کنید تا تغییرات جدول کاربران را ثبت کند:

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "raw_data.shop-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "postgres",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.shop",
        "database.hostname": "postgres",
        "database.password": "postgres",
        "database.port": "5432",
        "database.server.name": "shop",
        "database.user": "postgres",
        "name": "raw_data.shop-connector",
        "plugin.name": "pgoutput",
        "table.include.list": "public.users",
        "tasks.max": "1",
        "topic.creation.default.cleanup.policy": "delete",
        "topic.creation.default.partitions": "1",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.retention.ms": "604800000",
        "topic.creation.enable": "true",
        "topic.prefix": "raw_data.shop",
        "database.history.skip.unparseable.ddl": "true",
        "key.converter": " org.apache.kafka.connect.json.JsonConverter",
        "value.converter": " org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "snapshot.mode": "initial"
    }
}'
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

نتیجه را در Redpanda Console ببینید:
برای مشاهده موضوعات ایجاد شده توسط کانکتور Debezium به http://localhost:9080/topics بروید.

داده های کنسول Redpanda

ردیابی تغییرات در زمان واقعی

اکنون، بیایید یک به‌روزرسانی را در جدول کاربران شبیه‌سازی کنیم تا تغییرات منعکس‌شده در موضوعات کافکا و ClickHouse ما را ببینیم:

-- Update user with ID 1 to change the account type to 'Gold'
UPDATE users
SET account_type = 'Gold', updated_at = timezone('UTC', CURRENT_TIMESTAMP)
WHERE user_id = 1;
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

هنگامی که این به روز رسانی ساخته شد، Debezium تغییرات را ثبت می کند و یک رویداد در موضوع کافکا ایجاد می کند. این رویداد حاوی مقادیر قبلی و جدید فیلد account_type به همراه نوع عملیات (u برای به‌روزرسانی) است.

نتیجه را در Redpanda Console ببینید:

داده های ردیابی کنسول Redpanda

ذخیره تغییرات در ClickHouse

از ClickHouse برای ذخیره همه تغییرات در یک جدول اختصاصی و نمای واقعی استفاده کنید:

پایگاه داده و جدول ClickHouse ایجاد کنید

CREATE DATABASE shop;

CREATE TABLE shop.account_type_switch (
    user_id UInt32,
    username String,
    before_account_type String,
    after_account_type String,
    updated_at DateTime,
    created_at DateTime
) ENGINE = ReplacingMergeTree
ORDER BY (user_id, updated_at)
SETTINGS index_granularity = 8192;
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

جدول موتور کافکا را در ClickHouse تنظیم کنید

CREATE TABLE kafka_shop.kafka__account_type_switch (
    `jsonString` String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:29092',
        kafka_topic_list = 'raw_data.shop.public.users', 
        kafka_group_name = 'raw_date_clickhouse',
        kafka_format = 'JSONAsString';
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

نمای متریال شده ایجاد کنید

CREATE MATERIALIZED VIEW kafka_shop.consumer__account_type_switch TO shop.account_type_switch (
    user_id UInt32,
    username String,
    before_account_type String,
    after_account_type String,
    updated_at DateTime,
    created_at DateTime
) AS 
SELECT
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'user_id', 'Nullable(UInt32)') AS user_id,
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'username', 'Nullable(String)') AS username,
    JSONExtract(JSONExtract(jsonString, 'before', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS before_account_type,
    JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'account_type', 'Nullable(String)') AS after_account_type,
    toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'updated_at', 'Nullable(UInt64)') / 1000000) AS updated_at,
    toDateTime(JSONExtract(JSONExtract(jsonString, 'after', 'Nullable(String)'), 'created_at', 'Nullable(UInt64)') / 1000000) AS created_at
FROM kafka_shop.kafka__account_type_switch
WHERE (before_account_type != after_account_type) AND (JSONExtract(jsonString, 'op', 'Nullable(String)') = 'u');
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

داده ها را در ClickHouse تأیید کنید

نتیجه کلیک هاوس

نتیجه

پیاده‌سازی CDC با Debezium و ClickHouse پلتفرم تجارت الکترونیک شما را قادر می‌سازد تا تاریخچه کامل تغییر نوع حساب کاربری را حفظ کند. این راه‌اندازی حسابرسی دقیق، انطباق با مقررات، و تجزیه و تحلیل پیشرفته را قدرت می‌دهد و بینش‌های ارزشمندی را در مورد رفتار کاربر و روندهای تجاری ارائه می‌دهد.

با دنبال کردن این مراحل، می‌توانید شیوه‌های مدیریت داده‌های خود را تقویت کنید و از ردیابی تغییرات در زمان واقعی برای هدایت تصمیم‌گیری آگاهانه در کسب‌وکار خود استفاده کنید.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا