ساختن یک عامل ارز A2A با لانگگراف

این راهنما توضیح مفصلی در مورد نحوه ساخت یک عامل سازگار با A2A با استفاده از Langgraph و مدل Google Gemini ارائه می دهد. ما از طریق نمونه عامل ارز از A2A Python SDK قدم می زنیم و هر مؤلفه ، جریان داده ها و چگونگی پروتکل A2A تعامل عامل را تسهیل می کند.
فهرست مطالب
- نمای کلی
- معماری
- راه اندازی و نصب
-
مؤلفه ها توضیح داده شده اند
- جریان اجرا
- تعامل مشتری
-
ویژگی های پیشرفته
- مراحل بعدی
نمای کلی
عامل ارز یک عامل تخصصی است که به تبدیل ارز کمک می کند. از آن استفاده می کند:
- پروتکل A2a: برای ارتباطات استاندارد
- لنگرافی: برای ارکستر استدلال عامل
- مدل جمینی گوگل: به عنوان موتور استدلال
- API خارجی: برای واکشی نرخ ارز در زمان واقعی
این مثال چندین قابلیت مهم A2A را نشان می دهد:
- پاسخ های جریان برای به روزرسانی در زمان واقعی
- مکالمات چند چرخش برای شفاف سازی
- مدیریت دولت وظیفه
- ادغام با LLM
معماری
در اینجا معماری سطح بالایی از ارز:
graph TD
Client[Client] <-->|A2A Protocol| Server[A2A Server]
Server --> RequestHandler[DefaultA2ARequestHandler]
RequestHandler --> Executor[CurrencyAgentExecutor]
Executor --> Agent[CurrencyAgent]
Agent --> |Uses| Gemini[Gemini LLM]
Agent --> |Calls| ExchangeAPI[Exchange Rate API]
subgraph "Task Management"
Executor --> TaskStore[InMemoryTaskStore]
end
راه اندازی و نصب
برای اجرای این مثال ، شما نیاز دارید:
- پایتون 3.10 یا بالاتر
- یک کلید API جمینی
ابتدا مخزن A2A را کلون کرده و وابستگی ها را نصب کنید:
git clone https://github.com/google/A2A.git -b main --depth 1
cd A2A/a2a-python-sdk
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
pip install -e .[dev]
سپس ، ایجاد کنید .env
پرونده در examples/langgraph/
دایرکتوری:
echo "GOOGLE_API_KEY=YOUR_API_KEY_HERE" > a2a-python-sdk/examples/langgraph/.env
مؤلفه ها توضیح داده شده اند
بیایید با جزئیات هر یک از مؤلفه های ارز را بررسی کنیم:
کارت و مهارت های عامل
کارت عامل هویت ، قابلیت ها و مهارت های عامل را تعریف می کند. این در ایجاد شده است __main__.py
:
def get_agent_card(host: str, port: int):
"""Returns the Agent Card for the Currency Agent."""
capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
skill = AgentSkill(
id='convert_currency',
name='Currency Exchange Rates Tool',
description='Helps with exchange values between various currencies',
tags=['currency conversion', 'currency exchange'],
examples=['What is exchange rate between USD and GBP?'],
)
return AgentCard(
name='Currency Agent',
description='Helps with exchange rates for currencies',
url=f'http://{host}:{port}/',
version='1.0.0',
defaultInputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=CurrencyAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
authentication=AgentAuthentication(schemes=['public']),
)
نکات کلیدی:
- نماینده یک مهارت واحد دارد:
convert_currency
- از جریان پشتیبانی می کند (
capabilities.streaming=True
) - انواع محتوای متن را می پذیرد و برمی گرداند
- از احراز هویت عمومی استفاده می کند (بدون AUTH لازم نیست)
ارز
در CurrencyAgent
در کلاس agent.py
شامل منطق اصلی عامل است:
classDiagram
class CurrencyAgent {
+SYSTEM_INSTRUCTION: str
+RESPONSE_FORMAT_INSTRUCTION: str
+SUPPORTED_CONTENT_TYPES: list
-model: ChatGoogleGenerativeAI
-tools: list
-graph: AgentGraph
+invoke(query: str, sessionId: str): dict
+stream(query: str, sessionId: str): AsyncGenerator
-get_agent_response(config: dict): dict
}
class get_exchange_rate {
<>
}
class ResponseFormat {
<>
}
CurrencyAgent ..> get_exchange_rate : uses
CurrencyAgent ..> ResponseFormat : returns
عملکرد اصلی شامل موارد زیر است:
-
تعریف ابزار:
get_exchange_rate
ابزار نرخ ارز در زمان واقعی را از یک API خارجی واگذار می کند:
@tool
def get_exchange_rate(
currency_from: str = 'USD',
currency_to: str = 'EUR',
currency_date: str = 'latest',
):
"""Use this to get current exchange rate."""
try:
response = httpx.get(
f'https://api.frankfurter.app/{currency_date}',
params={'from': currency_from, 'to': currency_to},
)
response.raise_for_status()
data = response.json()
# ... error handling code
return data
except httpx.HTTPError as e:
return {'error': f'API request failed: {e}'}
-
تعریف عامل: عامل با استفاده از Langgraph ایجاد شده است
create_react_agent
:
def __init__(self):
self.model = ChatGoogleGenerativeAI(model='gemini-2.0-flash')
self.tools = [get_exchange_rate]
self.graph = create_react_agent(
self.model,
tools=self.tools,
checkpointer=memory,
prompt=self.SYSTEM_INSTRUCTION,
response_format=(self.RESPONSE_FORMAT_INSTRUCTION, ResponseFormat),
)
- قالب پاسخ: یک قالب ساختاری برای پاسخ های عامل:
class ResponseFormat(BaseModel):
"""Respond to the user in this format."""
status: Literal['input_required', 'completed', 'error'] = 'input_required'
message: str
- روشهای فراخوانی: روش های دعوت مستقیم و جریان:
def invoke(self, query: str, sessionId: str) -> dict[str, Any]:
config: RunnableConfig = {'configurable': {'thread_id': sessionId}}
self.graph.invoke({'messages': [('user', query)]}, config)
return self.get_agent_response(config)
async def stream(
self, query: str, sessionId: str
) -> AsyncIterable[dict[str, Any]]:
inputs: dict[str, Any] = {'messages': [('user', query)]}
config: RunnableConfig = {'configurable': {'thread_id': sessionId}}
for item in self.graph.stream(inputs, config, stream_mode='values'):
message = item['messages'][-1]
if isinstance(message, AIMessage) and message.tool_calls:
yield {
'is_task_complete': False,
'require_user_input': False,
'content': 'Looking up the exchange rates...',
}
elif isinstance(message, ToolMessage):
yield {
'is_task_complete': False,
'require_user_input': False,
'content': 'Processing the exchange rates..',
}
yield self.get_agent_response(config)
مجری
در CurrencyAgentExecutor
در agent_executor.py
عامل Langgraph را با پروتکل A2A سازگار می کند:
class CurrencyAgentExecutor(BaseAgentExecutor):
"""Currency AgentExecutor Example."""
def __init__(self):
self.agent = CurrencyAgent()
@override
async def on_message_send(
self,
request: SendMessageRequest,
event_queue: EventQueue,
task: Task | None,
) -> None:
"""Handler for 'message/send' requests."""
params: MessageSendParams = request.params
query = self._get_user_query(params)
if not task:
task = create_task_obj(params)
# invoke the underlying agent
agent_response: dict[str, Any] = self.agent.invoke(
query, task.contextId
)
update_task_with_agent_response(task, agent_response)
event_queue.enqueue_event(task)
@override
async def on_message_stream(
self,
request: SendStreamingMessageRequest,
event_queue: EventQueue,
task: Task | None,
) -> None:
"""Handler for 'message/stream' requests."""
params: MessageSendParams = request.params
query = self._get_user_query(params)
if not task:
task = create_task_obj(params)
# emit the initial task so it is persisted to TaskStore
event_queue.enqueue_event(task)
# kickoff the streaming agent and process responses
async for item in self.agent.stream(query, task.contextId):
task_artifact_update_event, task_status_event = (
process_streaming_agent_response(task, item)
)
if task_artifact_update_event:
event_queue.enqueue_event(task_artifact_update_event)
event_queue.enqueue_event(task_status_event)
def _get_user_query(self, task_send_params: MessageSendParams) -> str:
"""Helper to get user query from task send params."""
part = task_send_params.message.parts[0].root
if not isinstance(part, TextPart):
raise ValueError('Only text parts are supported')
return part.text
این مجری دو روش اصلی را پیاده سازی می کند:
-
on_message_send
: درخواست های همزمان را کنترل می کند و یک پاسخ کامل را برمی گرداند -
on_message_stream
: درخواست های جریان را کنترل کرده و وقایع را هنگام وقوع منتشر می کند
یاران
در helpers.py
پرونده شامل توابع ابزار برای مدیریت وظایف و تبدیل پاسخ های عامل به رویدادهای پروتکل A2A است:
def update_task_with_agent_response(
task: Task, agent_response: dict[str, Any]
) -> None:
"""Updates the provided task with the agent response."""
task.status.timestamp = datetime.now().isoformat()
parts: list[Part] = [Part(TextPart(text=agent_response['content']))]
if agent_response['require_user_input']:
task.status.state = TaskState.input_required
message = Message(
messageId=str(uuid4()),
role=Role.agent,
parts=parts,
)
task.status.message = message
if not task.history:
task.history = []
task.history.append(message)
else:
task.status.state = TaskState.completed
task.status.message = None
if not task.artifacts:
task.artifacts = []
artifact: Artifact = Artifact(parts=parts, artifactId=str(uuid4()))
task.artifacts.append(artifact)
def process_streaming_agent_response(
task: Task,
agent_response: dict[str, Any],
) -> tuple[TaskArtifactUpdateEvent | None, TaskStatusUpdateEvent]:
"""Processes the streaming agent responses and returns TaskArtifactUpdateEvent and TaskStatusUpdateEvent."""
is_task_complete = agent_response['is_task_complete']
require_user_input = agent_response['require_user_input']
parts: list[Part] = [Part(TextPart(text=agent_response['content']))]
end_stream = False
artifact = None
message = None
# responses from this agent can be working/completed/input-required
if not is_task_complete and not require_user_input:
task_state = TaskState.working
message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
elif require_user_input:
task_state = TaskState.input_required
message = Message(role=Role.agent, parts=parts, messageId=str(uuid4()))
end_stream = True
else:
task_state = TaskState.completed
artifact = Artifact(parts=parts, artifactId=str(uuid4()))
end_stream = True
task_artifact_update_event = None
if artifact:
task_artifact_update_event = TaskArtifactUpdateEvent(
taskId=task.id,
contextId=task.contextId,
artifact=artifact,
append=False,
lastChunk=True,
)
task_status_event = TaskStatusUpdateEvent(
taskId=task.id,
contextId=task.contextId,
status=TaskStatus(
state=task_state,
message=message,
timestamp=datetime.now().isoformat(),
),
final=end_stream,
)
return task_artifact_update_event, task_status_event
توابع کلیدی قالب پاسخ داخلی عامل را به مدل رویداد پروتکل A2A تبدیل می کند.
جریان اجرا
بیایید کل جریان اجرای را در هنگام تعامل مشتری با نماینده ارز تجسم کنیم:
sequenceDiagram
participant Client
participant A2AServer
participant RequestHandler
participant Executor as CurrencyAgentExecutor
participant Agent as CurrencyAgent
participant LLM as Gemini LLM
participant API as Exchange Rate API
Client->>A2AServer: Request Agent Card
A2AServer->>Client: Return Agent Card
%% Single turn conversation
Client->>A2AServer: message/send
A2AServer->>RequestHandler: Route request
RequestHandler->>Executor: on_message_send
Executor->>Agent: invoke(query, contextId)
Agent->>LLM: Process with Gemini
LLM-->>Agent: Need exchange rate info
Agent->>API: get_exchange_rate
API-->>Agent: Return exchange data
Agent->>LLM: Process with data
LLM-->>Agent: Generate response
Agent-->>Executor: Return response dict
Executor-->>RequestHandler: Update Task & enqueue event
RequestHandler-->>A2AServer: Return Task with results
A2AServer-->>Client: Send response
%% Streaming example
Client->>A2AServer: message/sendStream
A2AServer->>RequestHandler: Route streaming request
RequestHandler->>Executor: on_message_stream
Executor->>Agent: stream(query, contextId)
Agent->>LLM: Start processing
LLM-->>Agent: Need exchange rate
Agent-->>Executor: Yield status update
Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
RequestHandler-->>A2AServer: SSE event
A2AServer-->>Client: Stream status update
Agent->>API: get_exchange_rate
API-->>Agent: Return exchange data
Agent-->>Executor: Yield process update
Executor-->>RequestHandler: Enqueue TaskStatusUpdateEvent
RequestHandler-->>A2AServer: SSE event
A2AServer-->>Client: Stream process update
Agent->>LLM: Process with data
LLM-->>Agent: Generate final response
Agent-->>Executor: Yield final response
Executor-->>RequestHandler: Enqueue TaskArtifactUpdateEvent & TaskStatusUpdateEvent
RequestHandler-->>A2AServer: Final SSE events
A2AServer-->>Client: Stream final result
تعامل مشتری
در test_client.py
اسکریپت نحوه تعامل با عامل را از طریق پروتکل A2A نشان می دهد:
async def run_single_turn_test(client: A2AClient) -> None:
"""Runs a single-turn non-streaming test."""
send_payload = create_send_message_payload(
text='how much is 100 USD in CAD?'
)
# Send Message
send_response: SendMessageResponse = await client.send_message(
payload=send_payload
)
print_json_response(send_response, 'Single Turn Request Response')
# Query the task if needed
if isinstance(send_response.root, SendMessageSuccessResponse) and \
isinstance(send_response.root.result, Task):
task_id: str = send_response.root.result.id
task_id_payload = {'id': task_id}
get_response: GetTaskResponse = await client.get_task(
payload=task_id_payload
)
print_json_response(get_response, 'Query Task Response')
async def run_streaming_test(client: A2AClient) -> None:
"""Runs a single-turn streaming test."""
send_payload = create_send_message_payload(
text='how much is 50 EUR in JPY?'
)
print('--- Single Turn Streaming Request ---')
stream_response = client.send_message_streaming(payload=send_payload)
async for chunk in stream_response:
print_json_response(chunk, 'Streaming Chunk')
async def run_multi_turn_test(client: A2AClient) -> None:
"""Runs a multi-turn non-streaming test."""
print('--- Multi-Turn Request ---')
# --- First Turn ---
first_turn_payload = create_send_message_payload(
text='how much is 100 USD?'
)
first_turn_response: SendMessageResponse = await client.send_message(
payload=first_turn_payload
)
print_json_response(first_turn_response, 'Multi-Turn: First Turn Response')
context_id: str | None = None
if isinstance(first_turn_response.root, SendMessageSuccessResponse) and \
isinstance(first_turn_response.root.result, Task):
task: Task = first_turn_response.root.result
context_id = task.contextId # Capture context ID
# --- Second Turn (if input required) ---
if task.status.state == TaskState.input_required and context_id:
print('--- Multi-Turn: Second Turn (Input Required) ---')
second_turn_payload = create_send_message_payload(
'in GBP', task.id, context_id
)
second_turn_response = await client.send_message(
payload=second_turn_payload
)
print_json_response(
second_turn_response, 'Multi-Turn: Second Turn Response'
)
ویژگی های پیشرفته
جریان
نماینده ارز از پاسخ های جریان پشتیبانی می کند و به آن اجازه می دهد تا به روزرسانی های زمان واقعی را ارائه دهد زیرا این درخواست را پردازش می کند:
- وقتی نماینده شروع به فکر کردن کرد ، به روزرسانی “در حال انجام” را ارسال می کند
- وقتی API نرخ ارز را فراخوانی می کند ، به روزرسانی می شود
- هنگام پردازش داده ها ، به روزرسانی دیگری را پخش می کند
- سرانجام ، نتیجه کامل را ارسال می کند
این یک تجربه کاربری بهتری برای عملیاتی که ممکن است زمان لازم باشد ، فراهم می کند.
مکالمات چند چرخش
نماینده می تواند در صورت نیاز به اطلاعات بیشتر ، در مکالمات چند چرخش شرکت کند:
- اگر کاربر می پرسد “100 دلار چقدر است؟” بدون مشخص کردن ارز هدف
- نماینده حالت کار را بر روی آن قرار می دهد
TaskState.input_required
- مشتری سپس می تواند یک پیام پیگیری را با همان ارسال کند
contextId
- نماینده زمینه بین چرخش ها را حفظ می کند
مکالمه مثال:
User: How much is 100 USD?
Agent: To which currency would you like to convert 100 USD?
User: in GBP
Agent: 100 USD is approximately 78.45 GBP according to the current exchange rate.
مراحل بعدی
اکنون که می فهمید که عامل ارز چگونه کار می کند ، می توانید:
- سؤالات مختلف را امتحان کنید: با جفت های مختلف ارز تست کنید
-
عامل را اصلاح کنید: پشتیبانی از ویژگی های بیشتر
- نرخ ارز تاریخی
- تجزیه و تحلیل روند ارزی
- پشتیبانی از ارزهای بیشتر
-
معماری را گسترش دهید:
- ماندگار
TaskStore
برای جلسات طولانی مدت - مکانیسم های احراز هویت را اضافه کنید
- نماینده را به یک محیط تولید مستقر کنید
- ماندگار
برای اطلاعات بیشتر در مورد ویژگی های پروتکل A2A:
آموزش A2a Langgraph