برنامه نویسی

ذخیره داده های دستگاه اینترنت اشیا – انجمن DEV

Summarize this content to 400 words in Persian Lang
در پست قبلی نحوه دریافت اطلاعات دستگاه iot از یک کارگزار MQTT را نشان دادیم. در این پست داده ها را در پایگاه داده ذخیره می کنیم.

در یک سیستم قوی، ممکن است انتخاب کنیم که رویدادهای داده خام را در یک دریاچه داده ذخیره کنیم. شاید در آینده به بررسی آن بپردازیم. اما در حال حاضر برای سادگی آن را در PostGres ذخیره می کنیم.

پست قبلی دریافت داده های خام و باز کردن آن را به ساختاری نشان داد که قبلاً با برچسب های gorm حاشیه نویسی شده بود. Gorm یک ORM محبوب برای Go است. اگر با آن آشنایی ندارید، می توانید برای اطلاعات بیشتر اینجا.

type IoTDeviceMessage struct {
BaseModel
Time time.Time `json:”time” gorm:”index”`
DeviceID string `json:”device_id”`
DeviceType string `json:”device_type”`
DeviceData json.RawMessage `json:”device_data”`
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بنابراین تنها کاری که باید انجام دهیم این است که اتصال Postgres را پیکربندی کنیم، سپس از gorm برای ذخیره داده های رویداد استفاده کنیم.

func setupPostgres(logger *zerolog.Logger) *Repository {
dbHost := os.Getenv(“POSTGRES_HOST”)
dbName := os.Getenv(“POSTGRES_DB”)
dbPort := os.Getenv(“POSTGRES_PORT”)
dbUser := os.Getenv(“POSTGRES_USER”)
dbPassword := os.Getenv(“POSTGRES_PASSWORD”)
dsn := fmt.Sprintf(“host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC”,
dbHost, dbUser, dbPassword, dbName, dbPort)
logger.Info().Msg(fmt.Sprintf(“Connecting to PostgreSQL at %s”, dsn))
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
logger.Fatal().Err(err).Msg(“failed to connect to database”)
}

// Auto-migrate the schema
err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
if err != nil {
logger.Fatal().Err(err).Msg(“failed to migrate models”)
}

sqlDB, err := db.DB()
sqlDB.SetMaxIdleConns(10)
sqlDB.SetMaxOpenConns(100)
sqlDB.SetConnMaxLifetime(time.Hour)

repo := NewRepository(db, logger)
return repo
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در اینجا ما اتصال Postgres را راه اندازی می کنیم. توجه داشته باشید که ما از متغیرهای محیطی برای ذخیره اطلاعات حساس خود استفاده می کنیم. این یک روش خوب برای سیستم های تولیدی است، خواه کانتینری باشند یا نه.

ما همچنین ساختاری به نام Repository را مقداردهی اولیه می کنیم. این ساختار شامل روش های ذخیره سازی و بازیابی واقعی ما است. این باعث جدایی ما از پیکربندی postgres می شود.

type Repository struct {
db *gorm.DB
logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
sqlDb, err := r.db.DB()
if err != nil {
r.logger.Error().Err(err).Msg(“failed to close database”)
return
}
_ = sqlDb.Close()
}

// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
var messages []IoTDeviceMessage
err := r.db.Where(“device_id = ?”, deviceID).Order(“timestamp desc”).Limit(limit).Find(&messages).Error
return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
return r.db.Where(“device_id = ?”, deviceID).Delete(&IoTDeviceMessage{}).Error
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون پیام فقط باید ادامه یابد. از آنجایی که از الگوی خط لوله برای پردازش پیام ها استفاده می کنیم، مرحله ماندگاری را به مرحله جدیدی در خط لوله اضافه می کنیم.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
out := make(chan IoTRawDeviceMessage)
go func() {
defer close(out)
for iotMsg := range input {
logger.Info().Msg(fmt.Sprintf(“Persist iot msg for device: %s”, iotMsg.DeviceID))
err := repo.CreateMessage(&iotMsg)
if err != nil {
logger.Error().Err(err).Msg(“Error creating IoTRawDeviceMessage”)
}
}
}()
return out
}

finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
for iotMsg := range finalChan {
// now we have the IoTRawDeviceMessage that has been persisted
logger.Info().Msg(fmt.Sprintf(“Received iot msg: %+v”, iotMsg))
// do something like check for alert conditions
}

وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این تمام چیزی است که در آن وجود دارد.

کد این مورد را می توانید در اینجا پیدا کنید. می توانید با همان کد ناشر پست قبلی از آن استفاده کنید. حتما تنظیمات Postgres خود را به عنوان متغیرهای محیطی پیکربندی کنید.

در پست قبلی نحوه دریافت اطلاعات دستگاه iot از یک کارگزار MQTT را نشان دادیم. در این پست داده ها را در پایگاه داده ذخیره می کنیم.

در یک سیستم قوی، ممکن است انتخاب کنیم که رویدادهای داده خام را در یک دریاچه داده ذخیره کنیم. شاید در آینده به بررسی آن بپردازیم. اما در حال حاضر برای سادگی آن را در PostGres ذخیره می کنیم.

پست قبلی دریافت داده های خام و باز کردن آن را به ساختاری نشان داد که قبلاً با برچسب های gorm حاشیه نویسی شده بود. Gorm یک ORM محبوب برای Go است. اگر با آن آشنایی ندارید، می توانید برای اطلاعات بیشتر اینجا.

type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

بنابراین تنها کاری که باید انجام دهیم این است که اتصال Postgres را پیکربندی کنیم، سپس از gorm برای ذخیره داده های رویداد استفاده کنیم.

func setupPostgres(logger *zerolog.Logger) *Repository {
    dbHost := os.Getenv("POSTGRES_HOST")
    dbName := os.Getenv("POSTGRES_DB")
    dbPort := os.Getenv("POSTGRES_PORT")
    dbUser := os.Getenv("POSTGRES_USER")
    dbPassword := os.Getenv("POSTGRES_PASSWORD")
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
        dbHost, dbUser, dbPassword, dbName, dbPort)
    logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn))
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to connect to database")
    }

    // Auto-migrate the schema
    err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to migrate models")
    }

    sqlDB, err := db.DB()
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetConnMaxLifetime(time.Hour)

    repo := NewRepository(db, logger)
    return repo
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در اینجا ما اتصال Postgres را راه اندازی می کنیم. توجه داشته باشید که ما از متغیرهای محیطی برای ذخیره اطلاعات حساس خود استفاده می کنیم. این یک روش خوب برای سیستم های تولیدی است، خواه کانتینری باشند یا نه.

ما همچنین ساختاری به نام Repository را مقداردهی اولیه می کنیم. این ساختار شامل روش های ذخیره سازی و بازیابی واقعی ما است. این باعث جدایی ما از پیکربندی postgres می شود.

type Repository struct {
    db     *gorm.DB
    logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
    return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
    sqlDb, err := r.db.DB()
    if err != nil {
        r.logger.Error().Err(err).Msg("failed to close database")
        return
    }
    _ = sqlDb.Close()
}
...
// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
    return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
    var messages []IoTDeviceMessage
    err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error
    return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
    return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون پیام فقط باید ادامه یابد. از آنجایی که از الگوی خط لوله برای پردازش پیام ها استفاده می کنیم، مرحله ماندگاری را به مرحله جدیدی در خط لوله اضافه می کنیم.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
    out := make(chan IoTRawDeviceMessage)
    go func() {
        defer close(out)
        for iotMsg := range input {
            logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID))
            err := repo.CreateMessage(&iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
            }
        }
    }()
    return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
        for iotMsg := range finalChan {
            // now we have the IoTRawDeviceMessage that has been persisted
            logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
            // do something like check for alert conditions
        }
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این تمام چیزی است که در آن وجود دارد.

کد این مورد را می توانید در اینجا پیدا کنید. می توانید با همان کد ناشر پست قبلی از آن استفاده کنید. حتما تنظیمات Postgres خود را به عنوان متغیرهای محیطی پیکربندی کنید.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا