ساختن یک سیستم صف پیام قوی با کافکا و گوفام

سلام Devs همکار! 👋 امروز ، ما در حال تبدیل شدن به نحوه ساخت یک سیستم صف پیام قوی با استفاده از Apache Kafka با چارچوب Goframe هستیم. این که آیا شما در حال استفاده از جریان داده های با توان بالا ، ساختمان معماری های محور رویداد هستید ، یا فقط می خواهید خدمات خود را جدا کنید ، این راهنما شما را تحت پوشش قرار داده است!
آنچه خواهیم ساخت
ما یک سیستم صف پیام کامل ایجاد خواهیم کرد که می تواند:
- تولید کنندگان و مصرف کنندگان کافکا را در Goframe راه اندازی کنید
- انتشار پیام و مصرف پیام
- اجرای خطای قوی
- مکانیسم های آزمایش مجدد را برای عملیات ناموفق اضافه کنید
پیش نیازها
قبل از شروع ، اطمینان حاصل کنید که:
- روی دستگاه خود نصب کنید
- درک اساسی از صف های GO و پیام
- Kafka و Zookeeper که به صورت محلی در حال اجرا هستند یا یک خوشه Kafka که می توانید به آن وصل شوید
- چارچوب Goframe نصب شده
کافکا سریع آغازگر
اگر تازه وارد کافکا هستید ، آنچه را که باید بدانید در اینجا آورده شده است. Kafka یک سیستم پیام رسانی توزیع شده است که در استفاده از جریان داده های با توان بالا برتری دارد. مفاهیم کلیدی عبارتند از:
- تولید کننده: فرستنده پیام شما
- مصرف کننده: گیرنده پیام شما
- کارگزار: سرور کافکا
- موضوع: دسته بندی برای پیام های شما
- پارتیشن: چگونه مباحث برای مقیاس پذیری تقسیم می شوند
بیایید کد کنیم! 💻
1. ابتدا مشتری Kafka را نصب کنید
go get github.com/IBM/sarama
2. پیکربندی خود را تنظیم کنید
یک فایل پیکربندی ایجاد کنید که Kafka از آن استفاده خواهد کرد:
# config.yaml
kafka:
address:
- "localhost:9092"
topic: "my_topic"
3. تولید کننده خود را ایجاد کنید
package main
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/IBM/sarama"
)
var kafkaProducer sarama.SyncProducer
func initKafkaProducer(ctx context.Context) {
// Create producer config
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// Get address from config
address := g.Cfg().MustGet(ctx, "kafka.address").Strings()
// Initialize producer
producer, err := sarama.NewSyncProducer(address, config)
if err != nil {
panic(err)
}
kafkaProducer = producer
}
4. مصرف کننده خود را تنظیم کنید
var kafkaConsumer sarama.Consumer
func initKafkaConsumer(ctx context.Context) {
address := g.Cfg().MustGet(ctx, "kafka.address").Strings()
consumer, err := sarama.NewConsumer(address, nil)
if err != nil {
panic(err)
}
kafkaConsumer = consumer
}
ساخت آن آماده تولید
ارسال پیام با رسیدگی به خطا
func sendMessage(ctx context.Context, message string) error {
msg := &sarama.ProducerMessage{
Topic: g.Cfg().MustGet(ctx, "kafka.topic").String(),
Value: sarama.StringEncoder(message),
}
partition, offset, err := kafkaProducer.SendMessage(msg)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
g.Log().Infof(ctx, "Message sent successfully partition=%d, offset=%d",
partition, offset)
return nil
}
مصرف پیام هایی مانند یک حرفه ای
در اینجا یک اجرای قوی مصرف کننده با استفاده از خطا و خاموش کردن برازنده وجود دارد:
func consumeMessages(ctx context.Context) {
topic := g.Cfg().MustGet(ctx, "kafka.topic").String()
partitionList, err := kafkaConsumer.Partitions(topic)
if err != nil {
g.Log().Errorf(ctx, "Failed to get partition list: %v", err)
return
}
// Create a channel to handle shutdown
done := make(chan bool)
for partition := range partitionList {
// Start a goroutine for each partition
go func(partition int32) {
pc, err := kafkaConsumer.ConsumePartition(topic, partition,
sarama.OffsetNewest)
if err != nil {
g.Log().Errorf(ctx, "Failed to start consumer: %v", err)
return
}
defer pc.Close()
for {
select {
case msg := <-pc.Messages():
handleMessageWithRetry(ctx, msg)
case <-ctx.Done():
done <- true
return
}
}
}(int32(partition))
}
<-done // Wait for shutdown signal
}
افزودن منطق آزمایش مجدد
در اینجا یک مکانیزم آزمایشگاه قوی برای رسیدگی به خرابی های پردازش پیام وجود دارد:
const (
maxRetries = 3
retryDelay = time.Second
)
func handleMessageWithRetry(ctx context.Context, msg *sarama.ConsumerMessage) {
var err error
for attempt := 0; attempt < maxRetries; attempt++ {
err = processMessage(ctx, msg)
if err == nil {
// Success! Let's mark the message as processed
markMessageProcessed(msg)
return
}
g.Log().Warningf(ctx,
"Failed to process message (attempt %d/%d): %v",
attempt+1, maxRetries, err)
if attempt < maxRetries-1 {
time.Sleep(retryDelay * time.Duration(attempt+1))
}
}
// If we get here, all retries failed
g.Log().Errorf(ctx,
"Failed to process message after %d attempts: %v",
maxRetries, err)
// Here you might want to:
// 1. Send to a dead letter queue
// 2. Store in an error log
// 3. Trigger an alert
handleFailedMessage(ctx, msg, err)
}
نکات حرفه ای
باغ وحش اول: همیشه اطمینان حاصل کنید که Zookeeper قبل از شروع کافکا در حال اجرا است:
# Start ZooKeeper first
./zookeeper-server-start.sh config/zookeeper.properties
# Then start Kafka
./kafka-server-start.sh config/server.properties
مصرف کنندگان خود را کنترل کنید: برای اطمینان از اینکه سیستم شما با جریان پیام همراه است ، تاخیر مصرف کننده را پیگیری کنید.
خاموش کردن برازنده: همیشه تولید کنندگان و مصرف کنندگان خود را به درستی ببندید:
defer func() {
if err := kafkaProducer.Close(); err != nil {
g.Log().Errorf(ctx, "Failed to close producer: %v", err)
}
if err := kafkaConsumer.Close(); err != nil {
g.Log().Errorf(ctx, "Failed to close consumer: %v", err)
}
}()
چه چیزی بعدی؟ 🚀
اکنون که یک پایه محکم دارید ، ممکن است بخواهید اکتشاف کنید:
- تنظیم فشرده سازی پیام برای عملکرد بهتر
- اجرای صف نامه های مرده برای پیام های ناموفق
- اضافه کردن معیارها و نظارت
- راه اندازی چندین گروه مصرف کننده
بسته بندی
ما با استفاده از Kafka و Goframe یک سیستم صف پیام قوی ایجاد کرده ایم! این ترکیب یک راه حل مقیاس پذیر و قابل اعتماد برای رسیدگی به پردازش پیام با توان بالا ارائه می دهد.
آیا کافکا را در پروژه های GO خود پیاده سازی کرده اید؟ با چه چالش هایی روبرو شدید؟ تجربیات خود را در نظرات زیر به اشتراک بگذارید! 👇
این مفید است؟ برای آموزش بیشتر مرا دنبال کنید و فراموش نکنید که این پست!