برنامه نویسی

درک محرک های سفارشی در آپاچی

ویندوز و محرک ها

محرک های پنجره آپاچی فلینک تعیین می کنند کی محاسبه پنجره در هنگام پردازش جریان منتشر می شود (و به صورت اختیاری پاک می شود).

هر پنجره در فلینک دارای یک ماشه مرتبط است که هر عنصر ورودی و هر رویداد تایمر مربوطه را ارزیابی می کند تا تصمیم بگیرد FIRE (انتشار نتایج) ، PURGE (محتویات پنجره را رها کنید) یا هر دو.

به عبارت دیگر ، ماشه شرایط پنجره را کنترل می کند و به فلینک می گوید چه موقع می توانید خروجی جمع شده از آن پنجره را تولید کنید. به طور پیش فرض ، فلینک محرک های داخلی را برای سناریوهای مشترک فراهم می کند ، اما محدودیت هایی دارند و این جایی است که محرک های سفارشی مفید می شوند.

ساخت و سازها

فلینک با چند محرک داخلی برای رفتارهای استاندارد همراه است:

  • EventTimetrigger: آتش سوزی هنگامی که واترمارک رویداد از انتهای پنجره عبور می کند (یعنی پنجره در زمان رویداد بسته می شود). این پیش فرض برای ویندوزهای زمان رویداد است و تضمین می کند که پس از رسیدن زمان بندی پایان پنجره مطابق با علامت های آبی ، نتایج منتشر می شود.

  • پردازش Timetrigger: هنگامی که ساعت زمان پردازش به انتهای پنجره می رسد ، آتش می گیرد. این پیش فرض برای ویندوزهای زمان پردازش است.

  • شمارش ماشه: هنگامی که تعداد عناصر موجود در پنجره به یک آستانه شمارش مشخص می رسد آتش می گیرد

  • PurgingTrigger: بسته بندی که هر محرک دیگری را به یک ماشه پاکسازی تبدیل می کند ، به این معنی که هر زمان که آتش می گیرد ، محتوای پنجره را پاک می کند.

این محرک های داخلی موارد اساسی را پوشش می دهند- time-driven یا count-driven شلیک ، اما آنها به طور مستقل عمل می کنند. اگر با یکی از این موارد ، ماشه پنجره را نادیده بگیرید ، ماشه پیش فرض را جایگزین می کنید ، نه آن را تکمیل کنید. به عنوان مثال ، اگر یک Counttrigger را به یک پنجره رویداد اختصاص دهید ، پنجره فقط براساس شمارش آتش می گیرد و علامت زمانی رویداد را نادیده می گیرد. این بدان معناست که محرک های داخلی نمی توانند خارج از جعبه ترکیب شوند-شما نمی توانید مستقیماً پنجره ای را که با استفاده از فقط با محرک های ارائه شده ، در یک شمارش یا شرایط زمانی آتش می گیرد ، بدست آورید.

در عمل ، سناریوهای پیچیده اغلب به محرک های سفارشی نیاز دارند. محرک های سفارشی به شما امکان می دهد شرایط دلخواه (یا ترکیب شرایط) را برای شلیک ویندوز تعریف کنید ، مانند “آتش سوزی یا زمان ، هر کدام که اول باشد” یا “آتش هنگامی که یک رویداد خاص رخ می دهد.”

با نوشتن یک ماشه سفارشی ، می توانید بر محدودیت های ساخت و سازها غلبه کرده و نتایج اولیه سفارشی ، منطق خاص جلسه ، سیاست های شلیک دیرهنگام و موارد دیگر را اجرا کنید.

ایجاد محرک های سفارشی

برای اجرای یک ماشه سفارشی در فلینک (DataStream API) ، کلاس ایجاد می کنید که چکیده را گسترش می دهد Trigger کلاس ، کجا T نوع عناصر موجود در پنجره است و W نوع پنجره (به عنوان مثال TimeWindow) است.

در زیر کلاس خود ، چندین روش را که فلینک برای هدایت رفتار ماشه استفاده می کند ، نادیده می گیرید. در اینجا می توانید یک لیست و توضیحات کامل را با آن روش ها پیدا کنید.

بوها TriggerContext به این روش ها ارائه شده است ، که خدمات لازم را برای ثبت و حذف تایمرها ارائه می دهد (در اینجا بیشتر ببینید) و دسترسی به حالت پارتیشن بندی شده به پنجره. برای ردیابی اطلاعاتی مانند شمارش عناصر یا پرچم ها می توانید از حالت فلینک در محرک ها استفاده کنید. به عنوان مثال ، در صورت اجرای ماشه مبتنی بر تعداد ، ممکن است یک ValueState برای شمارش عناصری که تاکنون در پنجره دیده می شود.

توجه: محرک ها باید سریال باشند زیرا بخشی از نمودار شغلی است که برای کارگران ارسال می شود. همچنین ، به یاد داشته باشید که بازگشت FIRE وضعیت پنجره را پاک نمی کند به طور پیش فرض – پنجره باقی مانده است و ممکن است داده های بیشتری را جمع کند ، به طور بالقوه بعداً دوباره شلیک می کند. اگر می خواهید پنجره هنگام شلیک پاک شود (فقط یک بار نتایج را منتشر می کند) ، باید برگردید FIRE_AND_PURGE یا از a استفاده کنید PurgingTrigger بسته بندی

نمونه هایی از هر دو رفتار را در زیر مشاهده خواهیم کرد ، بنابراین بیایید با استفاده از محرک های سفارشی در فلینک ، نمونه های دنیای واقعی را طی کنیم.

کد منبع کامل را می توان در اینجا یافت و اگر به دنبال گزینه های تولید هستید می توانید Cloud Ververica را به صورت رایگان بررسی کنید.

مثال 1: پنجره پرشور و شلیک زودرس مبتنی بر تعداد

سناریو: تصور کنید یک برنامه تجارت الکترونیکی رویدادهای کاربر را ردیابی می کند-به عنوان مثال page viewsبا add-to-cartبا checkout initiations – به عنوان یک جریان. ما می خواهیم برخی از تجزیه و تحلیل ها را برای هر کاربر در یک پنجره زمانی پرشور محاسبه کنیم (مثلاً ویندوز 1 ساعته در هر کاربر).

با این حال ، اگر کاربر بسیار فعال باشد ، ما نمی خواهیم تا پایان ساعت صبر کنیم تا نتایج واسطه ای کسب کنیم. ما تصمیم می گیریم که برای پنجره هر کاربر ، نتیجه باید زودتر از 5 رویداد منتشر شود (ما از آستانه کوچکی برای اهداف نمایش استفاده خواهیم کرد) به جای اینکه منتظر ساعت کامل باشیم ، در آن پنجره دریافت شده است.

ما یک ماشه سفارشی را اجرا خواهیم کرد که تعداد عناصر موجود در پنجره را ردیابی می کند. این کار آتش سوزی هنگامی که شمارش به 5 برسد ، و همچنین اطمینان حاصل کنید که پنجره در انتهای پنجره زمان آتش می گیردبشر

بخش مهم در داخل است CustomCountTrigger کلاس:

 @Override
    public TriggerResult onElement(UserEvent userEvent, 
                                   long timestamp,
                                   TimeWindow timeWindow,
                                   TriggerContext triggerContext) throws Exception {
        // Get or initialize the current count
        ValueState<Integer> countState = triggerContext.getPartitionedState(countStateDesc);
        Integer count = countState.value();
        if (count == null) {
            count = 0;
        }
        // Increment count for every element
        count += 1;
        countState.update(count);

        // If this is the first element, register an event-time timer for end-of-window
        // (Timers are set at window end timestamp, so when watermark passes window.end, onEventTime will fire)
        if (count == 1) {
            long windowEnd = timeWindow.getEnd();  // end timestamp of this TimeWindow
            triggerContext.registerEventTimeTimer(windowEnd);
        }

        // Check if we've reached 5 events in this window
        if (count >= 5) {
            // Fire (emit the window) *now*, but do NOT purge (we return FIRE, not FIRE_AND_PURGE).
            // This means the window contents remain, and the window will possibly fire again at end-of-window.
            return TriggerResult.FIRE;
        } else {
            // Not yet reached 5, so continue accumulating
            return TriggerResult.CONTINUE;
        }    }

    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // We don't use processing-time timers in this trigger.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // This is called when the event-time timer for the window fires (i.e., watermark reached window end).
        if (timestamp == timeWindow.getEnd()) {
            // Window end reached, so fire the window result.
            // We return FIRE_AND_PURGE to emit the result and clear the window state.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // If it's not the window-end timer (e.g., some other timer), we ignore.
        return TriggerResult.CONTINUE;        }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // Clean up the count state when the window is purged/closed.
        ValueState<Integer> countState = triggerContext.getPartitionedState(countStateDesc);
        countState.clear();
        // We don't need to manually delete the event-time timer for window end – Flink does that when window closes.
    }
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

بیایید این را تجزیه کنیم. در هر عنصر:

  1. این یک تایمر زمان رویداد را برای پایان پنجره هنگام ورود به اولین عنصر ثبت می کند (برای اطمینان از اینکه پنجره در پایان در پایان شلیک می شود اگر ماشه شمارش قبلاً آتش نگرفته باشد)-مشابه آنچه پیش فرض پیش فرض Flink در داخل انجام می دهد.

  2. اگر شمارش به 5 برسد ، برمی گردد TriggerResult.FIREبشر ما انتخاب می کنیم FIRE (نه FIRE_AND_PURGE) به طوری که محتوای پنجره از بین نرود. این بدان معنی است که عناصر بیشتری هنوز هم می توانند برای خروجی نهایی جمع شوند. ما همچنین می توانیم تصمیم بگیریم که اگر بخواهیم هر 5 رویداد دیگر را شلیک کنیم ، حالت شمارش را به 0 تنظیم کنیم ، اما در این مثال ما فقط یک بار در 5 ساعت در 5 آتش می گیریم و سپس به تایمر نهایی تکیه می کنیم.

  3. در onEventTime()، هنگامی که Watermark به Window End Timestamp برخورد می کند ، ما برمی گردیم FIRE_AND_PURGEبشر این نتیجه پنجره نهایی را منتشر می کند و از وضعیت پنجره پاک می شود ( clear() روش برای رها کردن حالت شمارش فراخوانی خواهد شد). ما استفاده می کنیم FIRE_AND_PURGE در اینجا زیرا پس از رسیدن به پایان پنجره ، پنجره باید بسته شود و منابع را نگه ندارد.

  4. در clear() روش اطمینان می دهد که ما تعداد را از حالت خارج می کنیم تا از نشت جلوگیری کنیم. فلینک تماس خواهد گرفت clear() پس از پاکسازی پنجره (یا از طریق آتش نهایی ما یا اگر پنجره به هر دلیلی دفع شود).

این ماشه سفارشی نشان دهنده ترکیب شرایط شمارش با یک ماشه پنجره زمان رویداد است. بدون یک ماشه سفارشی ، دستیابی به “5 رویداد یا پایان پنجره” به تنهایی با ساختهای فلینک امکان پذیر نخواهد بود

اگر مثال فوق را اجرا کردید ، باید خروجی زیر را مشاهده کنید:

کاربر user_1 5 رویداد داشت در پنجره [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
User user_1 had 6 events in window [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
User user_2 had 5 events in window [2025-02-01 00:00:00.0,2025-02-01 01:00:00.0)
Enter fullscreen mode

Exit fullscreen mode

Notice here that the window gets triggered for user_1 when it gets 5 events and also when the window ends.

Example 2: Session Inactivity With Event Signals

Scenario: Now let’s consider a session window example. In many applications, events are grouped by sessions (a period of user activity separated by inactivity). Flink’s session windows can close a window after a period of inactivity. However, sometimes a session might also end due to a specific event –for example, a user explicitly logs out or completes a checkout.

In an e-commerce or login-based app, you may define a session to end either after 30 minutes of inactivity or when a “logout” or “checkout complete” event occurs.

Flink’s built-in session windows don’t know about the semantics of a logout event, they’d only close based on the timeout. We can create a custom trigger to handle both: fire the window when a special session-end event is seen, or if the session times out with no activity.

The important part is within the SessionEndTrigger class:

 @Override
    public TriggerResult onElement(UserEvent userEvent, long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // Register an event-time timer for the end-of-window (session timeout)
        triggerContext.registerEventTimeTimer(timeWindow.getEnd());
        // If this event is a session terminating event, fire and purge the window
        if ("LOGOUT".equals(userEvent.getEventType()) || "CHECKOUT_COMPLETE".equals(userEvent.getEventType())) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise, continue
        return TriggerResult.CONTINUE;    }

    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        // not used in this trigger
        return TriggerResult.CONTINUE;    }

    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        if (timestamp == timeWindow.getEnd()) {
            // Session inactivity timeout reached, fire and purge the window
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        // If windows merge, register a new timer for the new window end and let the old timers lapse.
        // (Flink will call onEventTime for the exact timestamps that occur; by re-registering the new end we ensure final firing.)
        ctx.registerEventTimeTimer(window.getEnd());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp());
    }
Enter fullscreen mode

Exit fullscreen mode

Let’s break this down. On each element:

  1. We register a timer, which ensures an event-time timer is set for the current end-of-window. (If the session window extends or merges, window.getEnd() will be updated accordingly on each trigger invocation.)

  2. We then check the event’s type. If it is a designated end-of-session event (in this case, "LOGOUT" or "CHECKOUT_COMPLETE"), we immediately return FIRE_AND_PURGE. This emits the window’s results and clears the window state right away, effectively closing the session as soon as that event is encountered.

  3. If it’s not an end signal, we return CONTINUE to keep collecting events.

  4. In onEventTime(), we check if the timer is at the window’s end timestamp. If yes, that means the watermark has reached the session’s end (i.e., no new events for 15 minutes), so we return FIRE_AND_PURGE to close out the session. (If the timer firing is for some other timestamp, we ignore it with CONTINUE.)

  5. We don’t use onProcessingTime() here, so it just continues.

  6. The clear() method doesn’t have to do much because we didn’t use manual state (timers will be cleaned up by Flink after purging). If we had used any ValueState in the trigger, we’d clear it here.

If you run the code above you should see the following output:

Session for user user_1 [2025-02-01 00:20:00.0,2025-02-01 00:35:20.0) -> Events: [login, click, click, LOGOUT]

جلسه برای کاربر کاربر_2 [2025-02-01 02:31:35.0,2025-02-01 02:48:19.0) -> Events: [login, click, CHECKOUT_COMPLETE]

جلسه برای کاربر کاربر_1 [2025-02-01 01:20:00.0,2025-02-01 01:35:25.0) -> Events: [login, click, click]
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

توجه کنید که پنجره فقط در صورت وقایع خاص مانند آتش سوزی می شود LOGOUT ماشه را درست کنید.

پایان

محرک های سفارشی در Apache Flink مکانیسم قدرتمندی برای کنترل ارزیابی پنجره فراتر از زمان استاندارد یا سیاستهای شمارش فراهم می کند. در این پست ، ما دیدیم که محرک ها تصمیم می گیرند که نتایج یک پنجره ساطع می شود و می تواند چندین بار شلیک کند یا حالت پنجره را پاک کند. محرک های داخلی فلینک (زمان رویداد ، زمان پردازش ، تعداد و غیره) نیازهای اساسی را تأمین می کند ، اما آنها را نمی توان با شرایط پیچیده ترکیب یا متناسب با آنها قرار داد. با اجرای یک ماشه سفارشی ، می توانید بر این محدودیت ها غلبه کنید – به عنوان مثال ، شلیک زودرس بر اساس تعداد عناصر در حالی که هنوز هم مرزهای رویداد را مشاهده می کنید، یا پایان دادن به یک پنجره وقتی a رویداد خاص دامنه رخ می دهدبشر

غذای کلیدی و بهترین روشها:

چرخه عمر پنجره را درک کنید: ماشه ای که آتش می گیرد (باز می گردد FIRE) پنجره را نمی بندد. پنجره می تواند داده های بیشتری را جمع کند و دوباره آتش کند. برای بستن آن ، استفاده کنید FIRE_AND_PURGE یا ماشه را طراحی کنید تا در نهایت پاکسازی شود (همانطور که در پایان زمان ویندوز یا شکاف جلسه انجام دادیم).

روشهای مورد نیاز را نادیده بگیرید: در حداقل نادیده گرفتن onElementبا onEventTimeبا onProcessingTimeوت clearبشر برای ادغام سناریوهای پنجره (مانند جلسات) از Onmerge استفاده کنید تا به طور مداوم تایمرها را اداره کنید

از TriggerContext برای حالت و تایمر استفاده کنید: اگر منطق شما نیاز به شمارش رویدادها یا ردیابی پرچم دارد ، از حالت پارتیشن بندی شده استفاده کنید TriggerContextبشر همیشه حالت را پاک کنید clear() برای جلوگیری از نشت

چه موقع از محرک های سفارشی استفاده کنید: از آنها در هنگام ایجاد محرک های داخلی کافی استفاده کنید-به عنوان مثال ، شرایط ترکیبی (زمان + تعداد) ، شلیک نامنظم رویداد محور ، نتایج زودرس و افزایشی یا جلوگیری از رفتارهای پیش فرض (یک مورد استفاده مشترک ، جلوگیری از وقوع اواخر با نوشتن ماشه ای است که عناصر دیر را نادیده می گیرد). اگر یک ماشه داخلی ساده نیاز را برآورده می کند ، برای سادگی با آن بچسبید. اما همانطور که نشان داده شده است ، باعث می شود که سفارشی برای اجرای قوانین تجاری به طور مستقیم در منطق جریان درخشش داشته باشد.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا