برنامه نویسی

چگونه تحمل گسل در فلینک کار می کند

Apache Flink یک چارچوب پردازش جریان توزیع شده است که تحمل گسل را از طریق مکانیسمی به نام “بازرسی” فراهم می کند.

من اطلاعاتی را در مورد به اشتراک گذاشته ام checkpoint به زودی در پست زیر
https://dev.to/kination/analytics-dont-want-duplicated-data-so-get-it-it-it-once-once-tith-flinkkafka-ga4

و در اینجا توضیحات جزئیات بیشتری برای حفظ تحمل گسل ارائه شده است.

Checkpointing اجازه می دهد تا با صرفه جویی در وضعیت برنامه جریان به صورت دوره ای ، از خرابی ها بهبود یابد. در اینجا توضیح می دهد که چگونه تحمل گسل فلینک با جزئیات کار می کند و به عنوان مثال کد برای نشان دادن مفهوم.

تحمل گسل چگونه کار می کند

بازرسی:

فلینک به طور دوره ای پاسگاه هایی ایجاد می کند که کل وضعیت برنامه جریان را مدیریت می کند. این ایست بازرسی نوعی “عکس فوری سازگار” از وضعیت کلیه اپراتورها در کار است ، از جمله موقعیت هایی در جریان جریان ورودی.

بازرسی ها در سیستم ذخیره سازی بادوام (به عنوان مثال ، HDFS ، S3 یا یک سیستم فایل توزیع شده) ذخیره می شوند و می توانید مسیر یا گزینه را از طریق پیکربندی تغییر دهید.

موانع:

فلینک از مکانیسمی به نام استفاده می کند barriers برای تراز کردن ایالات در همه اپراتورهای موجود در logical graph(نمودار DataFlow).

این نمودار یک نمودار مستقیم است ، که در آن گره ها “اپراتورها” هستند و لبه ها روابط ورودی/خروجی اپراتورها را تعریف می کنند و با جریان داده ها یا مجموعه داده ها مطابقت دارند. نمودار منطقی با ارسال شغل از یک برنامه فلینک ایجاد می شود.

موانع سوابق ویژه ای هستند که به جریان داده هایی که در کنار داده ها جریان دارند ، تزریق می شوند.

هنگامی که اپراتور سدی را دریافت می کند ، تمام سوابق موجود در پشت سد را پردازش می کند ، وضعیت خود را ذخیره می کند و سپس سد را در پایین دست به جلو می برد.

پس زمینه ایالتی:

فلینک برای ذخیره و مدیریت وضعیت اپراتورها از یک پس زمینه دولتی استفاده می کند. اینها انواع باطنی هستند که از آن پشتیبانی می کند

  • MemoryStateBackend: حالت را در حافظه ذخیره می کند (با دوام نیست)
  • FSStateBackend: حالت را در حافظه ذخیره می کند اما پاسگاه ها را به یک سیستم فایل می نویسد (HDFS ، S3 و غیره)
  • RocksDBStateBackend: Stores States in RocksDB (روی دیسک) و پاسگاه ها به یک سیستم پرونده

بهبودی

در صورت عدم موفقیت ، فلینک کار را از آخرین پاسگاه تکمیل شده مجدداً راه اندازی می کند. وضعیت کلیه اپراتورها ترمیم می شود و از سریال پاسگاه آخرین بازرسی را پردازش می کند.

شرح تصویر

در کد

حال بیایید ببینیم که چگونه این کار بر اساس کد کار می کند.

...

public class CommonStreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints"));

        // Configure sample kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("input-topic")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Add the Kafka consumer as a source
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Process the stream (e.g., word count)
        DataStream<Tuple2<String, Integer>> wordCounts = stream
                .flatMap(new Tokenizer())
                .keyBy(0)
                .sum(1);

        // Configure kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

        wordCounts.sinkTo(sink);

        // Execute the job
        env.execute("Execute Streaming Job");
    }

    // Tokenizer function to split sentences into words
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

در این کد ، ابتدا می توانید تعریف بازرسی را از کد پیدا کنید env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE) که هر 5 ثانیه با معناشناسی دقیق یک بازرسی را امکان پذیر می کند.

و از env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints")) می توانید دریابید که با حمایت دولت برای ذخیره پاسگاه ها در سیستم HDFS تعریف می کند.

این برنامه دارای اتصالات کافکا برای منبع/سینک است ، به این معنی که ورودی جریان را از موضوع کافکا “ورودی-موضوعی” در داخل کارگزار مصرف می کند و به موضوع “خروجی-موضوعی” می پردازد که در داخل یک کارگزار دیگر است.

قبل از تولید بعد از مصرف ، Tokenizer عملکرد جملات را به کلمات تقسیم می کند و ساطع می کند (کلمه ، 1) تاپل.

عملیات Keyby (0) .Sum (1) تعداد کلمات را انجام می دهد.

و البته ، که این تنظیمات ، در هنگام وقوع خرابی ، کار را از آخرین پاسگاه کامل مجدداً راه اندازی می کند و هیچ از دست دادن داده و پردازش دقیقاً یکپارچه را تضمین نمی کند.

مرجع:

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا