برنامه نویسی

اشیاء S3 را با توابع مرحله ای با استفاده از CDK + TS پردازش کنید

پیشگفتار

هر زمان که کاربر نیاز به راه‌اندازی اتوماسیون داشت، پیشرو فناوری ما باید در یک فرآیند دستی گسترده که حدود دو روز طول می‌کشید تا همه چیز را پیکربندی کند، فرو می‌رفت.
این را تصویر کنید: یک فایل اکسل با بیش از شش تب و دستکاری داده ها با استفاده از توابع اکسل (یک کابوس صفحه گسترده 👻).
با دیدن این جنون، متوجه شدم که می‌توانیم این فرآیند را خودکار کنیم (به من اعتماد کنید، برای رسیدن به این نتیجه بحث داغی بود). بنابراین، معماری زیر متولد شد.

به یاد داشته باشید، ما تعدادی کد اختصاصی داریم، بنابراین این یک نسخه ساده شده و کمی تخیلی برای نشان دادن راه حل است.

معماری

راه حل

ما باید فرآیندی را تنظیم کنیم تا هر بار که یک شی در S3 ایجاد می‌شود، یک تابع Step را راه‌اندازی کند. با این حال، فراخوانی مستقیم یک تابع مرحله در حال حاضر پشتیبانی نمی شود. بنابراین، من یک قانون EventBridge برای نظارت بر S3 برای ایجاد شی ایجاد کردم. هنگامی که یک شی ایجاد شد، قانون تابع Step را برای پردازش شی با استفاده از یکی از وظایف Lambda فراخوانی می کند.
من از نوع Express استفاده کردم زیرا نیاز داشتم که این یک روند سریع باشد. با این تصمیم، مجبور شدم برخی از مراحل را موازی کنم (بله، اتوماسیون من این اجازه را می داد)، بنابراین، از حالت موازی استفاده کردم. در نهایت، اگر خطایی رخ داد، یک اعلان با استفاده از یک موضوع برای آن تنظیم کردم.

اکسپرس x استاندارد
استاندارد: برای فرآیندهای طولانی مدت با ضمانت اجرای دقیقاً یک بار و ویژگی های بازیابی پیشرفته.
بیان: برای فرآیندهای سریع و پرتوان با حداقل یک ضمانت اجرا و صورتحساب بر اساس زمان و حافظه.

دست هایت را کثیف کردن

مخلوط کردن خمیر
بیایید آن را با استفاده از CDK با Typescript پیاده سازی کنیم!

بیایید با ایجاد سطل خود و وارد کردن یک موضوع برای اطلاع از هرگونه خطا شروع کنیم

const userNotificationTopic = Topic.fromTopicArn(
  this,
  'userNotificationTopic',
  Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)

const bucket = new Bucket(this, 'bucket', {
  bucketName: 'automation-configuration',
  removalPolicy: RemovalPolicy.DESTROY,
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

حال، بیایید Lambdas خود را ایجاد کنیم و مجوز خواندن را به Lambda اعطا کنیم که به شی S3 دسترسی خواهد داشت:

const etlLambda = new NodejsFunction(this, 'etl-lambda', {
  entry: 'src/lambda/etl/etl.handler.ts',
  functionName: 'etl',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

bucket.grantReadWrite(
  new Alias(this, id.concat('alias'), {
    aliasName: 'current',
    version: etlLambda.currentVersion,
  }),
)

const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
  entry: 'src/lambda/categories/categories.handler.ts',
  functionName: 'insert-categories',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
  entry: 'src/lambda/categories/categories.handler.ts',
  functionName: 'insert-categories',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

برای فراخوانی یک Lambda در تابع Step خود، باید یک Lambda Invoke برای هر یک از Lambda ایجاد کنیم. بنابراین، من یک تابع ایجاد کردم تا از تکرار آن برای هر Lambda جلوگیری شود:

private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

استفاده كردن mountLambdaInvokes:

const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
  { function: etlLambda, output: '$.Payload' },
  { function: categoriesLambda },
  { function: hospitalsLambda },
])
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما باید مرحله شکست خود و SnsPublish را ایجاد کنیم تا رویدادهای ناموفق را به موضوعی که قبلا وارد کردیم ارسال کنیم:

const errorTopicConfig = {
  topic: userNotificationTopic,
  subject: 'Automation Config Failed 😥',
  message: TaskInput.fromObject({
    message: 'Automation Config Failed due to an unexpected error.',
    cause: 'Unexpected Error',
    channel: 'email',
    destination: { ToAddresses: ['suporte@somemail.com'] },
  }),
}

const publishFailed = (publishFailedId: string) =>
  new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

const jobFailed = new Fail(this, 'automation-config-job-failed', {
  cause: 'Unexpected Error',
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پس از انجام این کار، بیایید حالت موازی خود را راه اندازی کنیم. هر شاخه یک لامبدا خواهد بود که به طور همزمان پردازش می شود. ما همچنین یک تلاش مجدد برای تلاش مجدد در صورت بروز هر گونه مشکل و مشکلی برای رسیدگی به هر گونه شکست در این فرآیند موازی اضافه خواهیم کرد:

const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
  .branch(categoriesTask)
  .branch(hospitalsTask)
  .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
  .addCatch(publishFailed('exams').next(jobFailed), {
    errors: ['States.ALL'],
  })
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این مرحله، ما تعریف وظایف خود، یک گروه گزارش و ماشین وضعیت را ایجاد می کنیم که اساساً تابع Step ما است:

const definition = etlTask.next(hospitalsCategoriesParallel)

const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
  retention: RetentionDays.ONE_WEEK,
  removalPolicy: RemovalPolicy.DESTROY,
})

const stateMachine = new StateMachine(this, `${id}-state-machine`, {
  definition,
  timeout: Duration.minutes(5),
  stateMachineName: 'automation-configuration',
  stateMachineType: StateMachineType.EXPRESS,
  logs: {
    destination: logGroup,
    includeExecutionData: true,
    level: LogLevel.ALL,
  },
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پس از این، باید قوانینی را برای مرتبط کردن EventBridge با S3 و اجرای تابع Step خود ایجاد کنیم:

const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
  ruleName: 'automation-config-s3-event-rule',
})

const eventRole = new Role(this, 'eventRole', {
  assumedBy: new ServicePrincipal('events.amazonaws.com'),
})

stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
  new SfnStateMachine(stateMachine, {
    input: RuleTargetInput.fromObject({
      detail: EventField.fromPath('$.detail'),
    }),
    role: eventRole,
  }),
)
s3EventRule.addEventPattern({
  source: ['aws.s3'],
  detailType: ['Object Created'],
  detail: {
    bucket: {
      name: [bucket.bucketName],
    },
    object: {
      key: [
        {
          wildcard: 'csv/automation-configuration/*.csv',
        },
      ],
    },
  },
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در نهایت، بیایید یک قانون برای گوش دادن به هر گونه شکست غیرمنتظره ای که در عملکرد Step ما از طریق EventBridge نیز رخ می دهد ایجاد کنیم، بنابراین ماهیت رویداد محوری را که بسیار دوست داریم حفظ کنیم:

 const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
      ruleName: 'exam-automation-config-unexpected-fail-rule',
    })
    unexpectedFailRule.addTarget(
      new SnsTopic(userNotificationTopic, {
        message: RuleTargetInput.fromObject({
          subject: 'Exam Automation Config Failed 😥',
          message: 'Exam Automation Config Failed due to an unexpected error.',
          cause: 'Unexpected Error',
          channel: 'email',
          destination: { ToAddresses: ['it@somemail.com'] },
        }),
      }),
    )

    unexpectedFailRule.addEventPattern({
      source: ['aws.states'],
      detailType: ['Step Functions Execution Status Change'],
      detail: {
        stateMachineArn: [stateMachine.stateMachineArn],
        status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
      },
    })
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

قرار دادن همه چیز در یک کلاس تا بتوانید کل جریان یکپارچه را درک کنید:

import { Duration, Fn, RemovalPolicy } from 'aws-cdk-lib'
import { Rule, RuleTargetInput, EventField } from 'aws-cdk-lib/aws-events'
import { SfnStateMachine, SnsTopic } from 'aws-cdk-lib/aws-events-targets'
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'
import { Alias, Architecture, IFunction } from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Topic } from 'aws-cdk-lib/aws-sns'
import { Fail, LogLevel, Parallel, StateMachine, StateMachineType, TaskInput } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke, SnsPublish } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'

export default class AutomationConfiguration extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id)

    const userNotificationTopic = Topic.fromTopicArn(
      this,
      'userNotificationTopic',
      Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
    )

    const bucket = new Bucket(this, 'bucket', {
      bucketName: 'automation-configuration',
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const etlLambda = new NodejsFunction(this, 'etl-lambda', {
      entry: 'src/lambda/etl/etl.handler.ts',
      functionName: 'etl',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    bucket.grantRead(
      new Alias(this, id.concat('alias'), {
        aliasName: 'current',
        version: etlLambda.currentVersion,
      }),
    )

    const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
      entry: 'src/lambda/categories/categories.handler.ts',
      functionName: 'insert-categories',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
      entry: 'src/lambda/categories/categories.handler.ts',
      functionName: 'insert-categories',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
      { function: etlLambda, output: '$.Payload' },
      { function: categoriesLambda },
      { function: hospitalsLambda },
    ])

    const errorTopicConfig = {
      topic: userNotificationTopic,
      subject: 'Automation Config Failed 😥',
      message: TaskInput.fromObject({
        message: 'Automation Config Failed due to an unexpected error.',
        cause: 'Unexpected Error',
        channel: 'email',
        destination: { ToAddresses: ['suporte@somemail.com'] },
      }),
    }

    const publishFailed = (publishFailedId: string) =>
      new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

    const jobFailed = new Fail(this, 'automation-config-job-failed', {
      cause: 'Unexpected Error',
    })

    const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
      .branch(categoriesTask)
      .branch(hospitalsTask)
      .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
      .addCatch(publishFailed('exams').next(jobFailed), {
        errors: ['States.ALL'],
      })
    const definition = etlTask.next(hospitalsCategoriesParallel)

    const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
      retention: RetentionDays.ONE_WEEK,
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const stateMachine = new StateMachine(this, `${id}-state-machine`, {
      definition,
      timeout: Duration.minutes(5),
      stateMachineName: 'automation-configuration',
      stateMachineType: StateMachineType.EXPRESS,
      logs: {
        destination: logGroup,
        includeExecutionData: true,
        level: LogLevel.ALL,
      },
    })

    const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
      ruleName: 'automation-config-s3-event-rule',
    })

    const eventRole = new Role(this, 'eventRole', {
      assumedBy: new ServicePrincipal('events.amazonaws.com'),
    })

    stateMachine.grantStartExecution(eventRole)
    s3EventRule.addTarget(
      new SfnStateMachine(stateMachine, {
        input: RuleTargetInput.fromObject({
          detail: EventField.fromPath('$.detail'),
        }),
        role: eventRole,
      }),
    )
    s3EventRule.addEventPattern({
      source: ['aws.s3'],
      detailType: ['Object Created'],
      detail: {
        bucket: {
          name: [bucket.bucketName],
        },
        object: {
          key: [
            {
              wildcard: 'csv/automation-configuration/*.csv',
            },
          ],
        },
      },
    })

    const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
      ruleName: 'exam-automation-config-unexpected-fail-rule',
    })
    unexpectedFailRule.addTarget(
      new SnsTopic(userNotificationTopic, {
        message: RuleTargetInput.fromObject({
          subject: 'Exam Automation Config Failed 😥',
          message: 'Exam Automation Config Failed due to an unexpected error.',
          cause: 'Unexpected Error',
          channel: 'email',
          destination: { ToAddresses: ['it@somemail.com'] },
        }),
      }),
    )

    unexpectedFailRule.addEventPattern({
      source: ['aws.states'],
      detailType: ['Step Functions Execution Status Change'],
      detail: {
        stateMachineArn: [stateMachine.stateMachineArn],
        status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
      },
    })
  }

  private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید


[🇧🇷 PT-BR]

پیشگفتار

هر زمان که کاربر نیاز به آپلود اتوماسیون داشت، رهبر فناوری ما مجبور بود کارهای دستی گسترده‌ای را انجام دهد، که پیکربندی همه چیز حدود دو روز طول کشید، با استفاده از یک فایل xls با بیش از 6 تب و دستکاری داده‌ها با برخی از توابع اکسل، با دیدن این مورد. موقعیت من متوجه شدم که می‌توانیم این جریان را خودکار کنیم (در واقع بحث داغی برای رسیدن به این نتیجه بود)، بنابراین معماری زیر پدیدار شد (ما برخی از مشکلات کد اختصاصی داریم بنابراین این معماری یک نمای ساده و با موارد ساختگی برای نشان دادن این راه‌حل است)

راه حل

زمانی که یک شی در S3 ایجاد می‌شد، باید یک تابع Step را راه‌اندازی کنیم، متأسفانه امروز راهی برای فراخوانی مستقیم تابع step نداریم، بنابراین من مجبور شدم یک قانون پل رویداد ایجاد کنم تا به ایجاد یک شی در من گوش دهم. S3، و زمانی که برای فراخوانی تابع گام و مصرف شی مورد نظر توسط یکی از وظایف لامبدا ایجاد شد، از آنجایی که من نیاز داشتم که فرآیند سریع‌تری باشد، از نوع اکسپرس استفاده کردم، با این تصمیم باید برخی از مراحل را موازی کنم ( بله، اتوماسیون من این امکان را فراهم کرد) بنابراین از حالت Parallel استفاده کردم، در نهایت اگر خطایی داشتم، با استفاده از یک موضوع برای این موضوع به شما اطلاع می دهم.

اکسپرس x استاندارد

  • استاندارد: برای فرآیندهای طولانی، با اجرای دقیق یکبار تضمین شده و قابلیت های بازیابی پیشرفته.
  • بیان: برای فرآیندهای سریع و با توان بالا، با حداقل یک بار اجرای تضمینی و صورتحساب بر اساس زمان و حافظه.

دست هایت را کثیف کردن

بیایید این را با CDK و Typescript پیاده سازی کنیم!

بیایید با ایجاد سطل و مهمتر از همه موضوعی برای آگاه کردن ما از خطاهایمان شروع کنیم

const userNotificationTopic = Topic.fromTopicArn(
  this,
  'userNotificationTopic',
  Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)

const bucket = new Bucket(this, 'bucket', {
  bucketName: 'automation-configuration',
  removalPolicy: RemovalPolicy.DESTROY,
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

حالا بیایید لامبداهای خود را ایجاد کنیم و اجازه خواندن را به لامبدا بدهیم که به شی S3 دسترسی دارد:

const etlLambda = new NodejsFunction(this, 'etl-lambda', {
  entry: 'src/lambda/etl/etl.handler.ts',
  functionName: 'etl',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

bucket.grantReadWrite(
  new Alias(this, id.concat('alias'), {
    aliasName: 'current',
    version: etlLambda.currentVersion,
  }),
)

const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
  entry: 'src/lambda/categories/categories.handler.ts',
  functionName: 'insert-categories',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})

const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
  entry: 'src/lambda/categories/categories.handler.ts',
  functionName: 'insert-categories',
  timeout: Duration.seconds(30),
  architecture: Architecture.ARM_64,
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

برای اینکه بتوانیم یک لامبدا را در تابع Step خود صدا کنیم، باید برای هر یک از لامبداها یک Lambda Invoke ایجاد کنیم، بنابراین من یک تابع ایجاد کردم تا مجبور نباشیم این کار را برای هر lambda تکرار کنیم:

private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

با استفاده از تابع mountLambdaInvokes:

const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
  { function: etlLambda, output: '$.Payload' },
  { function: categoriesLambda },
  { function: hospitalsLambda },
])
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

ما باید مرحله شکست خود و SnsPublish را ایجاد کنیم تا رویدادهای ناموفق را به موضوعی که قبلا وارد کرده بودیم ارسال کنیم:

const errorTopicConfig = {
  topic: userNotificationTopic,
  subject: 'Automation Config Failed 😥',
  message: TaskInput.fromObject({
    message: 'Automation Config Failed due to an unexpected error.',
    cause: 'Unexpected Error',
    channel: 'email',
    destination: { ToAddresses: ['suporte@somemail.com'] },
  }),
}

const publishFailed = (publishFailedId: string) =>
  new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

const jobFailed = new Fail(this, 'automation-config-job-failed', {
  cause: 'Unexpected Error',
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

هنگامی که این کار انجام شد، Parallel خود را راه‌اندازی می‌کنیم، جایی که هر شاخه یک لامبدا است که به طور همزمان پردازش می‌شود، همچنین یک امتحان مجدد اضافه می‌کنیم تا در صورت بروز مشکل دوباره تلاش کنیم و شکست‌های مربوطه را ضبط کنیم. این فرآیند موازی:

const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
  .branch(categoriesTask)
  .branch(hospitalsTask)
  .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
  .addCatch(publishFailed('exams').next(jobFailed), {
    errors: ['States.ALL'],
  })
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در این مرحله ما تعریف وظایف خود را ایجاد می کنیم، یک گروه گزارش و ماشین وضعیت، که چیزی بیش از تابع Step واقعی ما نیست.

const definition = etlTask.next(hospitalsCategoriesParallel)

const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
  retention: RetentionDays.ONE_WEEK,
  removalPolicy: RemovalPolicy.DESTROY,
})

const stateMachine = new StateMachine(this, `${id}-state-machine`, {
  definition,
  timeout: Duration.minutes(5),
  stateMachineName: 'automation-configuration',
  stateMachineType: StateMachineType.EXPRESS,
  logs: {
    destination: logGroup,
    includeExecutionData: true,
    level: LogLevel.ALL,
  },
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

پس از آن باید قوانینی را ایجاد کنیم تا پل رویداد را با S3 و با اجرای تابع گام خود مرتبط کنیم:

const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
  ruleName: 'automation-config-s3-event-rule',
})

const eventRole = new Role(this, 'eventRole', {
  assumedBy: new ServicePrincipal('events.amazonaws.com'),
})

stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
  new SfnStateMachine(stateMachine, {
    input: RuleTargetInput.fromObject({
      detail: EventField.fromPath('$.detail'),
    }),
    role: eventRole,
  }),
)
s3EventRule.addEventPattern({
  source: ['aws.s3'],
  detailType: ['Object Created'],
  detail: {
    bucket: {
      name: [bucket.bucketName],
    },
    object: {
      key: [
        {
          wildcard: 'csv/automation-configuration/*.csv',
        },
      ],
    },
  },
})
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

و در نهایت، بیایید یک قانون برای گوش دادن به شکست‌های غیرمنتظره‌ای که در عملکرد گام ما از طریق پل رویداد رخ می‌دهد ایجاد کنیم، بنابراین شخصیت رویداد محوری را که بسیار دوست داریم حفظ کنیم:

 const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
      ruleName: 'exam-automation-config-unexpected-fail-rule',
    })
    unexpectedFailRule.addTarget(
      new SnsTopic(userNotificationTopic, {
        message: RuleTargetInput.fromObject({
          subject: 'Exam Automation Config Failed 😥',
          message: 'Exam Automation Config Failed due to an unexpected error.',
          cause: 'Unexpected Error',
          channel: 'email',
          destination: { ToAddresses: ['it@somemail.com'] },
        }),
      }),
    )

    unexpectedFailRule.addEventPattern({
      source: ['aws.states'],
      detailType: ['Step Functions Execution Status Change'],
      detail: {
        stateMachineArn: [stateMachine.stateMachineArn],
        status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
      },
    })
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

قرار دادن همه چیز در یک کلاس تا بتوانید کل جریان یکپارچه را درک کنید:

import { Duration, Fn, RemovalPolicy } from 'aws-cdk-lib'
import { Rule, RuleTargetInput, EventField } from 'aws-cdk-lib/aws-events'
import { SfnStateMachine, SnsTopic } from 'aws-cdk-lib/aws-events-targets'
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'
import { Alias, Architecture, IFunction } from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Topic } from 'aws-cdk-lib/aws-sns'
import { Fail, LogLevel, Parallel, StateMachine, StateMachineType, TaskInput } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke, SnsPublish } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'

export default class AutomationConfiguration extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id)

    const userNotificationTopic = Topic.fromTopicArn(
      this,
      'userNotificationTopic',
      Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
    )

    const bucket = new Bucket(this, 'bucket', {
      bucketName: 'automation-configuration',
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const etlLambda = new NodejsFunction(this, 'etl-lambda', {
      entry: 'src/lambda/etl/etl.handler.ts',
      functionName: 'etl',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    bucket.grantRead(
      new Alias(this, id.concat('alias'), {
        aliasName: 'current',
        version: etlLambda.currentVersion,
      }),
    )

    const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
      entry: 'src/lambda/categories/categories.handler.ts',
      functionName: 'insert-categories',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
      entry: 'src/lambda/categories/categories.handler.ts',
      functionName: 'insert-categories',
      timeout: Duration.seconds(30),
      architecture: Architecture.ARM_64,
    })

    const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
      { function: etlLambda, output: '$.Payload' },
      { function: categoriesLambda },
      { function: hospitalsLambda },
    ])

    const errorTopicConfig = {
      topic: userNotificationTopic,
      subject: 'Automation Config Failed 😥',
      message: TaskInput.fromObject({
        message: 'Automation Config Failed due to an unexpected error.',
        cause: 'Unexpected Error',
        channel: 'email',
        destination: { ToAddresses: ['suporte@somemail.com'] },
      }),
    }

    const publishFailed = (publishFailedId: string) =>
      new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)

    const jobFailed = new Fail(this, 'automation-config-job-failed', {
      cause: 'Unexpected Error',
    })

    const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
      .branch(categoriesTask)
      .branch(hospitalsTask)
      .addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
      .addCatch(publishFailed('exams').next(jobFailed), {
        errors: ['States.ALL'],
      })
    const definition = etlTask.next(hospitalsCategoriesParallel)

    const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
      retention: RetentionDays.ONE_WEEK,
      removalPolicy: RemovalPolicy.DESTROY,
    })

    const stateMachine = new StateMachine(this, `${id}-state-machine`, {
      definition,
      timeout: Duration.minutes(5),
      stateMachineName: 'automation-configuration',
      stateMachineType: StateMachineType.EXPRESS,
      logs: {
        destination: logGroup,
        includeExecutionData: true,
        level: LogLevel.ALL,
      },
    })

    const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
      ruleName: 'automation-config-s3-event-rule',
    })

    const eventRole = new Role(this, 'eventRole', {
      assumedBy: new ServicePrincipal('events.amazonaws.com'),
    })

    stateMachine.grantStartExecution(eventRole)
    s3EventRule.addTarget(
      new SfnStateMachine(stateMachine, {
        input: RuleTargetInput.fromObject({
          detail: EventField.fromPath('$.detail'),
        }),
        role: eventRole,
      }),
    )
    s3EventRule.addEventPattern({
      source: ['aws.s3'],
      detailType: ['Object Created'],
      detail: {
        bucket: {
          name: [bucket.bucketName],
        },
        object: {
          key: [
            {
              wildcard: 'csv/automation-configuration/*.csv',
            },
          ],
        },
      },
    })

    const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
      ruleName: 'exam-automation-config-unexpected-fail-rule',
    })
    unexpectedFailRule.addTarget(
      new SnsTopic(userNotificationTopic, {
        message: RuleTargetInput.fromObject({
          subject: 'Exam Automation Config Failed 😥',
          message: 'Exam Automation Config Failed due to an unexpected error.',
          cause: 'Unexpected Error',
          channel: 'email',
          destination: { ToAddresses: ['it@somemail.com'] },
        }),
      }),
    )

    unexpectedFailRule.addEventPattern({
      source: ['aws.states'],
      detailType: ['Step Functions Execution Status Change'],
      detail: {
        stateMachineArn: [stateMachine.stateMachineArn],
        status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
      },
    })
  }

  private mountLambdaInvokes(
    lambdasInvoke: Array<{
      function: IFunction
      name?: string
      output?: string
    }>,
  ) {
    return lambdasInvoke.map(lambdaInvoke => {
      return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
        lambdaFunction: lambdaInvoke.function,
        inputPath: '$',
        outputPath: lambdaInvoke?.output || '$',
      })
    })
  }
}
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا