اشیاء S3 را با توابع مرحله ای با استفاده از CDK + TS پردازش کنید

پیشگفتار
هر زمان که کاربر نیاز به راهاندازی اتوماسیون داشت، پیشرو فناوری ما باید در یک فرآیند دستی گسترده که حدود دو روز طول میکشید تا همه چیز را پیکربندی کند، فرو میرفت.
این را تصویر کنید: یک فایل اکسل با بیش از شش تب و دستکاری داده ها با استفاده از توابع اکسل (یک کابوس صفحه گسترده 👻).
با دیدن این جنون، متوجه شدم که میتوانیم این فرآیند را خودکار کنیم (به من اعتماد کنید، برای رسیدن به این نتیجه بحث داغی بود). بنابراین، معماری زیر متولد شد.
به یاد داشته باشید، ما تعدادی کد اختصاصی داریم، بنابراین این یک نسخه ساده شده و کمی تخیلی برای نشان دادن راه حل است.
راه حل
ما باید فرآیندی را تنظیم کنیم تا هر بار که یک شی در S3 ایجاد میشود، یک تابع Step را راهاندازی کند. با این حال، فراخوانی مستقیم یک تابع مرحله در حال حاضر پشتیبانی نمی شود. بنابراین، من یک قانون EventBridge برای نظارت بر S3 برای ایجاد شی ایجاد کردم. هنگامی که یک شی ایجاد شد، قانون تابع Step را برای پردازش شی با استفاده از یکی از وظایف Lambda فراخوانی می کند.
من از نوع Express استفاده کردم زیرا نیاز داشتم که این یک روند سریع باشد. با این تصمیم، مجبور شدم برخی از مراحل را موازی کنم (بله، اتوماسیون من این اجازه را می داد)، بنابراین، از حالت موازی استفاده کردم. در نهایت، اگر خطایی رخ داد، یک اعلان با استفاده از یک موضوع برای آن تنظیم کردم.
اکسپرس x استاندارد
استاندارد: برای فرآیندهای طولانی مدت با ضمانت اجرای دقیقاً یک بار و ویژگی های بازیابی پیشرفته.
بیان: برای فرآیندهای سریع و پرتوان با حداقل یک ضمانت اجرا و صورتحساب بر اساس زمان و حافظه.
دست هایت را کثیف کردن
بیایید آن را با استفاده از CDK با Typescript پیاده سازی کنیم!
بیایید با ایجاد سطل خود و وارد کردن یک موضوع برای اطلاع از هرگونه خطا شروع کنیم
const userNotificationTopic = Topic.fromTopicArn(
this,
'userNotificationTopic',
Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)
const bucket = new Bucket(this, 'bucket', {
bucketName: 'automation-configuration',
removalPolicy: RemovalPolicy.DESTROY,
})
حال، بیایید Lambdas خود را ایجاد کنیم و مجوز خواندن را به Lambda اعطا کنیم که به شی S3 دسترسی خواهد داشت:
const etlLambda = new NodejsFunction(this, 'etl-lambda', {
entry: 'src/lambda/etl/etl.handler.ts',
functionName: 'etl',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
bucket.grantReadWrite(
new Alias(this, id.concat('alias'), {
aliasName: 'current',
version: etlLambda.currentVersion,
}),
)
const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
برای فراخوانی یک Lambda در تابع Step خود، باید یک Lambda Invoke برای هر یک از Lambda ایجاد کنیم. بنابراین، من یک تابع ایجاد کردم تا از تکرار آن برای هر Lambda جلوگیری شود:
private mountLambdaInvokes(
lambdasInvoke: Array<{
function: IFunction
name?: string
output?: string
}>,
) {
return lambdasInvoke.map(lambdaInvoke => {
return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
lambdaFunction: lambdaInvoke.function,
inputPath: '$',
outputPath: lambdaInvoke?.output || '$',
})
})
}
استفاده كردن mountLambdaInvokes
:
const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
{ function: etlLambda, output: '$.Payload' },
{ function: categoriesLambda },
{ function: hospitalsLambda },
])
ما باید مرحله شکست خود و SnsPublish را ایجاد کنیم تا رویدادهای ناموفق را به موضوعی که قبلا وارد کردیم ارسال کنیم:
const errorTopicConfig = {
topic: userNotificationTopic,
subject: 'Automation Config Failed 😥',
message: TaskInput.fromObject({
message: 'Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['suporte@somemail.com'] },
}),
}
const publishFailed = (publishFailedId: string) =>
new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)
const jobFailed = new Fail(this, 'automation-config-job-failed', {
cause: 'Unexpected Error',
})
پس از انجام این کار، بیایید حالت موازی خود را راه اندازی کنیم. هر شاخه یک لامبدا خواهد بود که به طور همزمان پردازش می شود. ما همچنین یک تلاش مجدد برای تلاش مجدد در صورت بروز هر گونه مشکل و مشکلی برای رسیدگی به هر گونه شکست در این فرآیند موازی اضافه خواهیم کرد:
const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
.branch(categoriesTask)
.branch(hospitalsTask)
.addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
.addCatch(publishFailed('exams').next(jobFailed), {
errors: ['States.ALL'],
})
در این مرحله، ما تعریف وظایف خود، یک گروه گزارش و ماشین وضعیت را ایجاد می کنیم که اساساً تابع Step ما است:
const definition = etlTask.next(hospitalsCategoriesParallel)
const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
retention: RetentionDays.ONE_WEEK,
removalPolicy: RemovalPolicy.DESTROY,
})
const stateMachine = new StateMachine(this, `${id}-state-machine`, {
definition,
timeout: Duration.minutes(5),
stateMachineName: 'automation-configuration',
stateMachineType: StateMachineType.EXPRESS,
logs: {
destination: logGroup,
includeExecutionData: true,
level: LogLevel.ALL,
},
})
پس از این، باید قوانینی را برای مرتبط کردن EventBridge با S3 و اجرای تابع Step خود ایجاد کنیم:
const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
ruleName: 'automation-config-s3-event-rule',
})
const eventRole = new Role(this, 'eventRole', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
})
stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
new SfnStateMachine(stateMachine, {
input: RuleTargetInput.fromObject({
detail: EventField.fromPath('$.detail'),
}),
role: eventRole,
}),
)
s3EventRule.addEventPattern({
source: ['aws.s3'],
detailType: ['Object Created'],
detail: {
bucket: {
name: [bucket.bucketName],
},
object: {
key: [
{
wildcard: 'csv/automation-configuration/*.csv',
},
],
},
},
})
در نهایت، بیایید یک قانون برای گوش دادن به هر گونه شکست غیرمنتظره ای که در عملکرد Step ما از طریق EventBridge نیز رخ می دهد ایجاد کنیم، بنابراین ماهیت رویداد محوری را که بسیار دوست داریم حفظ کنیم:
const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
ruleName: 'exam-automation-config-unexpected-fail-rule',
})
unexpectedFailRule.addTarget(
new SnsTopic(userNotificationTopic, {
message: RuleTargetInput.fromObject({
subject: 'Exam Automation Config Failed 😥',
message: 'Exam Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['it@somemail.com'] },
}),
}),
)
unexpectedFailRule.addEventPattern({
source: ['aws.states'],
detailType: ['Step Functions Execution Status Change'],
detail: {
stateMachineArn: [stateMachine.stateMachineArn],
status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
},
})
قرار دادن همه چیز در یک کلاس تا بتوانید کل جریان یکپارچه را درک کنید:
import { Duration, Fn, RemovalPolicy } from 'aws-cdk-lib'
import { Rule, RuleTargetInput, EventField } from 'aws-cdk-lib/aws-events'
import { SfnStateMachine, SnsTopic } from 'aws-cdk-lib/aws-events-targets'
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'
import { Alias, Architecture, IFunction } from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Topic } from 'aws-cdk-lib/aws-sns'
import { Fail, LogLevel, Parallel, StateMachine, StateMachineType, TaskInput } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke, SnsPublish } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'
export default class AutomationConfiguration extends Construct {
constructor(scope: Construct, id: string) {
super(scope, id)
const userNotificationTopic = Topic.fromTopicArn(
this,
'userNotificationTopic',
Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)
const bucket = new Bucket(this, 'bucket', {
bucketName: 'automation-configuration',
removalPolicy: RemovalPolicy.DESTROY,
})
const etlLambda = new NodejsFunction(this, 'etl-lambda', {
entry: 'src/lambda/etl/etl.handler.ts',
functionName: 'etl',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
bucket.grantRead(
new Alias(this, id.concat('alias'), {
aliasName: 'current',
version: etlLambda.currentVersion,
}),
)
const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
{ function: etlLambda, output: '$.Payload' },
{ function: categoriesLambda },
{ function: hospitalsLambda },
])
const errorTopicConfig = {
topic: userNotificationTopic,
subject: 'Automation Config Failed 😥',
message: TaskInput.fromObject({
message: 'Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['suporte@somemail.com'] },
}),
}
const publishFailed = (publishFailedId: string) =>
new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)
const jobFailed = new Fail(this, 'automation-config-job-failed', {
cause: 'Unexpected Error',
})
const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
.branch(categoriesTask)
.branch(hospitalsTask)
.addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
.addCatch(publishFailed('exams').next(jobFailed), {
errors: ['States.ALL'],
})
const definition = etlTask.next(hospitalsCategoriesParallel)
const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
retention: RetentionDays.ONE_WEEK,
removalPolicy: RemovalPolicy.DESTROY,
})
const stateMachine = new StateMachine(this, `${id}-state-machine`, {
definition,
timeout: Duration.minutes(5),
stateMachineName: 'automation-configuration',
stateMachineType: StateMachineType.EXPRESS,
logs: {
destination: logGroup,
includeExecutionData: true,
level: LogLevel.ALL,
},
})
const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
ruleName: 'automation-config-s3-event-rule',
})
const eventRole = new Role(this, 'eventRole', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
})
stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
new SfnStateMachine(stateMachine, {
input: RuleTargetInput.fromObject({
detail: EventField.fromPath('$.detail'),
}),
role: eventRole,
}),
)
s3EventRule.addEventPattern({
source: ['aws.s3'],
detailType: ['Object Created'],
detail: {
bucket: {
name: [bucket.bucketName],
},
object: {
key: [
{
wildcard: 'csv/automation-configuration/*.csv',
},
],
},
},
})
const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
ruleName: 'exam-automation-config-unexpected-fail-rule',
})
unexpectedFailRule.addTarget(
new SnsTopic(userNotificationTopic, {
message: RuleTargetInput.fromObject({
subject: 'Exam Automation Config Failed 😥',
message: 'Exam Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['it@somemail.com'] },
}),
}),
)
unexpectedFailRule.addEventPattern({
source: ['aws.states'],
detailType: ['Step Functions Execution Status Change'],
detail: {
stateMachineArn: [stateMachine.stateMachineArn],
status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
},
})
}
private mountLambdaInvokes(
lambdasInvoke: Array<{
function: IFunction
name?: string
output?: string
}>,
) {
return lambdasInvoke.map(lambdaInvoke => {
return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
lambdaFunction: lambdaInvoke.function,
inputPath: '$',
outputPath: lambdaInvoke?.output || '$',
})
})
}
}
[🇧🇷 PT-BR]
پیشگفتار
هر زمان که کاربر نیاز به آپلود اتوماسیون داشت، رهبر فناوری ما مجبور بود کارهای دستی گستردهای را انجام دهد، که پیکربندی همه چیز حدود دو روز طول کشید، با استفاده از یک فایل xls با بیش از 6 تب و دستکاری دادهها با برخی از توابع اکسل، با دیدن این مورد. موقعیت من متوجه شدم که میتوانیم این جریان را خودکار کنیم (در واقع بحث داغی برای رسیدن به این نتیجه بود)، بنابراین معماری زیر پدیدار شد (ما برخی از مشکلات کد اختصاصی داریم بنابراین این معماری یک نمای ساده و با موارد ساختگی برای نشان دادن این راهحل است)
راه حل
زمانی که یک شی در S3 ایجاد میشد، باید یک تابع Step را راهاندازی کنیم، متأسفانه امروز راهی برای فراخوانی مستقیم تابع step نداریم، بنابراین من مجبور شدم یک قانون پل رویداد ایجاد کنم تا به ایجاد یک شی در من گوش دهم. S3، و زمانی که برای فراخوانی تابع گام و مصرف شی مورد نظر توسط یکی از وظایف لامبدا ایجاد شد، از آنجایی که من نیاز داشتم که فرآیند سریعتری باشد، از نوع اکسپرس استفاده کردم، با این تصمیم باید برخی از مراحل را موازی کنم ( بله، اتوماسیون من این امکان را فراهم کرد) بنابراین از حالت Parallel استفاده کردم، در نهایت اگر خطایی داشتم، با استفاده از یک موضوع برای این موضوع به شما اطلاع می دهم.
اکسپرس x استاندارد
- استاندارد: برای فرآیندهای طولانی، با اجرای دقیق یکبار تضمین شده و قابلیت های بازیابی پیشرفته.
- بیان: برای فرآیندهای سریع و با توان بالا، با حداقل یک بار اجرای تضمینی و صورتحساب بر اساس زمان و حافظه.
دست هایت را کثیف کردن
بیایید این را با CDK و Typescript پیاده سازی کنیم!
بیایید با ایجاد سطل و مهمتر از همه موضوعی برای آگاه کردن ما از خطاهایمان شروع کنیم
const userNotificationTopic = Topic.fromTopicArn(
this,
'userNotificationTopic',
Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)
const bucket = new Bucket(this, 'bucket', {
bucketName: 'automation-configuration',
removalPolicy: RemovalPolicy.DESTROY,
})
حالا بیایید لامبداهای خود را ایجاد کنیم و اجازه خواندن را به لامبدا بدهیم که به شی S3 دسترسی دارد:
const etlLambda = new NodejsFunction(this, 'etl-lambda', {
entry: 'src/lambda/etl/etl.handler.ts',
functionName: 'etl',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
bucket.grantReadWrite(
new Alias(this, id.concat('alias'), {
aliasName: 'current',
version: etlLambda.currentVersion,
}),
)
const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
برای اینکه بتوانیم یک لامبدا را در تابع Step خود صدا کنیم، باید برای هر یک از لامبداها یک Lambda Invoke ایجاد کنیم، بنابراین من یک تابع ایجاد کردم تا مجبور نباشیم این کار را برای هر lambda تکرار کنیم:
private mountLambdaInvokes(
lambdasInvoke: Array<{
function: IFunction
name?: string
output?: string
}>,
) {
return lambdasInvoke.map(lambdaInvoke => {
return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
lambdaFunction: lambdaInvoke.function,
inputPath: '$',
outputPath: lambdaInvoke?.output || '$',
})
})
}
با استفاده از تابع mountLambdaInvokes
:
const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
{ function: etlLambda, output: '$.Payload' },
{ function: categoriesLambda },
{ function: hospitalsLambda },
])
ما باید مرحله شکست خود و SnsPublish را ایجاد کنیم تا رویدادهای ناموفق را به موضوعی که قبلا وارد کرده بودیم ارسال کنیم:
const errorTopicConfig = {
topic: userNotificationTopic,
subject: 'Automation Config Failed 😥',
message: TaskInput.fromObject({
message: 'Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['suporte@somemail.com'] },
}),
}
const publishFailed = (publishFailedId: string) =>
new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)
const jobFailed = new Fail(this, 'automation-config-job-failed', {
cause: 'Unexpected Error',
})
هنگامی که این کار انجام شد، Parallel خود را راهاندازی میکنیم، جایی که هر شاخه یک لامبدا است که به طور همزمان پردازش میشود، همچنین یک امتحان مجدد اضافه میکنیم تا در صورت بروز مشکل دوباره تلاش کنیم و شکستهای مربوطه را ضبط کنیم. این فرآیند موازی:
const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
.branch(categoriesTask)
.branch(hospitalsTask)
.addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
.addCatch(publishFailed('exams').next(jobFailed), {
errors: ['States.ALL'],
})
در این مرحله ما تعریف وظایف خود را ایجاد می کنیم، یک گروه گزارش و ماشین وضعیت، که چیزی بیش از تابع Step واقعی ما نیست.
const definition = etlTask.next(hospitalsCategoriesParallel)
const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
retention: RetentionDays.ONE_WEEK,
removalPolicy: RemovalPolicy.DESTROY,
})
const stateMachine = new StateMachine(this, `${id}-state-machine`, {
definition,
timeout: Duration.minutes(5),
stateMachineName: 'automation-configuration',
stateMachineType: StateMachineType.EXPRESS,
logs: {
destination: logGroup,
includeExecutionData: true,
level: LogLevel.ALL,
},
})
پس از آن باید قوانینی را ایجاد کنیم تا پل رویداد را با S3 و با اجرای تابع گام خود مرتبط کنیم:
const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
ruleName: 'automation-config-s3-event-rule',
})
const eventRole = new Role(this, 'eventRole', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
})
stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
new SfnStateMachine(stateMachine, {
input: RuleTargetInput.fromObject({
detail: EventField.fromPath('$.detail'),
}),
role: eventRole,
}),
)
s3EventRule.addEventPattern({
source: ['aws.s3'],
detailType: ['Object Created'],
detail: {
bucket: {
name: [bucket.bucketName],
},
object: {
key: [
{
wildcard: 'csv/automation-configuration/*.csv',
},
],
},
},
})
و در نهایت، بیایید یک قانون برای گوش دادن به شکستهای غیرمنتظرهای که در عملکرد گام ما از طریق پل رویداد رخ میدهد ایجاد کنیم، بنابراین شخصیت رویداد محوری را که بسیار دوست داریم حفظ کنیم:
const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
ruleName: 'exam-automation-config-unexpected-fail-rule',
})
unexpectedFailRule.addTarget(
new SnsTopic(userNotificationTopic, {
message: RuleTargetInput.fromObject({
subject: 'Exam Automation Config Failed 😥',
message: 'Exam Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['it@somemail.com'] },
}),
}),
)
unexpectedFailRule.addEventPattern({
source: ['aws.states'],
detailType: ['Step Functions Execution Status Change'],
detail: {
stateMachineArn: [stateMachine.stateMachineArn],
status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
},
})
قرار دادن همه چیز در یک کلاس تا بتوانید کل جریان یکپارچه را درک کنید:
import { Duration, Fn, RemovalPolicy } from 'aws-cdk-lib'
import { Rule, RuleTargetInput, EventField } from 'aws-cdk-lib/aws-events'
import { SfnStateMachine, SnsTopic } from 'aws-cdk-lib/aws-events-targets'
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'
import { Alias, Architecture, IFunction } from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs'
import { Bucket } from 'aws-cdk-lib/aws-s3'
import { Topic } from 'aws-cdk-lib/aws-sns'
import { Fail, LogLevel, Parallel, StateMachine, StateMachineType, TaskInput } from 'aws-cdk-lib/aws-stepfunctions'
import { LambdaInvoke, SnsPublish } from 'aws-cdk-lib/aws-stepfunctions-tasks'
import { Construct } from 'constructs'
export default class AutomationConfiguration extends Construct {
constructor(scope: Construct, id: string) {
super(scope, id)
const userNotificationTopic = Topic.fromTopicArn(
this,
'userNotificationTopic',
Fn.importValue('infrastructure::sns::user-notification-topic::arn'),
)
const bucket = new Bucket(this, 'bucket', {
bucketName: 'automation-configuration',
removalPolicy: RemovalPolicy.DESTROY,
})
const etlLambda = new NodejsFunction(this, 'etl-lambda', {
entry: 'src/lambda/etl/etl.handler.ts',
functionName: 'etl',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
bucket.grantRead(
new Alias(this, id.concat('alias'), {
aliasName: 'current',
version: etlLambda.currentVersion,
}),
)
const categoriesLambda = new NodejsFunction(this, 'insert-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const hospitalsLambda = new NodejsFunction(this, 'hospitals-categories-lambda', {
entry: 'src/lambda/categories/categories.handler.ts',
functionName: 'insert-categories',
timeout: Duration.seconds(30),
architecture: Architecture.ARM_64,
})
const [etlTask, categoriesTask, hospitalsTask] = this.mountLambdaInvokes([
{ function: etlLambda, output: '$.Payload' },
{ function: categoriesLambda },
{ function: hospitalsLambda },
])
const errorTopicConfig = {
topic: userNotificationTopic,
subject: 'Automation Config Failed 😥',
message: TaskInput.fromObject({
message: 'Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['suporte@somemail.com'] },
}),
}
const publishFailed = (publishFailedId: string) =>
new SnsPublish(this, `automation-config-sns-failed-${publishFailedId}`, errorTopicConfig)
const jobFailed = new Fail(this, 'automation-config-job-failed', {
cause: 'Unexpected Error',
})
const hospitalsCategoriesParallel = new Parallel(this, 'auto-config-exams-parallel-map')
.branch(categoriesTask)
.branch(hospitalsTask)
.addRetry({ errors: ['States.ALL'], interval: Duration.seconds(5), maxAttempts: 1 })
.addCatch(publishFailed('exams').next(jobFailed), {
errors: ['States.ALL'],
})
const definition = etlTask.next(hospitalsCategoriesParallel)
const logGroup = new LogGroup(this, 'automation-configuration-log-group', {
retention: RetentionDays.ONE_WEEK,
removalPolicy: RemovalPolicy.DESTROY,
})
const stateMachine = new StateMachine(this, `${id}-state-machine`, {
definition,
timeout: Duration.minutes(5),
stateMachineName: 'automation-configuration',
stateMachineType: StateMachineType.EXPRESS,
logs: {
destination: logGroup,
includeExecutionData: true,
level: LogLevel.ALL,
},
})
const s3EventRule = new Rule(this, 'automation-config-s3-event-rule', {
ruleName: 'automation-config-s3-event-rule',
})
const eventRole = new Role(this, 'eventRole', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
})
stateMachine.grantStartExecution(eventRole)
s3EventRule.addTarget(
new SfnStateMachine(stateMachine, {
input: RuleTargetInput.fromObject({
detail: EventField.fromPath('$.detail'),
}),
role: eventRole,
}),
)
s3EventRule.addEventPattern({
source: ['aws.s3'],
detailType: ['Object Created'],
detail: {
bucket: {
name: [bucket.bucketName],
},
object: {
key: [
{
wildcard: 'csv/automation-configuration/*.csv',
},
],
},
},
})
const unexpectedFailRule = new Rule(this, 'exam-automation-config-unexpected-fail-rule', {
ruleName: 'exam-automation-config-unexpected-fail-rule',
})
unexpectedFailRule.addTarget(
new SnsTopic(userNotificationTopic, {
message: RuleTargetInput.fromObject({
subject: 'Exam Automation Config Failed 😥',
message: 'Exam Automation Config Failed due to an unexpected error.',
cause: 'Unexpected Error',
channel: 'email',
destination: { ToAddresses: ['it@somemail.com'] },
}),
}),
)
unexpectedFailRule.addEventPattern({
source: ['aws.states'],
detailType: ['Step Functions Execution Status Change'],
detail: {
stateMachineArn: [stateMachine.stateMachineArn],
status: ['FAILED', 'TIMED_OUT', 'ABORTED'],
},
})
}
private mountLambdaInvokes(
lambdasInvoke: Array<{
function: IFunction
name?: string
output?: string
}>,
) {
return lambdasInvoke.map(lambdaInvoke => {
return new LambdaInvoke(this, `${lambdaInvoke.name || lambdaInvoke.function.functionName}-task`, {
lambdaFunction: lambdaInvoke.function,
inputPath: '$',
outputPath: lambdaInvoke?.output || '$',
})
})
}
}