Python SDK: غواصی در Workers و Workflows

در پست قبلی خود، اعلام کردیم که Python SDK خود را در GA داریم و کمی تاریخچه را در مورد اینکه چرا کاری را که انجام دادیم ساختهایم و برخی پیشنیازهایی که برای شروع به آن نیاز داشتید به اشتراک گذاشتیم. اکنون، بیایید کمی عمیقتر شیرجه بزنیم تا از کنار شما بگذریم. برای اهداف این پست، ما میخواهیم فایل واحدی را که در مثال hello_activity.py نشان داده شده است، تجزیه کنیم و آن را به چندین فایل تقسیم کنیم تا بیشتر با ظاهر برنامه شما مطابقت داشته باشیم!
کد گردش کار: workflow.py
ما می خواهیم ابتدا گردش کار را ایجاد کنیم، زیرا کلاس کارگر قرار است این را فراخوانی کند. اگر ابتدا این کار را انجام ندهیم، کد شما اجرا نخواهد شد و این کار جالبی نیست.
بالای فایل ما، مانند هر فایل پایتون خوب، دارای وارداتی است که ما برای از بین بردن این مورد نیاز داریم. ما همچنین در حال تعریف یک کلاس داده پایتون هستیم که شیء مورد استفاده ما برای ارسال داده به گردش کار خواهد بود. با ارسال یک کلاس داده پایتون به جای چندین پارامتر به گردش کار، می توانیم به جای تغییر نحوه فراخوانی گردش کار، فیلدهایی را از کلاس داده اضافه یا حذف کنیم. این اجازه می دهد تا برخی از انعطاف پذیری را برای ایجاد تغییرات آینده بدون نیاز به نسخه گردش کار.
در این مورد، ComposeGreetingInput
به ما اجازه می دهد تا a greeting
رشته و الف name
رشته را وارد جریان کار کنید.
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
# Temporal strongly encourages using a single dataclass so
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
درست در زیر ComposeGreetingInput
ما قصد داریم دو قطعه کد جدید اضافه کنیم، اول یک فعالیت است.
Temporal Python SDK همراه با دکوراتورهایی است که کارهایی را برای شما انجام می دهند. در مورد یک اکتیویتی، این روش را به عنوان یک فعالیت معتبر ثبت می کند که می تواند توسط Worker اجرا شود (به زیر مراجعه کنید). یک اکتیویتی برخی از داده ها را می گیرد، آنها را پردازش می کند و می تواند مقادیری را برگرداند. (در واقع، این دقیقاً مانند هر روش دیگری است). تنها چیزی که باید به آن اشاره کنیم، این است که اگر میخواهید کاری انجام دهید که غیرقطعی است، مانند ایجاد یک UUID، میخواهید آن را در فعالیتهای خود یا روشهایی که فعالیتهای شما به آن نیاز دارند، انجام دهید.
compose_greeting
در می گیرد ComposeGreetingInput
شیء کلاس داده ای که در بالا تعریف کردیم، برخی از اطلاعات را به ترمینال ثبت می کند، و سپس رشته های به هم پیوسته را از شی ارسال شده برمی گرداند. party API در این روش.
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
اکنون که یک اکتیویتی داریم، میخواهیم کلاس Workflow را اضافه کنیم. این کد ستون فقرات اسکریپت Temporal شما است. همه چیزهایی که Temporal در این مثال برای شما اجرا می کند، در یک کلاس Workflow قرار دارد.
کلاس را با workflow.defn
برای ثبت این مورد به درستی، بنابراین Temporal Worker (به زیر مراجعه کنید) بداند که این یک کلاس گردش کار معتبر است.
اولین متد در این کلاس مثال است run
، اما چند نکته وجود دارد که باید گفت:
را run
روش با تزئین شده است @workflow.run
، که به Worker می گوید که این روشی است که هنگام شروع یک گردش کار اجرا می شود. می توان از آن برای تنظیم متغیرهای گردش کار، فراخوانی یک یا چند فعالیت و بسیاری موارد دیگر استفاده کرد. برای کسب اطلاعات بیشتر، بررسی کنید:
این متد خود یک روش همگام است، زیرا کتابخانه پایتون Temporal از asyncio در زیر هود استفاده میکند و متدهایی که نامیده میشوند باید به صورت ناهمزمان اجرا شوند.
گردش کار run
متد انتظار یک رشته را دارد name
زمانی که توسط Worker فراخوانی شود، تصویب شود
در متد، ما Temporals را فراخوانی می کنیم workflow.execute_activity
روش، که طول می کشد:
- مرجع روش فعالیت، در این مورد
compose_greeting
- آرگومانهایی که در این مورد از ویژگی کلاسهای داده استفاده میکنیم تا در خوشامدگویی «Hello» و نام آن عبور کنیم.
name
بهComposeGreetingInput
کلاس داده ای که قبلا ایجاد کردیم. - یک تایم اوت در این مورد، ما ارائه می دهیم
start_to_close_timeout
، که به سرور موقت می گوید که این فعالیت را 10 ثانیه از هر زمان که فعالیت شروع می شود، از بین ببرد.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
وقتی کار شما با این فایل تمام شد، به شکل زیر در می آید:
workflow.py
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
اکنون که کد گردش کار را داریم، همه اینها را با Worker گره می زنیم، این کدی است که Temporal برای اجرای کد گردش کار شما به صورت ناهمزمان به هنگام نمایش وظایف در صف استفاده می کند. در این مورد، مثال ما بسیار ساده است، بنابراین اسکریپت بلافاصله پس از ایجاد کارگر، وظیفه را به صف اضافه می کند. در مخزن نمونههای ما، نمونههای عمیقتری از نحوه ساخت برنامههایی را خواهید یافت که در آن اسکریپتهای مختلف خارج از این دو میتوانند باعث شوند که گردش کار کارهای دیگر را به صورت ناهمزمان انجام دهد. حتی می توانید با استفاده از یک پرس و جو از گردش کار در مورد خودش یا داده هایی که برای شما نگهداری می کند بپرسید.
کد اجرایی: worker.py
بیایید بالای اسکریپت خود را تنظیم کنیم تا همه واردات و محیط مناسب را داشته باشیم. این اسکلت برنامه ما خواهد بود:
worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
print("Hello, world!")
if __name__ == "__main__":
asyncio.run(main())
این مثال در حالت بالا اجرا می شود، اما تنها کاری که انجام می دهد چاپ “سلام، دنیا!” در این نقطه به ترمینال خود بروید.
برای استفاده از مزیت Temporal، ما به کد خود برای صحبت با سرور Temporal نیاز داریم. ما از طریق یک کلاینت به سرور متصل می شویم. کد ما از طریق این اتصال، فعالیتها، سیگنالها و پرس و جوها را به صف وظایف موقت در سرور موقت ارسال میکند. هنگامی که این وظایف در صف اجرا می شوند، به تاریخچه گردش کار متعهد می شوند و این به Temporal اجازه می دهد دقیقاً بداند چه کدی اجرا شده است، چه کدی برای اجرا باقی مانده است و وضعیت برنامه شما در هر لحظه از زمان در چه وضعیتی است. !
یک مشتری ایجاد کنید
عوض کن main()
عملکرد بالا با کد اتصال کلاینت، با استفاده از همان مسیر و پورت URL پیشفرض لوکال هاست برای سرور موقت:
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
توجه: ما استفاده نمی کنیم http://localhost:7233
، اما به جای آن localhost:7233
!
بعد، میخواهیم چیزی را اضافه کنیم که کارگر مینامیم، که قطعه کدی است که در واقع کد گردش کار ما را برای فعالیتهای موجود در صف فراخوانی میکند.
ایجاد کارگر
در خط درست زیر مشتری، داخل main()
، موارد زیر را اضافه کنید:
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
اجرای این کد به Temporal Server می گوید که یک کارگر آماده است تا با ارسال اطلاعات زیر وظایف را پردازش کند:
-
client
– به کارگر اجازه می دهد دستش را دراز کند و بگوید “من اینجا هستم، سرور موقت، به من کار بده!” -
task_queue
– به سرور موقت می گوید: “من فقط برای پردازش وظایف از این صف تنظیم شده ام” -
workflows
– فهرستی از مراجع کلاس پایتون، به نامWorkflows
(به زیر مراجعه کنید)، که به طور خاص برای کنترل دویدن نوشته شده استactivities
که وظایف درخواستی را پردازش می کندtask_queue
-
activities
– آرایه ای از مراجع تابع پایتون که می تواند وظایف را در صف وظایف پردازش کند
در این مرحله کد شما به این شکل خواهد بود. تمام کاری که این کد انجام می دهد این است که به سرور موقت متصل می شود و به کد Worker اجازه می دهد تا شما را اجرا کند print
بیانیه. ما هنوز پتانسیل کامل Temporal را درک نکرده ایم. این کد چاپ خواهد شد Still saying ‘hello’ to you, world!
به ترمینال شما پس از اجرای این کد، اگر به آدرس وب UI بروید (پیشفرض است 127.0.0.1:8233
)، هنوز چیزی در لیست گردش کار خود نخواهید دید.
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
if __name__ == "__main__":
asyncio.run(main())
برای اجرای ما GreetingWorkflow
، از مشتری می خواهیم گردش کار را اجرا کند. سپس، هنگامی که اسکریپت خود را اجرا می کنیم، Temporal Server کارهایی که کد تاکنون انجام داده را پیگیری می کند و شما می توانید تمام کارهایی که Temporal برای شما انجام داده است را در رابط کاربری وب مشاهده کنید.
در کد بالا، عبارت print را با عبارت زیر جایگزین کنید:
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
این کد به کارگر اجازه می دهد تا به مشتری بگوید:
- به صورت ناهمزمان execute_workflow را فراخوانی کنید (یعنی گردش کار را اجرا کنید
run
روش) - ورودی “World” را به آن ارسال کنید
GreetingWorkflow.run
- به گردش کار یک شناسه بدهید
hello-activity-workflow-id
- وظایفی را که Workflow ایجاد می کند در آن قرار دهید
hello-activity-task-queue
(که ممکن است هنگام ایجاد Worker از مکان های دوردست مانند چند خط بالا تشخیص دهید)
در حال اجرا آن همه
این را در ترمینال خود اجرا کنید: python worker.py
هنگامی که Worker کد را اجرا می کند، رشته برگشتی را از آن تنظیم می کند compose_greeting
به result
و سپس آن را در ترمینال چاپ می کند. باید در نهایت ببینی Hello, World!
در ترمینال شما
در رابط کاربری وب، چیزی شبیه به این را خواهید دید، با یک ردیف گردش کار برای هر بار اجرای کد شما:
با کلیک بر روی شناسه گردش کار برای یکی از ردیفها، تمام اتفاقاتی که با کد شما و سرور موقتی که کارگر شما کد گردش کار را اجرا میکند را میبینید:
گفتگوی واقعی: Temporal برای برنامههایی ساخته شده است که کارهایی بیش از چاپ متن در ترمینال شما انجام میدهند، اما ما میخواستیم راهی بسیار ساده برای نشان دادن قطعات مختلف به شما ارائه دهیم. در یک پست وبلاگ بعدی، یک برنامه واقعی را به شما نشان خواهیم داد که از Temporal برای حفظ وضعیت در جریان کار استفاده می کند.
برای سرگرمیها، اگر میخواهید کار سادهای انجام دهید، اما عملکرد کمی به شما میدهد، میتوانید با این نسخه از کنسول ورودی دریافت کنید. worker.py
:
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("What's your name?")
name = input()
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
name,
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
در مورد من، خروجی به این صورت بود:
این کد نیز کمی متفاوت است WorkflowExecutionStarted
رویداد در تاریخچه رویداد گردش کار در رابط کاربری وب. میتوانیم ببینیم که ورودی «Matt» بهجای «World» به فعالیت منتقل شده است.
روی «Workflow Execution Started» در گردش کار خود کلیک کنید تا تفاوت را ببینید!
موفقیت!
در اینجا شما آن را دارید، گردش کارها و کارگران به برخی مناطق تقسیم می شوند تا بتوانید در برنامه خود روی آنها کار کنید. امیدواریم این به شما کمک کند تا با برنامه پایتون خود پیش بروید و درک بهتری از اینکه چگونه می توانید با Temporal پیش بروید.
آیا می خواهید آن را یک قدم جلوتر بردارید؟ پست مدافع توسعه دهندگان ما را در مورد نحوه ساخت یک برنامه پوکر با استفاده از برخی از اصول اولیه Temporal بررسی کنید. برای آموزش کامل، ساختن یک برنامه زمانی از ابتدا در پایتون را بررسی کنید.