برنامه نویسی

ساختن یک سیستم صف پیام قوی با کافکا و گوفام

سلام Devs همکار! 👋 امروز ، ما در حال تبدیل شدن به نحوه ساخت یک سیستم صف پیام قوی با استفاده از Apache Kafka با چارچوب Goframe هستیم. این که آیا شما در حال استفاده از جریان داده های با توان بالا ، ساختمان معماری های محور رویداد هستید ، یا فقط می خواهید خدمات خود را جدا کنید ، این راهنما شما را تحت پوشش قرار داده است!

آنچه خواهیم ساخت

ما یک سیستم صف پیام کامل ایجاد خواهیم کرد که می تواند:

  • تولید کنندگان و مصرف کنندگان کافکا را در Goframe راه اندازی کنید
  • انتشار پیام و مصرف پیام
  • اجرای خطای قوی
  • مکانیسم های آزمایش مجدد را برای عملیات ناموفق اضافه کنید

پیش نیازها

قبل از شروع ، اطمینان حاصل کنید که:

  • روی دستگاه خود نصب کنید
  • درک اساسی از صف های GO و پیام
  • Kafka و Zookeeper که به صورت محلی در حال اجرا هستند یا یک خوشه Kafka که می توانید به آن وصل شوید
  • چارچوب Goframe نصب شده

کافکا سریع آغازگر

اگر تازه وارد کافکا هستید ، آنچه را که باید بدانید در اینجا آورده شده است. Kafka یک سیستم پیام رسانی توزیع شده است که در استفاده از جریان داده های با توان بالا برتری دارد. مفاهیم کلیدی عبارتند از:

  • تولید کننده: فرستنده پیام شما
  • مصرف کننده: گیرنده پیام شما
  • کارگزار: سرور کافکا
  • موضوع: دسته بندی برای پیام های شما
  • پارتیشن: چگونه مباحث برای مقیاس پذیری تقسیم می شوند

بیایید کد کنیم! 💻

1. ابتدا مشتری Kafka را نصب کنید

go get github.com/IBM/sarama
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

2. پیکربندی خود را تنظیم کنید

یک فایل پیکربندی ایجاد کنید که Kafka از آن استفاده خواهد کرد:

# config.yaml
kafka:
  address: 
    - "localhost:9092"
  topic: "my_topic"
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

3. تولید کننده خود را ایجاد کنید

package main

import (
    "context"
    "github.com/gogf/gf/v2/frame/g"
    "github.com/gogf/gf/v2/os/gctx"
    "github.com/IBM/sarama"
)

var kafkaProducer sarama.SyncProducer

func initKafkaProducer(ctx context.Context) {
    // Create producer config
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    // Get address from config
    address := g.Cfg().MustGet(ctx, "kafka.address").Strings()

    // Initialize producer
    producer, err := sarama.NewSyncProducer(address, config)
    if err != nil {
       panic(err)
    }
    kafkaProducer = producer
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

4. مصرف کننده خود را تنظیم کنید

var kafkaConsumer sarama.Consumer

func initKafkaConsumer(ctx context.Context) {
    address := g.Cfg().MustGet(ctx, "kafka.address").Strings()

    consumer, err := sarama.NewConsumer(address, nil)
    if err != nil {
       panic(err)
    }
    kafkaConsumer = consumer
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

ساخت آن آماده تولید

ارسال پیام با رسیدگی به خطا

func sendMessage(ctx context.Context, message string) error {
    msg := &sarama.ProducerMessage{
        Topic: g.Cfg().MustGet(ctx, "kafka.topic").String(),
        Value: sarama.StringEncoder(message),
    }

    partition, offset, err := kafkaProducer.SendMessage(msg)
    if err != nil {
        return fmt.Errorf("failed to send message: %w", err)
    }

    g.Log().Infof(ctx, "Message sent successfully partition=%d, offset=%d", 
        partition, offset)
    return nil
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مصرف پیام هایی مانند یک حرفه ای

در اینجا یک اجرای قوی مصرف کننده با استفاده از خطا و خاموش کردن برازنده وجود دارد:

func consumeMessages(ctx context.Context) {
    topic := g.Cfg().MustGet(ctx, "kafka.topic").String()
    partitionList, err := kafkaConsumer.Partitions(topic)
    if err != nil {
        g.Log().Errorf(ctx, "Failed to get partition list: %v", err)
        return
    }

    // Create a channel to handle shutdown
    done := make(chan bool)

    for partition := range partitionList {
        // Start a goroutine for each partition
        go func(partition int32) {
            pc, err := kafkaConsumer.ConsumePartition(topic, partition, 
                sarama.OffsetNewest)
            if err != nil {
                g.Log().Errorf(ctx, "Failed to start consumer: %v", err)
                return
            }

            defer pc.Close()

            for {
                select {
                case msg := <-pc.Messages():
                    handleMessageWithRetry(ctx, msg)
                case <-ctx.Done():
                    done <- true
                    return
                }
            }
        }(int32(partition))
    }

    <-done // Wait for shutdown signal
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

افزودن منطق آزمایش مجدد

در اینجا یک مکانیزم آزمایشگاه قوی برای رسیدگی به خرابی های پردازش پیام وجود دارد:

const (
    maxRetries = 3
    retryDelay = time.Second
)

func handleMessageWithRetry(ctx context.Context, msg *sarama.ConsumerMessage) {
    var err error
    for attempt := 0; attempt < maxRetries; attempt++ {
        err = processMessage(ctx, msg)
        if err == nil {
            // Success! Let's mark the message as processed
            markMessageProcessed(msg)
            return
        }

        g.Log().Warningf(ctx, 
            "Failed to process message (attempt %d/%d): %v", 
            attempt+1, maxRetries, err)

        if attempt < maxRetries-1 {
            time.Sleep(retryDelay * time.Duration(attempt+1))
        }
    }

    // If we get here, all retries failed
    g.Log().Errorf(ctx, 
        "Failed to process message after %d attempts: %v", 
        maxRetries, err)

    // Here you might want to:
    // 1. Send to a dead letter queue
    // 2. Store in an error log
    // 3. Trigger an alert
    handleFailedMessage(ctx, msg, err)
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

نکات حرفه ای

باغ وحش اول: همیشه اطمینان حاصل کنید که Zookeeper قبل از شروع کافکا در حال اجرا است:

   # Start ZooKeeper first
   ./zookeeper-server-start.sh config/zookeeper.properties

   # Then start Kafka
   ./kafka-server-start.sh config/server.properties
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مصرف کنندگان خود را کنترل کنید: برای اطمینان از اینکه سیستم شما با جریان پیام همراه است ، تاخیر مصرف کننده را پیگیری کنید.

خاموش کردن برازنده: همیشه تولید کنندگان و مصرف کنندگان خود را به درستی ببندید:

   defer func() {
       if err := kafkaProducer.Close(); err != nil {
           g.Log().Errorf(ctx, "Failed to close producer: %v", err)
       }
       if err := kafkaConsumer.Close(); err != nil {
           g.Log().Errorf(ctx, "Failed to close consumer: %v", err)
       }
   }()
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

چه چیزی بعدی؟ 🚀

اکنون که یک پایه محکم دارید ، ممکن است بخواهید اکتشاف کنید:

  • تنظیم فشرده سازی پیام برای عملکرد بهتر
  • اجرای صف نامه های مرده برای پیام های ناموفق
  • اضافه کردن معیارها و نظارت
  • راه اندازی چندین گروه مصرف کننده

بسته بندی

ما با استفاده از Kafka و Goframe یک سیستم صف پیام قوی ایجاد کرده ایم! این ترکیب یک راه حل مقیاس پذیر و قابل اعتماد برای رسیدگی به پردازش پیام با توان بالا ارائه می دهد.

آیا کافکا را در پروژه های GO خود پیاده سازی کرده اید؟ با چه چالش هایی روبرو شدید؟ تجربیات خود را در نظرات زیر به اشتراک بگذارید! 👇


این مفید است؟ برای آموزش بیشتر مرا دنبال کنید و فراموش نکنید که این پست!

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا