برنامه نویسی

ساخت برنامه های مشاوره سفر با Cloudera Data Flow (ساخته شده بر روی Apache NiFi)

مشاوره سفر – پردازش RSS – Apache NiFi – Apache Kafka – Apache Flink – SQL

بررسی اجمالی

جریان نهایی

بررسی اجمالی

اضافه کردن پردازنده به طراح

در اینجا بیشتر پردازنده های موجود را لیست می کنم

https://www.datainmotion.dev/2023/04/dataflow-processors.html

پارامترهای جریان

به پارامترها بروید و تمام آنچه برای جریان نیاز دارید را وارد کنید.

بررسی اجمالی

شما می توانید تمام موارد ذکر شده در زیر را اضافه کنید.

بررسی اجمالی

جریان راه رفتن از طریق

اگر جریان از پیش ساخته شده من را بارگیری می کنید، هنگام ورود، جزئیات مربوط به گروه فرآیند را در پالت پیکربندی مشاهده خواهید کرد.

یک پردازنده invokeHTTP اضافه می کنیم و پارامترها را تنظیم می کنیم.

بررسی اجمالی

جزئیات

اکنون می توانیم یک پارامتر برای URL HTTP برای مشاوره سفر اضافه کنیم.

بررسی اجمالی

InvokeHTTP را به QueryRecord متصل کنید. نام اتصال خود را برای نظارت بعداً نامگذاری کنید.

بررسی اجمالی

QueryRecord، XML (RSS) را به JSON تبدیل کنید، به RSSXMLReader و TravelJsonRecordSetWriter نیاز دارید.

بررسی اجمالی

در صورت عدم وجود خطا، QueryRecord را به SplitJson وصل کنید.

بررسی اجمالی

SplitJson عبارت JsonPath را روی آن تنظیم می کنیم

`$.*.*.item

`

بررسی اجمالی

سپس SplitJson را به SplitRecord متصل می کنیم.

بررسی اجمالی

برای SplitRecord، Record Reader را روی JSON_Reader_InferRoot، Record Writer را روی TravelJsonRecordSetWriter و رکوردهای هر تقسیم را روی 1 تنظیم می کنیم.

بررسی اجمالی

SplitRecord به EvaluateJSONPath متصل شد

بررسی اجمالی

بررسی اجمالی

Destination را روی flowfile-attribute، Return Type را روی json قرار می دهیم و چندین فیلد جدید اضافه می کنیم.

  • توضیحات – $.description
  • راهنما – $.guid
  • شناسایی – $.identify
  • پیوند – $.link
  • pubdate – $.pubDate
  • عنوان – $.title

بررسی اجمالی

ما EvaluateJsonPath را به SplitJson متصل می کنیم.

بررسی اجمالی

برای SplitJson عبارت JsonPath را روی $.category قرار می دهیم

بررسی اجمالی

از SplitJson تا UpdateRecord

بررسی اجمالی

بررسی اجمالی

در UpdateRecord، Record Reader را روی JSON_Reader_InferRoot و Record Writer را روی TravelJsonRecordSetWriter قرار دادیم. ما استراتژی ارزش جایگزینی را روی Literal Value تنظیم کردیم.

ما فیلدهای جدیدی را برای فرمت رکورد جدید خود اضافه می کنیم.

  • /advisoryId – ${filename}
  • /description – ${description}
  • /domain – ${identifier:trim()}
  • /guid – ${guid}
  • /link – ${link}
  • /pubdate – ${pubdate}
  • /title – ${title}
  • /ts – ${now():toNumber()}
  • /uuid – ${uuid}

بررسی اجمالی

سپس UpdateRecord را به گروه Slack Sub-Processor خود متصل می کنیم

بررسی اجمالی

شاخه های دیگر از UpdateRecord تا Write به کافکا جریان دارند

بررسی اجمالی

بررسی اجمالی

برای PublishKafka2RecordCDP، پارامترهای زیادی برای تنظیم وجود دارد. به همین دلیل است که توصیه می کنیم با ReadyFlow شروع کنید.

پارامترهای زیادی در اینجا وجود دارد، ما باید بروکرهای کافکا، نام موضوع مقصد، JSON_Reader_InferRoot برای Reader، AvroRecordSetWriterHWX را برای نویسنده تنظیم کنیم،
غیرفعال کردن تراکنش‌ها، تضمین تحویل تکراری، استفاده از محتوا به‌عنوان مقدار رکورد، SASL_SSL/امنیت ساده، نام کاربری برای شناسه کاربری ورود به سیستم یا کاربر دستگاه و سپس رمز عبور مرتبط،
متن SSL به سرویس پیش‌فرض NiFi SSL Context نگاشت می‌شود، uuid را به‌عنوان فیلد کلید پیام تنظیم می‌کند و در نهایت client.id را روی یک شناسه تولیدکننده منحصربه‌فرد Kafka تنظیم می‌کند.

بررسی اجمالی

بررسی اجمالی

سپس در مورد توصیه های سفر خود به Slack نیز پیام می فرستیم.

بررسی اجمالی

ما فقط به یک پردازنده برای ارسال به Slack نیاز داریم.

بررسی اجمالی

ورودی را به پردازنده PutSlack خود وصل می کنیم.

بررسی اجمالی

برای PutSlack باید URL Webhook را روی آدرس مدیر گروه Slack تنظیم کنیم و متن را از ورودی قرار دهیم، کانال خود را روی کانالی که در قلاب وب نگاشت شده است تنظیم کنیم و یک نام کاربری برای ربات خود تنظیم کنیم.

خدمات جریان

خدمات

همه این خدمات باید تنظیم شوند.

@کپی 🀄؛ 2023 تیم اسپن https://datainmotion.dev/

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

همچنین ببینید
بستن
دکمه بازگشت به بالا