برنامه نویسی

Python SDK: غواصی در Workers و Workflows

در پست قبلی خود، اعلام کردیم که Python SDK خود را در GA داریم و کمی تاریخچه را در مورد اینکه چرا کاری را که انجام دادیم ساخته‌ایم و برخی پیش‌نیازهایی که برای شروع به آن نیاز داشتید به اشتراک گذاشتیم. اکنون، بیایید کمی عمیق‌تر شیرجه بزنیم تا از کنار شما بگذریم. برای اهداف این پست، ما می‌خواهیم فایل واحدی را که در مثال hello_activity.py نشان داده شده است، تجزیه کنیم و آن را به چندین فایل تقسیم کنیم تا بیشتر با ظاهر برنامه شما مطابقت داشته باشیم!

کد گردش کار: workflow.py

ما می خواهیم ابتدا گردش کار را ایجاد کنیم، زیرا کلاس کارگر قرار است این را فراخوانی کند. اگر ابتدا این کار را انجام ندهیم، کد شما اجرا نخواهد شد و این کار جالبی نیست.

بالای فایل ما، مانند هر فایل پایتون خوب، دارای وارداتی است که ما برای از بین بردن این مورد نیاز داریم. ما همچنین در حال تعریف یک کلاس داده پایتون هستیم که شیء مورد استفاده ما برای ارسال داده به گردش کار خواهد بود. با ارسال یک کلاس داده پایتون به جای چندین پارامتر به گردش کار، می توانیم به جای تغییر نحوه فراخوانی گردش کار، فیلدهایی را از کلاس داده اضافه یا حذف کنیم. این اجازه می دهد تا برخی از انعطاف پذیری را برای ایجاد تغییرات آینده بدون نیاز به نسخه گردش کار.

در این مورد، ComposeGreetingInput به ما اجازه می دهد تا a greeting رشته و الف name رشته را وارد جریان کار کنید.

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

# Temporal strongly encourages using a single dataclass so 
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

درست در زیر ComposeGreetingInput ما قصد داریم دو قطعه کد جدید اضافه کنیم، اول یک فعالیت است.

Temporal Python SDK همراه با دکوراتورهایی است که کارهایی را برای شما انجام می دهند. در مورد یک اکتیویتی، این روش را به عنوان یک فعالیت معتبر ثبت می کند که می تواند توسط Worker اجرا شود (به زیر مراجعه کنید). یک اکتیویتی برخی از داده ها را می گیرد، آنها را پردازش می کند و می تواند مقادیری را برگرداند. (در واقع، این دقیقاً مانند هر روش دیگری است). تنها چیزی که باید به آن اشاره کنیم، این است که اگر می‌خواهید کاری انجام دهید که غیرقطعی است، مانند ایجاد یک UUID، می‌خواهید آن را در فعالیت‌های خود یا روش‌هایی که فعالیت‌های شما به آن نیاز دارند، انجام دهید.

compose_greeting در می گیرد ComposeGreetingInput شیء کلاس داده ای که در بالا تعریف کردیم، برخی از اطلاعات را به ترمینال ثبت می کند، و سپس رشته های به هم پیوسته را از شی ارسال شده برمی گرداند. party API در این روش.

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون که یک اکتیویتی داریم، می‌خواهیم کلاس Workflow را اضافه کنیم. این کد ستون فقرات اسکریپت Temporal شما است. همه چیزهایی که Temporal در این مثال برای شما اجرا می کند، در یک کلاس Workflow قرار دارد.
کلاس را با workflow.defn برای ثبت این مورد به درستی، بنابراین Temporal Worker (به زیر مراجعه کنید) بداند که این یک کلاس گردش کار معتبر است.
اولین متد در این کلاس مثال است run، اما چند نکته وجود دارد که باید گفت:
را run روش با تزئین شده است @workflow.run، که به Worker می گوید که این روشی است که هنگام شروع یک گردش کار اجرا می شود. می توان از آن برای تنظیم متغیرهای گردش کار، فراخوانی یک یا چند فعالیت و بسیاری موارد دیگر استفاده کرد. برای کسب اطلاعات بیشتر، بررسی کنید:

این متد خود یک روش همگام است، زیرا کتابخانه پایتون Temporal از asyncio در زیر هود استفاده می‌کند و متدهایی که نامیده می‌شوند باید به صورت ناهمزمان اجرا شوند.
گردش کار run متد انتظار یک رشته را دارد name زمانی که توسط Worker فراخوانی شود، تصویب شود

در متد، ما Temporals را فراخوانی می کنیم workflow.execute_activity روش، که طول می کشد:

  • مرجع روش فعالیت، در این مورد compose_greeting
  • آرگومان‌هایی که در این مورد از ویژگی کلاس‌های داده استفاده می‌کنیم تا در خوشامدگویی «Hello» و نام آن عبور کنیم. name به ComposeGreetingInput کلاس داده ای که قبلا ایجاد کردیم.
  • یک تایم اوت در این مورد، ما ارائه می دهیم start_to_close_timeout، که به سرور موقت می گوید که این فعالیت را 10 ثانیه از هر زمان که فعالیت شروع می شود، از بین ببرد.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

وقتی کار شما با این فایل تمام شد، به شکل زیر در می آید:

workflow.py

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اکنون که کد گردش کار را داریم، همه اینها را با Worker گره می زنیم، این کدی است که Temporal برای اجرای کد گردش کار شما به صورت ناهمزمان به هنگام نمایش وظایف در صف استفاده می کند. در این مورد، مثال ما بسیار ساده است، بنابراین اسکریپت بلافاصله پس از ایجاد کارگر، وظیفه را به صف اضافه می کند. در مخزن نمونه‌های ما، نمونه‌های عمیق‌تری از نحوه ساخت برنامه‌هایی را خواهید یافت که در آن اسکریپت‌های مختلف خارج از این دو می‌توانند باعث شوند که گردش کار کارهای دیگر را به صورت ناهمزمان انجام دهد. حتی می توانید با استفاده از یک پرس و جو از گردش کار در مورد خودش یا داده هایی که برای شما نگهداری می کند بپرسید.

کد اجرایی: worker.py

بیایید بالای اسکریپت خود را تنظیم کنیم تا همه واردات و محیط مناسب را داشته باشیم. این اسکلت برنامه ما خواهد بود:

worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
    print("Hello, world!")

if __name__ == "__main__":
   asyncio.run(main())
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این مثال در حالت بالا اجرا می شود، اما تنها کاری که انجام می دهد چاپ “سلام، دنیا!” در این نقطه به ترمینال خود بروید.

برای استفاده از مزیت Temporal، ما به کد خود برای صحبت با سرور Temporal نیاز داریم. ما از طریق یک کلاینت به سرور متصل می شویم. کد ما از طریق این اتصال، فعالیت‌ها، سیگنال‌ها و پرس و جوها را به صف وظایف موقت در سرور موقت ارسال می‌کند. هنگامی که این وظایف در صف اجرا می شوند، به تاریخچه گردش کار متعهد می شوند و این به Temporal اجازه می دهد دقیقاً بداند چه کدی اجرا شده است، چه کدی برای اجرا باقی مانده است و وضعیت برنامه شما در هر لحظه از زمان در چه وضعیتی است. !

یک مشتری ایجاد کنید

عوض کن main() عملکرد بالا با کد اتصال کلاینت، با استفاده از همان مسیر و پورت URL پیش‌فرض لوکال هاست برای سرور موقت:

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

توجه: ما استفاده نمی کنیم http://localhost:7233، اما به جای آن localhost:7233!

بعد، می‌خواهیم چیزی را اضافه کنیم که کارگر می‌نامیم، که قطعه کدی است که در واقع کد گردش کار ما را برای فعالیت‌های موجود در صف فراخوانی می‌کند.

ایجاد کارگر

در خط درست زیر مشتری، داخل main()، موارد زیر را اضافه کنید:

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

اجرای این کد به Temporal Server می گوید که یک کارگر آماده است تا با ارسال اطلاعات زیر وظایف را پردازش کند:

  • client – به کارگر اجازه می دهد دستش را دراز کند و بگوید “من اینجا هستم، سرور موقت، به من کار بده!”
  • task_queue – به سرور موقت می گوید: “من فقط برای پردازش وظایف از این صف تنظیم شده ام”
  • workflows – فهرستی از مراجع کلاس پایتون، به نام Workflows (به زیر مراجعه کنید)، که به طور خاص برای کنترل دویدن نوشته شده است activities که وظایف درخواستی را پردازش می کند task_queue
  • activities – آرایه ای از مراجع تابع پایتون که می تواند وظایف را در صف وظایف پردازش کند

در این مرحله کد شما به این شکل خواهد بود. تمام کاری که این کد انجام می دهد این است که به سرور موقت متصل می شود و به کد Worker اجازه می دهد تا شما را اجرا کند print بیانیه. ما هنوز پتانسیل کامل Temporal را درک نکرده ایم. این کد چاپ خواهد شد Still saying ‘hello’ to you, world! به ترمینال شما پس از اجرای این کد، اگر به آدرس وب UI بروید (پیش‌فرض است 127.0.0.1:8233)، هنوز چیزی در لیست گردش کار خود نخواهید دید.

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

if __name__ == "__main__":
   asyncio.run(main())
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

برای اجرای ما GreetingWorkflow، از مشتری می خواهیم گردش کار را اجرا کند. سپس، هنگامی که اسکریپت خود را اجرا می کنیم، Temporal Server کارهایی که کد تاکنون انجام داده را پیگیری می کند و شما می توانید تمام کارهایی که Temporal برای شما انجام داده است را در رابط کاربری وب مشاهده کنید.

در کد بالا، عبارت print را با عبارت زیر جایگزین کنید:

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           "World",
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

این کد به کارگر اجازه می دهد تا به مشتری بگوید:

  • به صورت ناهمزمان execute_workflow را فراخوانی کنید (یعنی گردش کار را اجرا کنید run روش)
  • ورودی “World” را به آن ارسال کنید GreetingWorkflow.run
  • به گردش کار یک شناسه بدهید hello-activity-workflow-id
  • وظایفی را که Workflow ایجاد می کند در آن قرار دهید hello-activity-task-queue (که ممکن است هنگام ایجاد Worker از مکان های دوردست مانند چند خط بالا تشخیص دهید)

در حال اجرا آن همه

این را در ترمینال خود اجرا کنید: python worker.py

هنگامی که Worker کد را اجرا می کند، رشته برگشتی را از آن تنظیم می کند compose_greeting به result و سپس آن را در ترمینال چاپ می کند. باید در نهایت ببینی Hello, World! در ترمینال شما

در رابط کاربری وب، چیزی شبیه به این را خواهید دید، با یک ردیف گردش کار برای هر بار اجرای کد شما:

جریان های کاری اخیر

با کلیک بر روی شناسه گردش کار برای یکی از ردیف‌ها، تمام اتفاقاتی که با کد شما و سرور موقتی که کارگر شما کد گردش کار را اجرا می‌کند را می‌بینید:

جزئیات گردش کار

گفتگوی واقعی: Temporal برای برنامه‌هایی ساخته شده است که کارهایی بیش از چاپ متن در ترمینال شما انجام می‌دهند، اما ما می‌خواستیم راهی بسیار ساده برای نشان دادن قطعات مختلف به شما ارائه دهیم. در یک پست وبلاگ بعدی، یک برنامه واقعی را به شما نشان خواهیم داد که از Temporal برای حفظ وضعیت در جریان کار استفاده می کند.

برای سرگرمی‌ها، اگر می‌خواهید کار ساده‌ای انجام دهید، اما عملکرد کمی به شما می‌دهد، می‌توانید با این نسخه از کنسول ورودی دریافت کنید. worker.py:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("What's your name?")
       name = input()

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           name,
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

if __name__ == "__main__":
   asyncio.run(main())
وارد حالت تمام صفحه شوید

از حالت تمام صفحه خارج شوید

در مورد من، خروجی به این صورت بود:

سلام مت

این کد نیز کمی متفاوت است WorkflowExecutionStarted رویداد در تاریخچه رویداد گردش کار در رابط کاربری وب. می‌توانیم ببینیم که ورودی «Matt» به‌جای «World» به فعالیت منتقل شده است.

روی «Workflow Execution Started» در گردش کار خود کلیک کنید تا تفاوت را ببینید!

اجرای گردش کار شروع شد

موفقیت!

در اینجا شما آن را دارید، گردش کارها و کارگران به برخی مناطق تقسیم می شوند تا بتوانید در برنامه خود روی آنها کار کنید. امیدواریم این به شما کمک کند تا با برنامه پایتون خود پیش بروید و درک بهتری از اینکه چگونه می توانید با Temporal پیش بروید.

آیا می خواهید آن را یک قدم جلوتر بردارید؟ پست مدافع توسعه دهندگان ما را در مورد نحوه ساخت یک برنامه پوکر با استفاده از برخی از اصول اولیه Temporal بررسی کنید. برای آموزش کامل، ساختن یک برنامه زمانی از ابتدا در پایتون را بررسی کنید.

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا