چگونه تحمل گسل در فلینک کار می کند

Apache Flink یک چارچوب پردازش جریان توزیع شده است که تحمل گسل را از طریق مکانیسمی به نام “بازرسی” فراهم می کند.
من اطلاعاتی را در مورد به اشتراک گذاشته ام checkpoint
به زودی در پست زیر
https://dev.to/kination/analytics-dont-want-duplicated-data-so-get-it-it-it-once-once-tith-flinkkafka-ga4
و در اینجا توضیحات جزئیات بیشتری برای حفظ تحمل گسل ارائه شده است.
Checkpointing
اجازه می دهد تا با صرفه جویی در وضعیت برنامه جریان به صورت دوره ای ، از خرابی ها بهبود یابد. در اینجا توضیح می دهد که چگونه تحمل گسل فلینک با جزئیات کار می کند و به عنوان مثال کد برای نشان دادن مفهوم.
تحمل گسل چگونه کار می کند
بازرسی:
فلینک به طور دوره ای پاسگاه هایی ایجاد می کند که کل وضعیت برنامه جریان را مدیریت می کند. این ایست بازرسی نوعی “عکس فوری سازگار” از وضعیت کلیه اپراتورها در کار است ، از جمله موقعیت هایی در جریان جریان ورودی.
بازرسی ها در سیستم ذخیره سازی بادوام (به عنوان مثال ، HDFS ، S3 یا یک سیستم فایل توزیع شده) ذخیره می شوند و می توانید مسیر یا گزینه را از طریق پیکربندی تغییر دهید.
موانع:
فلینک از مکانیسمی به نام استفاده می کند barriers
برای تراز کردن ایالات در همه اپراتورهای موجود در logical graph
(نمودار DataFlow).
این نمودار یک نمودار مستقیم است ، که در آن گره ها “اپراتورها” هستند و لبه ها روابط ورودی/خروجی اپراتورها را تعریف می کنند و با جریان داده ها یا مجموعه داده ها مطابقت دارند. نمودار منطقی با ارسال شغل از یک برنامه فلینک ایجاد می شود.
موانع سوابق ویژه ای هستند که به جریان داده هایی که در کنار داده ها جریان دارند ، تزریق می شوند.
هنگامی که اپراتور سدی را دریافت می کند ، تمام سوابق موجود در پشت سد را پردازش می کند ، وضعیت خود را ذخیره می کند و سپس سد را در پایین دست به جلو می برد.
پس زمینه ایالتی:
فلینک برای ذخیره و مدیریت وضعیت اپراتورها از یک پس زمینه دولتی استفاده می کند. اینها انواع باطنی هستند که از آن پشتیبانی می کند
- MemoryStateBackend: حالت را در حافظه ذخیره می کند (با دوام نیست)
- FSStateBackend: حالت را در حافظه ذخیره می کند اما پاسگاه ها را به یک سیستم فایل می نویسد (HDFS ، S3 و غیره)
- RocksDBStateBackend: Stores States in RocksDB (روی دیسک) و پاسگاه ها به یک سیستم پرونده
بهبودی
در صورت عدم موفقیت ، فلینک کار را از آخرین پاسگاه تکمیل شده مجدداً راه اندازی می کند. وضعیت کلیه اپراتورها ترمیم می شود و از سریال پاسگاه آخرین بازرسی را پردازش می کند.
در کد
حال بیایید ببینیم که چگونه این کار بر اساس کد کار می کند.
...
public class CommonStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints"));
// Configure sample kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Add the Kafka consumer as a source
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Process the stream (e.g., word count)
DataStream<Tuple2<String, Integer>> wordCounts = stream
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
// Configure kafka sink
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
wordCounts.sinkTo(sink);
// Execute the job
env.execute("Execute Streaming Job");
}
// Tokenizer function to split sentences into words
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
در این کد ، ابتدا می توانید تعریف بازرسی را از کد پیدا کنید env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
که هر 5 ثانیه با معناشناسی دقیق یک بازرسی را امکان پذیر می کند.
و از env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints"))
می توانید دریابید که با حمایت دولت برای ذخیره پاسگاه ها در سیستم HDFS تعریف می کند.
این برنامه دارای اتصالات کافکا برای منبع/سینک است ، به این معنی که ورودی جریان را از موضوع کافکا “ورودی-موضوعی” در داخل کارگزار مصرف می کند و به موضوع “خروجی-موضوعی” می پردازد که در داخل یک کارگزار دیگر است.
قبل از تولید بعد از مصرف ، Tokenizer
عملکرد جملات را به کلمات تقسیم می کند و ساطع می کند (کلمه ، 1) تاپل.
عملیات Keyby (0) .Sum (1) تعداد کلمات را انجام می دهد.
و البته ، که این تنظیمات ، در هنگام وقوع خرابی ، کار را از آخرین پاسگاه کامل مجدداً راه اندازی می کند و هیچ از دست دادن داده و پردازش دقیقاً یکپارچه را تضمین نمی کند.
مرجع: