برنامه نویسی

ساختن یک عامل ارز A2A با لانگگراف

این راهنما توضیح مفصلی در مورد نحوه ساخت یک عامل سازگار با A2A با استفاده از Langgraph و مدل Google Gemini ارائه می دهد. ما از طریق نمونه عامل ارز از A2A Python SDK قدم می زنیم و هر مؤلفه ، جریان داده ها و چگونگی پروتکل A2A تعامل عامل را تسهیل می کند.

فهرست مطالب

  1. نمای کلی
  2. معماری
  3. راه اندازی و نصب
  4. مؤلفه ها توضیح داده شده اند

  5. جریان اجرا
  6. تعامل مشتری
  7. ویژگی های پیشرفته

  8. مراحل بعدی

نمای کلی

عامل ارز یک عامل تخصصی است که به تبدیل ارز کمک می کند. از آن استفاده می کند:

  • پروتکل A2a: برای ارتباطات استاندارد
  • لنگرافی: برای ارکستر استدلال عامل
  • مدل جمینی گوگل: به عنوان موتور استدلال
  • API خارجی: برای واکشی نرخ ارز در زمان واقعی

این مثال چندین قابلیت مهم A2A را نشان می دهد:

  • پاسخ های جریان برای به روزرسانی در زمان واقعی
  • مکالمات چند چرخش برای شفاف سازی
  • مدیریت دولت وظیفه
  • ادغام با LLM

معماری

در اینجا معماری سطح بالایی از ارز:

شرح تصویر

graph TD
    Client[Client] <-->|A2A Protocol| Server[A2A Server]
    Server --> RequestHandler[DefaultA2ARequestHandler]
    RequestHandler --> Executor[CurrencyAgentExecutor]
    Executor --> Agent[CurrencyAgent]
    Agent --> |Uses| Gemini[Gemini LLM]
    Agent --> |Calls| ExchangeAPI[Exchange Rate API]

    subgraph "Task Management"
        Executor --> TaskStore[InMemoryTaskStore]
    end
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

راه اندازی و نصب

برای اجرای این مثال ، شما نیاز دارید:

  1. پایتون 3.10 یا بالاتر
  2. یک کلید API جمینی

ابتدا مخزن A2A را کلون کرده و وابستگی ها را نصب کنید:

git clone https://github.com/google/A2A.git -b main --depth 1
cd A2A/a2a-python-sdk
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
pip install -e .[dev]
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

سپس ، ایجاد کنید .env پرونده در examples/langgraph/ دایرکتوری:

echo "GOOGLE_API_KEY=YOUR_API_KEY_HERE" > a2a-python-sdk/examples/langgraph/.env
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مؤلفه ها توضیح داده شده اند

بیایید با جزئیات هر یک از مؤلفه های ارز را بررسی کنیم:

کارت و مهارت های عامل

کارت عامل هویت ، قابلیت ها و مهارت های عامل را تعریف می کند. این در ایجاد شده است __main__.py:

def get_agent_card(host: str, port: int):
    """Returns the Agent Card for the Currency Agent."""
    capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
    skill = AgentSkill(
        id='convert_currency',
        name='Currency Exchange Rates Tool',
        description='Helps with exchange values between various currencies',
        tags=['currency conversion', 'currency exchange'],
        examples=['What is exchange rate between USD and GBP?'],
    )
    return AgentCard(
        name='Currency Agent',
        description='Helps with exchange rates for currencies',
        url=f'http://{host}:{port}/',
        version='1.0.0',
        defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
        capabilities=capabilities,
        skills=[skill],
        authentication=AgentAuthentication(schemes=['public']),
    )
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

نکات کلیدی:

  • نماینده یک مهارت واحد دارد: convert_currency
  • از جریان پشتیبانی می کند (capabilities.streaming=True)
  • انواع محتوای متن را می پذیرد و برمی گرداند
  • از احراز هویت عمومی استفاده می کند (بدون AUTH لازم نیست)

ارز

در CurrencyAgent در کلاس agent.py شامل منطق اصلی عامل است:

شرح تصویر

classDiagram
    class CurrencyAgent {
        +SYSTEM_INSTRUCTION: str
        +RESPONSE_FORMAT_INSTRUCTION: str
        +SUPPORTED_CONTENT_TYPES: list
        -model: ChatGoogleGenerativeAI
        -tools: list
        -graph: AgentGraph
        +invoke(query: str, sessionId: str): dict
        +stream(query: str, sessionId: str): AsyncGenerator
        -get_agent_response(config: dict): dict
    }

    class get_exchange_rate {
        <>
    }

    class ResponseFormat {
        <>
    }

    CurrencyAgent ..> get_exchange_rate : uses
    CurrencyAgent ..> ResponseFormat : returns

حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

عملکرد اصلی شامل موارد زیر است:

  1. تعریف ابزار: get_exchange_rate ابزار نرخ ارز در زمان واقعی را از یک API خارجی واگذار می کند:
@tool
def get_exchange_rate(
    currency_from: str = 'USD',
    currency_to: str = 'EUR',
    currency_date: str = 'latest',
):
    """Use this to get current exchange rate."""
    try:
        response = httpx.get(
            f'https://api.frankfurter.app/{currency_date}',
            params={'from': currency_from, 'to': currency_to},
        )
        response.raise_for_status()

        data = response.json()
        # ... error handling code
        return data
    except httpx.HTTPError as e:
        return {'error': f'API request failed: {e}'}
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

  1. تعریف عامل: عامل با استفاده از Langgraph ایجاد شده است create_react_agent:
def __init__(self):
    self.model = ChatGoogleGenerativeAI(model='gemini-2.0-flash')
    self.tools = [get_exchange_rate]

    self.graph = create_react_agent(
        self.model,
        tools=self.tools,
        checkpointer=memory,
        prompt=self.SYSTEM_INSTRUCTION,
        response_format=(self.RESPONSE_FORMAT_INSTRUCTION, ResponseFormat),
    )
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

  1. قالب پاسخ: یک قالب ساختاری برای پاسخ های عامل:
class ResponseFormat(BaseModel):
    """Respond to the user in this format."""
    status: Literal['input_required', 'completed', 'error'] = 'input_required'
    message: str
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

  1. روشهای فراخوانی: روش های دعوت مستقیم و جریان:
def invoke(self, query: str, sessionId: str) -> dict[str, Any]:
    config: RunnableConfig = {'configurable': {'thread_id': sessionId}}
    self.graph.invoke({'messages': [('user', query)]}, config)
    return self.get_agent_response(config)

async def stream(
    self, query: str, sessionId: str
) -> AsyncIterable[dict[str, Any]]:
    inputs: dict[str, Any] = {'messages': [('user', query)]}
    config: RunnableConfig = {'configurable': {'thread_id': sessionId}}

    for item in self.graph.stream(inputs, config, stream_mode='values'):
        message = item['messages'][-1]
        if isinstance(message, AIMessage) and message.tool_calls:
            yield {
                'is_task_complete': False,
                'require_user_input': False,
                'content': 'Looking up the exchange rates...',
            }
        elif isinstance(message, ToolMessage):
            yield {
                'is_task_complete': False,
                'require_user_input': False,
                'content': 'Processing the exchange rates..',
            }

    yield self.get_agent_response(config)
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مجری

در CurrencyAgentExecutor در agent_executor.py عامل Langgraph را با پروتکل A2A سازگار می کند:

class CurrencyAgentExecutor(BaseAgentExecutor):
    """Currency AgentExecutor Example."""

    def __init__(self):
        self.agent = CurrencyAgent()

    @override
    async def on_message_send(
        self,
        request: SendMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        """Handler for 'message/send' requests."""
        params: MessageSendParams = request.params
        query = self._get_user_query(params)

        if not task:
            task = create_task_obj(params)

        # invoke the underlying agent
        agent_response: dict[str, Any] = self.agent.invoke(
            query, task.contextId
        )
        update_task_with_agent_response(task, agent_response)
        event_queue.enqueue_event(task)

    @override
    async def on_message_stream(
        self,
        request: SendStreamingMessageRequest,
        event_queue: EventQueue,
        task: Task | None,
    ) -> None:
        """Handler for 'message/stream' requests."""
        params: MessageSendParams = request.params
        query = self._get_user_query(params)

        if not task:
            task = create_task_obj(params)
            # emit the initial task so it is persisted to TaskStore
            event_queue.enqueue_event(task)

        # kickoff the streaming agent and process responses
        async for item in self.agent.stream(query, task.contextId):
            task_artifact_update_event, task_status_event = (
                process_streaming_agent_response(task, item)
            )

            if task_artifact_update_event:
                event_queue.enqueue_event(task_artifact_update_event)

            event_queue.enqueue_event(task_status_event)

    def _get_user_query(self, task_send_params: MessageSendParams) -> str:
        """Helper to get user query from task send params."""
        part = task_send_params.message.parts[0].root
        if not isinstance(part, TextPart):
            raise ValueError('Only text parts are supported')
        return part.text
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

این مجری دو روش اصلی را پیاده سازی می کند:

  1. on_message_send: درخواست های همزمان را کنترل می کند و یک پاسخ کامل را برمی گرداند
  2. on_message_stream: درخواست های جریان را کنترل کرده و وقایع را هنگام وقوع منتشر می کند

یاران

در helpers.py پرونده شامل توابع ابزار برای مدیریت وظایف و تبدیل پاسخ های عامل به رویدادهای پروتکل A2A است:

def update_task_with_agent_response(
    task: Task, agent_response: dict[str, Any]
) -> None:
    """Updates the provided task with the agent response."""
    task.status.timestamp = datetime.now().isoformat()
    parts: list[Part] = [Part(TextPart(text=agent_response['content']))]
    if agent_response['require_user_input']:
        task.status.state = TaskState.input_required
        message = Message(
            messageId=str(uuid4()),
            role=Role.agent,
            parts=parts,
        )
        task.status.message = message
        if not task.history:
            task.history = []

        task.history.append(message)
    else:
        task.status.state = TaskState.completed
        task.status.message = None
        if not task.artifacts:
            task.artifacts = []

        artifact: Artifact = Artifact(parts=parts, artifactId=str(uuid4()))
        task.artifacts.append(artifact)

def process_streaming_agent_response(
    task: Task,
    agent_response: dict[str, Any],
) -> tuple[TaskArtifactUpdateEvent | None, TaskStatusUpdateEvent]:
    """Processes the streaming agent responses and returns TaskArtifactUpdateEvent and TaskStatusUpdateEvent."""
    is_task_complete = agent_response['is_task_complete']
    require_user_input = agent_response['require_user_input']
    parts: list[Part] = [Part(TextPart(text=agent_response['content']))]

    end_stream = False
    artifact = None
    message = None

    # responses from this agent can be working/completed/input-required
    if not is_task_complete and not require_user_input:
        task_state = TaskState.working
        message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
    elif require_user_input:
        task_state = TaskState.input_required
        message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
        end_stream = True
    else:
        task_state = TaskState.completed
        artifact = Artifact(parts=parts, artifactId=str(uuid4()))
        end_stream = True

    task_artifact_update_event = None

    if artifact:
        task_artifact_update_event = TaskArtifactUpdateEvent(
            taskId=task.id,
            contextId=task.contextId,
            artifact=artifact,
            append=False,
            lastChunk=True,
        )

    task_status_event = TaskStatusUpdateEvent(
        taskId=task.id,
        contextId=task.contextId,
        status=TaskStatus(
            state=task_state,
            message=message,
            timestamp=datetime.now().isoformat(),
        ),
        final=end_stream,
    )

    return task_artifact_update_event, task_status_event
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

توابع کلیدی قالب پاسخ داخلی عامل را به مدل رویداد پروتکل A2A تبدیل می کند.

جریان اجرا

بیایید کل جریان اجرای را در هنگام تعامل مشتری با نماینده ارز تجسم کنیم:

شرح تصویر

sequenceDiagram
    participant Client
    participant A2AServer
    participant RequestHandler
    participant Executor as CurrencyAgentExecutor
    participant Agent as CurrencyAgent
    participant LLM as Gemini LLM
    participant API as Exchange Rate API

    Client->>A2AServer: Request Agent Card
    A2AServer->>Client: Return Agent Card

    %% Single turn conversation
    Client->>A2AServer: message/send
    A2AServer->>RequestHandler: Route request
    RequestHandler->>Executor: on_message_send
    Executor->>Agent: invoke(query, contextId)
    Agent->>LLM: Process with Gemini
    LLM-->>Agent: Need exchange rate info
    Agent->>API: get_exchange_rate
    API-->>Agent: Return exchange data
    Agent->>LLM: Process with data
    LLM-->>Agent: Generate response
    Agent-->>Executor: Return response dict
    Executor-->>RequestHandler: Update Task & enqueue event
    RequestHandler-->>A2AServer: Return Task with results
    A2AServer-->>Client: Send response

    %% Streaming example
    Client->>A2AServer: message/sendStream
    A2AServer->>RequestHandler: Route streaming request
    RequestHandler->>Executor: on_message_stream
    Executor->>Agent: stream(query, contextId)
    Agent->>LLM: Start processing

    LLM-->>Agent: Need exchange rate
    Agent-->>Executor: Yield status update
    Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: SSE event
    A2AServer-->>Client: Stream status update

    Agent->>API: get_exchange_rate
    API-->>Agent: Return exchange data
    Agent-->>Executor: Yield process update
    Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: SSE event
    A2AServer-->>Client: Stream process update

    Agent->>LLM: Process with data
    LLM-->>Agent: Generate final response
    Agent-->>Executor: Yield final response
    Executor-->>RequestHandler: Enqueue TaskArtifactUpdateEvent & TaskStatusUpdateEvent
    RequestHandler-->>A2AServer: Final SSE events
    A2AServer-->>Client: Stream final result
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

تعامل مشتری

در test_client.py اسکریپت نحوه تعامل با عامل را از طریق پروتکل A2A نشان می دهد:

async def run_single_turn_test(client: A2AClient) -> None:
    """Runs a single-turn non-streaming test."""
    send_payload = create_send_message_payload(
        text='how much is 100 USD in CAD?'
    )
    # Send Message
    send_response: SendMessageResponse = await client.send_message(
        payload=send_payload
    )
    print_json_response(send_response, 'Single Turn Request Response')

    # Query the task if needed
    if isinstance(send_response.root, SendMessageSuccessResponse) and \
       isinstance(send_response.root.result, Task):
        task_id: str = send_response.root.result.id
        task_id_payload = {'id': task_id}
        get_response: GetTaskResponse = await client.get_task(
            payload=task_id_payload
        )
        print_json_response(get_response, 'Query Task Response')

async def run_streaming_test(client: A2AClient) -> None:
    """Runs a single-turn streaming test."""
    send_payload = create_send_message_payload(
        text='how much is 50 EUR in JPY?'
    )
    print('--- Single Turn Streaming Request ---')
    stream_response = client.send_message_streaming(payload=send_payload)
    async for chunk in stream_response:
        print_json_response(chunk, 'Streaming Chunk')

async def run_multi_turn_test(client: A2AClient) -> None:
    """Runs a multi-turn non-streaming test."""
    print('--- Multi-Turn Request ---')
    # --- First Turn ---
    first_turn_payload = create_send_message_payload(
        text='how much is 100 USD?'
    )
    first_turn_response: SendMessageResponse = await client.send_message(
        payload=first_turn_payload
    )
    print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')

    context_id: str | None = None
    if isinstance(first_turn_response.root, SendMessageSuccessResponse) and \
       isinstance(first_turn_response.root.result, Task):
        task: Task = first_turn_response.root.result
        context_id = task.contextId  # Capture context ID

        # --- Second Turn (if input required) ---
        if task.status.state == TaskState.input_required and context_id:
            print('--- Multi-Turn: Second Turn (Input Required) ---')
            second_turn_payload = create_send_message_payload(
                'in GBP', task.id, context_id
            )
            second_turn_response = await client.send_message(
                payload=second_turn_payload
            )
            print_json_response(
                second_turn_response, 'Multi-Turn: Second Turn Response'
            )
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

ویژگی های پیشرفته

جریان

نماینده ارز از پاسخ های جریان پشتیبانی می کند و به آن اجازه می دهد تا به روزرسانی های زمان واقعی را ارائه دهد زیرا این درخواست را پردازش می کند:

  1. وقتی نماینده شروع به فکر کردن کرد ، به روزرسانی “در حال انجام” را ارسال می کند
  2. وقتی API نرخ ارز را فراخوانی می کند ، به روزرسانی می شود
  3. هنگام پردازش داده ها ، به روزرسانی دیگری را پخش می کند
  4. سرانجام ، نتیجه کامل را ارسال می کند

این یک تجربه کاربری بهتری برای عملیاتی که ممکن است زمان لازم باشد ، فراهم می کند.

مکالمات چند چرخش

نماینده می تواند در صورت نیاز به اطلاعات بیشتر ، در مکالمات چند چرخش شرکت کند:

  1. اگر کاربر می پرسد “100 دلار چقدر است؟” بدون مشخص کردن ارز هدف
  2. نماینده حالت کار را بر روی آن قرار می دهد TaskState.input_required
  3. مشتری سپس می تواند یک پیام پیگیری را با همان ارسال کند contextId
  4. نماینده زمینه بین چرخش ها را حفظ می کند

مکالمه مثال:

User: How much is 100 USD?
Agent: To which currency would you like to convert 100 USD?
User: in GBP
Agent: 100 USD is approximately 78.45 GBP according to the current exchange rate.
حالت تمام صفحه را وارد کنید

از حالت تمام صفحه خارج شوید

مراحل بعدی

اکنون که می فهمید که عامل ارز چگونه کار می کند ، می توانید:

  1. سؤالات مختلف را امتحان کنید: با جفت های مختلف ارز تست کنید
  2. عامل را اصلاح کنید: پشتیبانی از ویژگی های بیشتر

    • نرخ ارز تاریخی
    • تجزیه و تحلیل روند ارزی
    • پشتیبانی از ارزهای بیشتر
  3. معماری را گسترش دهید:

    • ماندگار TaskStore برای جلسات طولانی مدت
    • مکانیسم های احراز هویت را اضافه کنید
    • نماینده را به یک محیط تولید مستقر کنید

برای اطلاعات بیشتر در مورد ویژگی های پروتکل A2A:

آموزش A2a Langgraph

نوشته های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *

دکمه بازگشت به بالا