ساخت برنامه های مشاوره سفر با Cloudera Data Flow (ساخته شده بر روی Apache NiFi)
مشاوره سفر – پردازش RSS – Apache NiFi – Apache Kafka – Apache Flink – SQL
بررسی اجمالی
جریان نهایی
اضافه کردن پردازنده به طراح
در اینجا بیشتر پردازنده های موجود را لیست می کنم
https://www.datainmotion.dev/2023/04/dataflow-processors.html
پارامترهای جریان
به پارامترها بروید و تمام آنچه برای جریان نیاز دارید را وارد کنید.
شما می توانید تمام موارد ذکر شده در زیر را اضافه کنید.
جریان راه رفتن از طریق
اگر جریان از پیش ساخته شده من را بارگیری می کنید، هنگام ورود، جزئیات مربوط به گروه فرآیند را در پالت پیکربندی مشاهده خواهید کرد.
یک پردازنده invokeHTTP اضافه می کنیم و پارامترها را تنظیم می کنیم.
اکنون می توانیم یک پارامتر برای URL HTTP برای مشاوره سفر اضافه کنیم.
InvokeHTTP را به QueryRecord متصل کنید. نام اتصال خود را برای نظارت بعداً نامگذاری کنید.
QueryRecord، XML (RSS) را به JSON تبدیل کنید، به RSSXMLReader و TravelJsonRecordSetWriter نیاز دارید.
در صورت عدم وجود خطا، QueryRecord را به SplitJson وصل کنید.
SplitJson عبارت JsonPath را روی آن تنظیم می کنیم
`$.*.*.item
`
سپس SplitJson را به SplitRecord متصل می کنیم.
برای SplitRecord، Record Reader را روی JSON_Reader_InferRoot، Record Writer را روی TravelJsonRecordSetWriter و رکوردهای هر تقسیم را روی 1 تنظیم می کنیم.
SplitRecord به EvaluateJSONPath متصل شد
Destination را روی flowfile-attribute، Return Type را روی json قرار می دهیم و چندین فیلد جدید اضافه می کنیم.
- توضیحات – $.description
- راهنما – $.guid
- شناسایی – $.identify
- پیوند – $.link
- pubdate – $.pubDate
- عنوان – $.title
ما EvaluateJsonPath را به SplitJson متصل می کنیم.
برای SplitJson عبارت JsonPath را روی $.category قرار می دهیم
از SplitJson تا UpdateRecord
در UpdateRecord، Record Reader را روی JSON_Reader_InferRoot و Record Writer را روی TravelJsonRecordSetWriter قرار دادیم. ما استراتژی ارزش جایگزینی را روی Literal Value تنظیم کردیم.
ما فیلدهای جدیدی را برای فرمت رکورد جدید خود اضافه می کنیم.
- /advisoryId – ${filename}
- /description – ${description}
- /domain – ${identifier:trim()}
- /guid – ${guid}
- /link – ${link}
- /pubdate – ${pubdate}
- /title – ${title}
- /ts – ${now():toNumber()}
- /uuid – ${uuid}
سپس UpdateRecord را به گروه Slack Sub-Processor خود متصل می کنیم
شاخه های دیگر از UpdateRecord تا Write به کافکا جریان دارند
برای PublishKafka2RecordCDP، پارامترهای زیادی برای تنظیم وجود دارد. به همین دلیل است که توصیه می کنیم با ReadyFlow شروع کنید.
پارامترهای زیادی در اینجا وجود دارد، ما باید بروکرهای کافکا، نام موضوع مقصد، JSON_Reader_InferRoot برای Reader، AvroRecordSetWriterHWX را برای نویسنده تنظیم کنیم،
غیرفعال کردن تراکنشها، تضمین تحویل تکراری، استفاده از محتوا بهعنوان مقدار رکورد، SASL_SSL/امنیت ساده، نام کاربری برای شناسه کاربری ورود به سیستم یا کاربر دستگاه و سپس رمز عبور مرتبط،
متن SSL به سرویس پیشفرض NiFi SSL Context نگاشت میشود، uuid را بهعنوان فیلد کلید پیام تنظیم میکند و در نهایت client.id را روی یک شناسه تولیدکننده منحصربهفرد Kafka تنظیم میکند.
سپس در مورد توصیه های سفر خود به Slack نیز پیام می فرستیم.
ما فقط به یک پردازنده برای ارسال به Slack نیاز داریم.
ورودی را به پردازنده PutSlack خود وصل می کنیم.
برای PutSlack باید URL Webhook را روی آدرس مدیر گروه Slack تنظیم کنیم و متن را از ورودی قرار دهیم، کانال خود را روی کانالی که در قلاب وب نگاشت شده است تنظیم کنیم و یک نام کاربری برای ربات خود تنظیم کنیم.
خدمات جریان
همه این خدمات باید تنظیم شوند.
@کپی 🀄؛ 2023 تیم اسپن https://datainmotion.dev/