با استفاده از سیاهههای برنامه کاربردی پایگاه دانش آمازون برای اعلان ها

مقدمه
در پست وبلاگ قبلی ، یک راه حل مصرف داده برای پایگاه های دانش آمازون بستر آمازون ، ما یک راه حل مصرف داده را تهیه کردیم که شامل اعلان های تکمیل شغل با مکانیسم کشش وضعیت است که به همان اندازه کارآمد نبود. از آن زمان ، ما پایگاه های دانش را بررسی کردیم که رویدادهای ورود به سیستم را به سیاهههای مربوط به CloudWatch منتشر می کند ، که فرصتی جدید برای طراحی بهتر با مکانیسم فشار وضعیت بر اساس فیلترهای اشتراک ایجاد می کند. در این پست وبلاگ ، نحوه به روزرسانی راه حل اصلی را با طراحی جدید بررسی خواهیم کرد.
نمای کلی طراحی به روز شده
طراحی کلی راه حل به روز شده در نمودار زیر نشان داده شده است:
راه حل به روز شده به شرح زیر است:
-
ورود به سیستم برای پایگاه دانش Bedrock برای ارائه سیاهههای مربوط به سیاهههای CloudWatch تنظیم شده است. یک فیلتر اشتراک در گروه ورود به سیستم برای فیلتر کردن وضعیت تغییر وضعیت شغلی که مطابق با حالت نهایی است و رویدادهای ورود به سیستم را به یک عملکرد لامبدا ارسال می کند ، ایجاد می شود.
-
یک تابع Lambda ، که توسط یک قانون برنامه Eventbridge ایجاد شده است ، به طور دوره ای کار مصرف (AKA SYNC) را برای هر پایه دانش و منبع داده مشخص شروع می کند. توجه داشته باشید که صف SQS از آنجا که دیگر لازم نیست حذف می شود.
-
یکی دیگر از عملکردهای لامبدا به عنوان مقصد فیلتر اشتراک عمل می کند. برای هر پیام رویداد که از رویدادهای ورود به سیستم استخراج می شود ، این عملکرد از اطلاعات شناسه کار برای به دست آوردن جزئیات مربوط به کار مصرف استفاده می کند. بسته به اینکه کار موفقیت آمیز باشد یا ناموفق باشد ، یک اعلان به یکی از دو موضوع SNS ارسال می شود.
به روزرسانی مؤلفه ها
از آنجا که صف SQS مورد نیاز نیست ، تنها تغییر عملکرد لامبدا که کار مصرف را شروع می کند ، پاکسازی جزئی است. کد عملکرد Lambda به روز شده به شرح زیر است:
import boto3
import json
from botocore.exceptions import ClientError
bedrock_agent = boto3.client('bedrock-agent')
ssm = boto3.client('ssm')
def lambda_handler(event, context):
try:
# Retrieve the JSON config from Parameter Store
response = ssm.get_parameter(Name='/start-kb-ingestion-jobs/config-json')
config_json = response['Parameter']['Value']
config = json.loads(config_json)
for record in config:
knowledge_base_id = record.get('knowledge_base_id')
for data_source_id in record.get('data_source_ids'):
# Start the ingestion job
print(f'Starting ingestion job for data source {data_source_id} of knowledge base {knowledge_base_id}')
response = bedrock_agent.start_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id
)
return {
'statusCode': 200,
'body': 'Success'
}
except ClientError as e:
return {
'statusCode': 500,
'body': f'Client error: {str(e)}'
}
except Exception as e:
return {
'statusCode': 500,
'body': f'Unexpected error: {str(e)}'
}
در همین حال ، به روزرسانی مؤلفه ای که وضعیت شغلی مصرف را بررسی می کند کمی پیچیده تر است. اول ، ما باید به روز کنیم check-kb-job-statuses
عملکرد لامبدا به عنوان یک هدف فیلتر اشتراک است. همانطور که در صفحه فیلترهای اشتراک گروه Log از راهنمای کاربر CloudWatch Logs توضیح داده شده است ، داده های ورود به سیستم دریافت شده توسط عملکرد فشرده شده ، Base64 رمزگذاری شده و Batched است. من توانستم به راحتی این سؤال StackOverflow را پیدا کنم که دارای کد دقیقی است که در اولین پاسخ به آن نیاز داریم.
در مرحله بعد ، ما باید بدانیم که یک رویداد ورود به سیستم مربوط به چه شکلی است. نمونه هایی از سیاهههای مربوط به دانش در مستندات AWS ، قالب کلی برای یک رویداد شغلی در مصرف را ارائه می دهد ، اما ترجیحاً به دنبال یک رویداد ورود به سیستم واقعی است. در اینجا یکی از مواردی است که یک رویداد تکمیل شغل را برای یک کار موفق ضبط می کند:
{
"event_timestamp": 1740895462316,
"event": {
"ingestion_job_id": "W0V45LVZY6",
"data_source_id": "ATUWOVZJOD",
"ingestion_job_status": "COMPLETE",
"knowledge_base_arn": "arn:aws:bedrock:us-east-1::knowledge-base/R1K1UIZKKQ" ,
"resource_statistics": {
"number_of_resources_updated": 366,
"number_of_resources_ingested": 0,
"number_of_resources_deleted": 0,
"number_of_resources_with_metadata_updated": 0,
"number_of_resources_failed": 15
}
},
"event_version": "1.0",
"event_type": "StartIngestionJob.StatusChanged",
"level": "INFO"
}
عملکرد Lambda باید جزئیات زیر را از رویداد ورود به سیستم استخراج کند:
-
شناسه پایه دانش ، که می تواند از ارزش آن استخراج شود
event.knowledge_base_arn
، به طور خاص بعد از/
بشر -
شناسه منبع داده ، که مقدار آن است
event.data_source_id
میدان -
شناسه کار مصرف ، که ارزش آن است
event.ingestion_job_id
میدان
اگرچه ما قادر به استخراج تمام اطلاعات مورد نیاز برای اعلان هستیم ، اما رویداد ورود به سیستم شامل خرابی های احتمالی محتوای Verbose نیست که ما از پاسخ آن دریافت می کنیم GetIngestionJob
عمل API. اگرچه این رویکرد کمی کارآمدتر است ، اما ما هنوز هم برای کامل بودن با API تماس خواهیم گرفت. عملکرد لامبدا حاصل باید به این شکل باشد:
import base64
import boto3
import gzip
import json
from botocore.exceptions import ClientError
bedrock_agent = boto3.client('bedrock-agent')
ssm = boto3.client('ssm')
sns = boto3.client('sns')
def get_ssm_parameter(name):
response = ssm.get_parameter(Name=name, WithDecryption=True)
return response['Parameter']['Value']
def get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id):
response = bedrock_agent.get_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id,
ingestionJobId=ingestion_job_id
)
return response['ingestionJob']
def lambda_handler(event, context):
try:
success_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/success-sns-topic-arn')
failure_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/failure-sns-topic-arn')
encoded_zipped_data = event['awslogs']['data']
zipped_data = base64.b64decode(encoded_zipped_data)
data = json.loads(gzip.decompress(zipped_data))
log_events = data['logEvents']
for log_event in log_events:
message = json.loads(log_event['message'])
knowledge_base_arn = message['event']['knowledge_base_arn']
knowledge_base_id = knowledge_base_arn.split('/')[-1]
data_source_id = message['event']['data_source_id']
ingestion_job_id = message['event']['ingestion_job_id']
print(
f'Checking ingestion job status for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id}')
ingestion_job = get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id)
print(
f'Ingestion job summary: \n\n{json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)}')
job_status = ingestion_job['status']
if job_status == 'COMPLETE':
sns.publish(
TopicArn=success_sns_topic_arn,
Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} Completed',
Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
)
elif job_status == 'FAILED':
sns.publish(
TopicArn=failure_sns_topic_arn,
Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} FAILED',
Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
)
elif job_status == 'STOPPED':
sns.publish(
TopicArn=failure_sns_topic_arn,
Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} STOPPED',
Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
)
return {
'statusCode': 200,
'body': 'Success'
}
except ClientError as e:
return {
'statusCode': 500,
'body': f'Client error: {str(e)}'
}
except Exception as e:
return {
'statusCode': 500,
'body': f'Unexpected error: {str(e)}'
}
سرانجام ، ما باید یک فیلتر اشتراک در گروه ورود به سیستم ایجاد کنیم که به عنوان مقصد تحویل ورود به سیستم دانش دانش عمل می کند. از آنجا که ما فقط به رویدادهای ورود به سیستم برای اتمام شغل مصرف علاقه مندیم ، باید یک الگوی فیلتر اشتراک مناسب را تعریف کنیم. دو زمینه وجود دارد که ما برای این هدف به آن نیاز داریم:
-
در
event_type
قسمت با مقدارStartIngestionJob.StatusChanged
بشر -
در
event.ingestion_job_status
زمینه با مقدار مطابق با یکی ازCOMPLETE
باFAILED
باCRAWLING_COMPLETED
، همانطور که در نمونه ورود به سیستم کار مصرف داده توضیح داده شده است.
بر اساس برخی از آزمایشات ، الف CRAWLING_COMPLETED
این رویداد نشانگر تکمیل کامل کار مصرف نیست. بوها COMPLETE
(و احتمالاً FAILED
) رویداد همیشه پس از اتمام کار ارسال می شود. بنابراین ما می توانیم استفاده کنیم COMPLETE
وت FAILED
برای فیلتر علاوه بر این ، متوقف کردن شغل یک رویداد ایجاد نمی کند و هیچ ارزش وضعیتی برای آن وجود ندارد. این به نظر می رسد که از طرف AWS یک اشتباه است ، بنابراین من یک پرونده پشتیبانی AWS را برای آن باز می کنم. در حال حاضر ، ما هنوز هم اضافه خواهیم کرد STOPPED
به خاطر کامل بودن به فیلتر.
با مراجعه به الگوی فیلتر اشتراک برای رویدادهای ورود به سیستم JSON ، ما می توانیم بیان مرکب خود را تعریف کنیم که نوع رویداد و وضعیت کار مصرف را به شرح زیر بررسی می کند:
{$.event_type = "StartIngestionJob.StatusChanged" && ($.event.ingestion_job_status = "COMPLETE" || $.event.ingestion_job_status = "FAILED" || $.event.ingestion_job_status = "STOPPED")}
ما ابتدا می توانیم الگوی را در گفتگوی ایجاد فیلتر اشتراک کنسول مدیریت AWS بدون ایجاد فیلتر آزمایش کنیم. بعداً ، ما آن را با استفاده از Terraform پیاده سازی خواهیم کرد. در اینجا تصویری از آنچه که گفتگو به نظر می رسد آورده شده است:
در این مثال ، فیلتر اشتراک در گروه log همانطور که توسط الگوی نامگذاری استاندارد مشهود است ایجاد می شود. سیاهههای مربوط به دانش به جریان ورود به سیستم نوشته شده است bedrock/knowledgebaselogs
، بنابراین ما باید آن را انتخاب کنیم. با استفاده از الگوی تست دکمه ، می توانیم یک ورودی فیلتر شده را در نتایج آزمون در بین 50 رویداد ورود به سیستم مشاهده کنیم. وقایع ورود به سیستم از یک کار مصرف واحد ایجاد شده است ، و سایر رویدادها یا رویدادهای تغییر منابع یا رویدادهای تغییر وضعیت غیر مرتبط هستند.
به روزرسانی پیکربندی Terraform
تغییرات زیر برای پشتیبانی از طراحی جدید به پیکربندی Terraform راه حل اصلی لازم است:
-
صف SQS ، مجوزهای IAM مرتبط و پارامترهای SSM را حذف کنید.
-
مجوز Lambda را برای
check-kb-intgestion-job-statuses
اجازه می دهد تا از طریق گروه ورود به سیستم ، از طریق گروه ورود به سیستم که پایگاه دانش بستر آن گزارش های برنامه خود را می نویسد ، از سیاهههای مربوط به CloudWatch استفاده کنید.
در آخر ، ما به یک منبع جدید برای فیلتر اشتراک به شرح زیر نیاز داریم:
resource "aws_cloudwatch_log_subscription_filter" "check_kb_ingestion_job_statuses" {
name = "check-kb-ingestion-job-statuses"
log_group_name = var.kb_app_log_group_name
filter_pattern = "{$.event_type = \"StartIngestionJob.StatusChanged\" && ($.event.ingestion_job_status = \"COMPLETE\" || $.event.ingestion_job_status = \"FAILED\" || $.event.ingestion_job_status = \"STOPPED\")}"
destination_arn = aws_lambda_function.check_kb_ingestion_job_statuses.arn
depends_on = [aws_lambda_permission.check_kb_ingestion_job_statuses]
}
توجه داشته باشید که نام گروه ورود به عنوان متغیر ارائه شده است. نام گروه ورود باید از قالب پیش فرض ارائه شده توسط AWS پیروی کند ، یعنی /aws/vendedlogs/bedrock/knowledge-base/APPLICATION_LOGS/
، کجا
شناسه پایه دانش بستر است.
استقرار و آزمایش راه حل
✅ می توانید پیکربندی کامل و کد منبع را در آن پیدا کنید
5_kb_data_ingestion_via_logs
دایرکتوری در این مخزن GitHub.
برای استقرار و تست راه حل ، شما به یک پایگاه دانش با حداقل یک منبع داده نیاز دارید که محتوا برای مصرف آن در یک سطل S3 یا یک وب سایت خزنده باشد. شما می توانید این کار را با استفاده از گزینه های شروع سریع پایگاه داده وکتور در کنسول Bedrock تنظیم کنید. از طرف دیگر ، یک پایگاه دانش نمونه را با استفاده از پیکربندی Terraform از پست وبلاگ من نحوه مدیریت یک پایگاه دانش آمازون بستر آمازون با استفاده از Terraform مستقر کنید. این پیکربندی همچنین در همان مخزن GitHub در زیر موجود است 2_knowledge_base
دایرکتوری
علاوه بر این ، شما همچنین باید پیکربندی ورود به سیستم دانش دانش را برای ارائه سیاهههای مربوط به برنامه های CloudWatch تغییر دهید. شما می توانید آن را به صورت دستی دنبال کنید و به دنبال مستندات AWS یا با استفاده از پیکربندی Terraform از پست وبلاگ قبلی من ، امکان ورود به پایگاه های دانش آمازون Bedrock را با استفاده از Terraform فراهم کنید. این پیکربندی همچنین در همان مخزن GitHub در زیر موجود است 4_kb_logging
دایرکتوری
با پیش نیازهای موجود ، راه حل را به شرح زیر مستقر کنید:
-
از ریشه مخزن GitHub کلون شده ، به سمت آن حرکت کنید
5_kb_data_ingestion_via_logs
بشر -
کپی کردن
terraform.tfvars.example
به عنوانterraform.tfvars
و متغیرها را به روز کنید تا با پیکربندی خود مطابقت داشته باشید.
* By default, the `start-kb-ingestion-jobs` Lambda function runs daily at 0:00 UTC.
-
اعتبارنامه های AWS خود را پیکربندی کنید.
-
دویدن
terraform init
و Terraformapply -var-file terraform.tfvars
بشر
پس از استقرار ، با افزودن اشتراک ایمیل به مباحث SNS ، راه حل را آزمایش کنید check-kb-ingestion-job-statuses-success
وت check-kb-ingestion-job-statuses-failure
برای آدرس ایمیل خود به گونه ای که می توانید اعلان های ایمیل را دریافت کنید. اشتراک های خود را با استفاده از پیوند در ایمیل های تأیید تأیید کنید.
بعد ، به صورت دستی فراخوانی کنید start-kb-ingestion-jobs
عملکرد لامبدا در کنسول لامبدا.
با پایان کار و اتمام کار ، سیاهههای مربوط به سیاهههای مربوط به CloudWatch نوشته شده و از طریق فیلتر اشتراک عبور می کنند. رویدادهای تغییر وضعیت باید فیلتر شده و برای اطلاع رسانی به عملکرد Lambda ارسال شود و در نهایت منجر به ایمیلهایی شود که دریافت خواهید کرد. در اینجا یک مثال آورده شده است:
پس از تأیید راه حل ، اشتراک SNS را برداشته و آن را با مواردی که متناسب با نیازهای شما است جایگزین کنید. اگر قصد ندارید دانش را حفظ کنید ، آن را به همراه فروشگاه بردار (به عنوان مثال ، شاخص OSS) حذف کنید تا از هزینه های غیر ضروری جلوگیری کنید.
خلاصه
در این پست وبلاگ ، ما راه حل اصلی دانش پایه Bedrock Base Data را با اطلاع رسانی مبتنی بر فشار با استفاده از ویژگی های CloudWatch بهبود بخشیدیم. این احتمالاً از یک مکانیسم مبتنی بر کشش برنامه ریزی شده کارآمدتر است و به ما امکان می دهد تا از یک فیلتر اشتراک Lambda استفاده کنیم.
گفته می شود ، راه حل ایده آل یک قانون Eventbridge برای واکنش به رویدادهای شغلی در مصرف بومی از Bedrock است. سرویس Bedrock متأسفانه امروز چنین رویدادهایی را منتشر نمی کند ، اما من از طریق یک مورد پشتیبانی AWS درخواست ویژگی کردم. امیدوارم این به زودی پشتیبانی شود و ما بتوانیم راه حل مصرف داده های خود را بیشتر تکامل دهیم.
امیدوارم این پست وبلاگ را مفید و جذاب پیدا کنید. لطفاً برای بررسی سایر پست های وبلاگ من در وبلاگ Avangards احساس راحتی کنید. مراقبت و یادگیری شاد!