diff --git a/.github/workflows/agentex-tutorials-test.yml b/.github/workflows/agentex-tutorials-test.yml index eb6da4762..bff4a1a2f 100644 --- a/.github/workflows/agentex-tutorials-test.yml +++ b/.github/workflows/agentex-tutorials-test.yml @@ -20,20 +20,17 @@ jobs: id: get-tutorials run: | cd examples/tutorials - # Find all tutorials and exclude specific temporal ones + # Find all tutorials with a manifest.yaml all_tutorials=$(find . -name "manifest.yaml" -exec dirname {} \; | sort | sed 's|^\./||') - # Filter out the specified temporal tutorials that are being updated - filtered_tutorials=$(echo "$all_tutorials" | grep -v -E "(temporal)") + # Include all tutorials (temporal tutorials are now included) + filtered_tutorials="$all_tutorials" # Convert to JSON array tutorials=$(echo "$filtered_tutorials" | jq -R -s -c 'split("\n") | map(select(length > 0))') echo "tutorials=$tutorials" >> $GITHUB_OUTPUT echo "All tutorials found: $(echo "$all_tutorials" | wc -l)" - echo "Filtered tutorials: $(echo "$filtered_tutorials" | wc -l)" - echo "Excluded tutorials:" - echo "$all_tutorials" | grep -E "(10_temporal/050_|10_temporal/070_|10_temporal/080_)" || echo " (none matched exclusion pattern)" echo "Final tutorial list: $tutorials" test-tutorial: @@ -58,8 +55,20 @@ jobs: - name: Pull latest AgentEx image run: | echo "🐳 Pulling latest Scale AgentEx Docker image..." - docker pull ghcr.io/scaleapi/scale-agentex/agentex:latest - echo "✅ Successfully pulled AgentEx Docker image" + max_attempts=3 + attempt=1 + while [ $attempt -le $max_attempts ]; do + echo "Attempt $attempt of $max_attempts..." + if docker pull ghcr.io/scaleapi/scale-agentex/agentex:latest; then + echo "✅ Successfully pulled AgentEx Docker image" + exit 0 + fi + echo "❌ Pull failed, waiting before retry..." + sleep $((attempt * 10)) + attempt=$((attempt + 1)) + done + echo "❌ Failed to pull image after $max_attempts attempts" + exit 1 - name: Checkout scale-agentex repo uses: actions/checkout@v4 diff --git a/examples/tutorials/00_sync/020_streaming/tests/test_agent.py b/examples/tutorials/00_sync/020_streaming/tests/test_agent.py index 7a649f2d3..5bde7d6b2 100644 --- a/examples/tutorials/00_sync/020_streaming/tests/test_agent.py +++ b/examples/tutorials/00_sync/020_streaming/tests/test_agent.py @@ -151,4 +151,3 @@ def test_send_stream_message(self, client: Agentex, agent_name: str, agent_id: s if __name__ == "__main__": pytest.main([__file__, "-v"]) - diff --git a/examples/tutorials/10_async/00_base/000_hello_acp/tests/test_agent.py b/examples/tutorials/10_async/00_base/000_hello_acp/tests/test_agent.py index ba3444109..c57cec448 100644 --- a/examples/tutorials/10_async/00_base/000_hello_acp/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/000_hello_acp/tests/test_agent.py @@ -73,6 +73,8 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert task is not None # Poll for the initial task creation message + task_creation_message_found = False + async for message in poll_messages( client=client, task_id=task.id, @@ -82,10 +84,15 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert isinstance(message, TaskMessage) if message.content and message.content.type == "text" and message.content.author == "agent": assert "Hello! I've received your task" in message.content.content + task_creation_message_found = True break + assert task_creation_message_found, "Task creation message not found" + # Send an event and poll for response user_message = "Hello, this is a test message!" + agent_response_found = False + async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -97,9 +104,10 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert isinstance(message, TaskMessage) if message.content and message.content.type == "text" and message.content.author == "agent": assert "Hello! I've received your task" in message.content.content + agent_response_found = True break - + assert agent_response_found, "Agent response not found" class TestStreamingEvents: """Test streaming event sending.""" @@ -111,7 +119,7 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): assert task is not None task_creation_found = False - # Poll for the initial task creation message + async for message in poll_messages( client=client, task_id=task.id, @@ -124,7 +132,7 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): task_creation_found = True break - assert task_creation_found, "Task creation message not found in poll" + assert task_creation_found, "Task creation message not found" user_message = "Hello, this is a test message!" stream_timeout = 10 @@ -136,9 +144,8 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): user_echo_found = False agent_response_found = False - async def collect_stream_events() -> None: + async def stream_messages() -> None: nonlocal user_echo_found, agent_response_found - async for event in stream_agent_response( client=client, task_id=task.id, @@ -161,23 +168,19 @@ async def collect_stream_events() -> None: # Check for user message echo if content.get("content") == user_message: user_echo_found = True + elif event_type == "done": + break # Exit early if we've found all expected messages if user_echo_found and agent_response_found: break - # Start streaming task - stream_task = asyncio.create_task(collect_stream_events()) + stream_task = asyncio.create_task(stream_messages()) # Send the event event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for the stream to complete (with timeout) - try: - await asyncio.wait_for(stream_task, timeout=stream_timeout) - except asyncio.TimeoutError: - pytest.fail(f"Stream timed out after {stream_timeout}s waiting for expected messages") + await stream_task # Verify all expected messages were received (fail if stream ended without finding them) assert user_echo_found, "User message echo not found in stream" diff --git a/examples/tutorials/10_async/00_base/010_multiturn/tests/test_agent.py b/examples/tutorials/10_async/00_base/010_multiturn/tests/test_agent.py index 4da1745c6..33d831858 100644 --- a/examples/tutorials/10_async/00_base/010_multiturn/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/010_multiturn/tests/test_agent.py @@ -89,6 +89,10 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): user_message = "Hello! Here is my test message" messages = [] + + # Flags to track what we've received + user_message_found = False + agent_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -98,17 +102,28 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): sleep_interval=1.0, ): messages.append(message) - if len(messages) == 1: - assert message.content == TextContent( - author="user", - content=user_message, - type="text", - ) - else: - assert message.content is not None - assert message.content.author == "agent" + + # Validate messages as they arrive + if message.content and hasattr(message.content, "author"): + msg_text = getattr(message.content, "content", None) + if message.content.author == "user" and msg_text == user_message: + assert message.content == TextContent( + author="user", + content=user_message, + type="text", + ) + user_message_found = True + elif message.content.author == "agent": + assert user_message_found, "Agent response arrived before user message" + agent_response_found = True + + # Exit early if we've found all expected messages + if user_message_found and agent_response_found: break + assert user_message_found, "User message not found" + assert agent_response_found, "Agent response not found" + await asyncio.sleep(1) # wait for state to be updated states = await client.states.list(agent_id=agent_id, task_id=task.id) assert len(states) == 1 @@ -152,10 +167,8 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): # Flags to track what we've received user_message_found = False agent_response_found = False - - async def stream_messages(): + async def stream_messages() -> None: nonlocal user_message_found, agent_response_found - async for event in stream_agent_response( client=client, task_id=task.id, @@ -175,21 +188,17 @@ async def stream_messages(): # Agent response should come after user message assert user_message_found, "Agent response arrived before user message (incorrect order)" agent_response_found = True + elif event_type == "done": + break # Exit early if we've found both messages if user_message_found and agent_response_found: break stream_task = asyncio.create_task(stream_messages()) - event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for streaming to complete (with timeout) - try: - await asyncio.wait_for(stream_task, timeout=15) - except asyncio.TimeoutError: - pytest.fail("Stream timed out after 15s waiting for expected messages") + await stream_task # Validate we received events assert len(all_events) > 0, "No events received in streaming response" diff --git a/examples/tutorials/10_async/00_base/020_streaming/tests/test_agent.py b/examples/tutorials/10_async/00_base/020_streaming/tests/test_agent.py index d863199cd..c55525191 100644 --- a/examples/tutorials/10_async/00_base/020_streaming/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/020_streaming/tests/test_agent.py @@ -28,7 +28,6 @@ ) from agentex import AsyncAgentex -from agentex.types import TaskMessage, TextContent from agentex.types.agent_rpc_params import ParamsCreateTaskRequest from agentex.types.text_content_param import TextContentParam @@ -89,6 +88,10 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): user_message = "Hello! Here is my test message" messages = [] + + # Flags to track what we've received + user_message_found = False + agent_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -98,23 +101,25 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): sleep_interval=1.0, yield_updates=False, ): - messages.append(message) - assert len(messages) > 0 - # the first message should be the agent re-iterating what the user sent - assert isinstance(messages, List) - assert len(messages) == 2 - first_message: TaskMessage = messages[0] - assert first_message.content == TextContent( - author="user", - content=user_message, - type="text", - ) - - second_message: TaskMessage = messages[1] - assert second_message.content is not None - assert second_message.content.author == "agent" + # Validate messages as they come in + if message.content and hasattr(message.content, "author"): + if message.content.author == "user" and message.content.content == user_message: + user_message_found = True + elif message.content.author == "agent": + # Agent response should come after user message + assert user_message_found, "Agent response arrived before user message" + agent_response_found = True + + # Exit early if we've found all expected messages + if user_message_found and agent_response_found: + break + + # Validate we received expected messages + assert len(messages) >= 2, "Expected at least 2 messages (user + agent)" + assert user_message_found, "User message not found" + assert agent_response_found, "Agent response not found" # assert the state has been updated await asyncio.sleep(1) # wait for state to be updated @@ -158,7 +163,12 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): # Collect events from stream all_events = [] + # Flags to track what we've received + user_message_found = False + full_agent_message_found = False + delta_messages_found = False async def stream_messages() -> None: + nonlocal user_message_found, full_agent_message_found, delta_messages_found async for event in stream_agent_response( client=client, task_id=task.id, @@ -166,33 +176,30 @@ async def stream_messages() -> None: ): all_events.append(event) - stream_task = asyncio.create_task(stream_messages()) + # Check events as they arrive + event_type = event.get("type") + if event_type == "full": + content = event.get("content", {}) + if content.get("content") == user_message and content.get("author") == "user": + user_message_found = True + elif content.get("author") == "agent": + full_agent_message_found = True + elif event_type == "delta": + delta_messages_found = True + elif event_type == "done": + break + + # Exit early if we've found all expected messages + if user_message_found and full_agent_message_found and delta_messages_found: + break + stream_task = asyncio.create_task(stream_messages()) event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for streaming to complete await stream_task # Validate we received events assert len(all_events) > 0, "No events received in streaming response" - - # Check for user message, full agent response, and delta messages - user_message_found = False - full_agent_message_found = False - delta_messages_found = False - - for event in all_events: - event_type = event.get("type") - if event_type == "full": - content = event.get("content", {}) - if content.get("content") == user_message and content.get("author") == "user": - user_message_found = True - elif content.get("author") == "agent": - full_agent_message_found = True - elif event_type == "delta": - delta_messages_found = True - assert user_message_found, "User message not found in stream" assert full_agent_message_found, "Full agent message not found in stream" assert delta_messages_found, "Delta messages not found in stream (streaming response expected)" diff --git a/examples/tutorials/10_async/00_base/040_other_sdks/tests/test_agent.py b/examples/tutorials/10_async/00_base/040_other_sdks/tests/test_agent.py index 429d8d879..72a5cf120 100644 --- a/examples/tutorials/10_async/00_base/040_other_sdks/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/040_other_sdks/tests/test_agent.py @@ -94,6 +94,7 @@ async def test_send_event_and_poll_simple_query(self, client: AsyncAgentex, agen # Send a simple message that shouldn't require tool use user_message = "Hello! Please introduce yourself briefly." messages = [] + user_message_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -105,14 +106,17 @@ async def test_send_event_and_poll_simple_query(self, client: AsyncAgentex, agen assert isinstance(message, TaskMessage) messages.append(message) - if len(messages) == 1: + if message.content and message.content.author == "user": assert message.content == TextContent( author="user", content=user_message, type="text", ) + user_message_found = True break + assert user_message_found, "User message not found" + # Verify state has been updated by polling the states for 10 seconds for i in range(20): if i == 9: @@ -140,7 +144,6 @@ async def test_send_event_and_poll_with_tool_use(self, client: AsyncAgentex, age tool_request_found = False tool_response_found = False has_final_agent_response = False - async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -176,8 +179,10 @@ async def test_multi_turn_conversation_with_state(self, client: AsyncAgentex, ag # ensure the task is created before we send the first event await asyncio.sleep(1) + # First turn user_message_1 = "My favorite color is blue." + first_turn_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -193,8 +198,11 @@ async def test_multi_turn_conversation_with_state(self, client: AsyncAgentex, ag and message.content.author == "agent" and message.content.content ): + first_turn_response_found = True break + assert first_turn_response_found, "First turn response not found" + ## keep polling the states for 10 seconds for the input_list and turn_number to be updated for i in range(30): if i == 29: @@ -210,9 +218,10 @@ async def test_multi_turn_conversation_with_state(self, client: AsyncAgentex, ag assert state.get("turn_number") == 1 await asyncio.sleep(1) - found_response = False + # Second turn - reference previous context user_message_2 = "What did I just tell you my favorite color was?" + second_turn_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -229,10 +238,10 @@ async def test_multi_turn_conversation_with_state(self, client: AsyncAgentex, ag ): response_text = message.content.content.lower() assert "blue" in response_text - found_response = True + second_turn_response_found = True break - assert found_response, "Did not receive final agent text response" + assert second_turn_response_found, "Did not receive final agent text response" for i in range(10): if i == 9: raise Exception("Timeout waiting for state updates") @@ -271,7 +280,6 @@ async def test_send_event_and_stream_simple(self, client: AsyncAgentex, agent_id # Collect events from stream # Check for user message and delta messages user_message_found = False - async def stream_messages() -> None: nonlocal user_message_found async for event in stream_agent_response( @@ -291,12 +299,12 @@ async def stream_messages() -> None: elif msg_type == "done": break - stream_task = asyncio.create_task(stream_messages()) + if user_message_found: + break + stream_task = asyncio.create_task(stream_messages()) event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for streaming to complete await stream_task assert user_message_found, "User message found in stream" ## keep polling the states for 10 seconds for the input_list and turn_number to be updated @@ -333,10 +341,7 @@ async def test_send_event_and_stream_with_tools(self, client: AsyncAgentex, agen tool_requests_seen = [] tool_responses_seen = [] text_deltas_seen = [] - async def stream_messages() -> None: - nonlocal tool_requests_seen, tool_responses_seen, text_deltas_seen - async for event in stream_agent_response( client=client, task_id=task.id, @@ -378,11 +383,8 @@ async def stream_messages() -> None: break stream_task = asyncio.create_task(stream_messages()) - event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for streaming to complete await stream_task # Verify we saw tool usage (if the agent decided to use tools) diff --git a/examples/tutorials/10_async/00_base/080_batch_events/tests/test_agent.py b/examples/tutorials/10_async/00_base/080_batch_events/tests/test_agent.py index 6ccad7d2f..1dea1300b 100644 --- a/examples/tutorials/10_async/00_base/080_batch_events/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/080_batch_events/tests/test_agent.py @@ -75,6 +75,7 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): # Send an event and poll for response using the helper function # there should only be one message returned about batching + agent_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -84,10 +85,13 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): sleep_interval=1.0, ): assert isinstance(message, TaskMessage) - assert isinstance(message.content, TextContent) - assert "Processed event IDs" in message.content.content - assert message.content.author == "agent" - break + if message.content and message.content.author == "agent": + assert isinstance(message.content, TextContent) + assert "Processed event IDs" in message.content.content + agent_response_found = True + break + + assert agent_response_found, "Agent response not found" @pytest.mark.asyncio async def test_send_multiple_events_batched(self, client: AsyncAgentex, agent_id: str): @@ -159,6 +163,33 @@ async def test_send_twenty_events_batched_streaming(self, client: AsyncAgentex, task = task_response.result assert task is not None + # Stream the responses and collect agent messages + print("\nStreaming batch responses...") + + # We'll collect all agent messages from the stream + agent_messages = [] + stream_timeout = 90 # Longer timeout for 20 events + async def stream_messages() -> None: + async for event in stream_agent_response( + client=client, + task_id=task.id, + timeout=stream_timeout, + ): + # Collect agent text messages + if event.get("type") == "full": + content = event.get("content", {}) + if content.get("type") == "text" and content.get("author") == "agent": + msg_content = content.get("content", "") + if msg_content and msg_content.strip(): + agent_messages.append(msg_content) + elif event.get("type") == "done": + break + + if len(agent_messages) >= 2: + break + + stream_task = asyncio.create_task(stream_messages()) + # Send 10 events in quick succession (should be batched) num_events = 10 print(f"\nSending {num_events} events in quick succession...") @@ -167,28 +198,7 @@ async def test_send_twenty_events_batched_streaming(self, client: AsyncAgentex, await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) await asyncio.sleep(0.1) # Small delay to ensure ordering - # Stream the responses and collect agent messages - print("\nStreaming batch responses...") - - # We'll collect all agent messages from the stream - agent_messages = [] - stream_timeout = 90 # Longer timeout for 20 events - - async for event in stream_agent_response( - client=client, - task_id=task.id, - timeout=stream_timeout, - ): - # Collect agent text messages - if event.get("type") == "full": - content = event.get("content", {}) - if content.get("type") == "text" and content.get("author") == "agent": - msg_content = content.get("content", "") - if msg_content and msg_content.strip(): - agent_messages.append(msg_content) - - if len(agent_messages) >= 2: - break + await stream_task print(f"\nSent {num_events} events") print(f"Received {len(agent_messages)} agent response(s)") diff --git a/examples/tutorials/10_async/00_base/090_multi_agent_non_temporal/tests/test_agent.py b/examples/tutorials/10_async/00_base/090_multi_agent_non_temporal/tests/test_agent.py index d4c1dd7dd..8af941a81 100644 --- a/examples/tutorials/10_async/00_base/090_multi_agent_non_temporal/tests/test_agent.py +++ b/examples/tutorials/10_async/00_base/090_multi_agent_non_temporal/tests/test_agent.py @@ -17,9 +17,10 @@ import os import uuid +import asyncio -# import pytest -# import pytest_asyncio +import pytest +import pytest_asyncio from test_utils.async_utils import ( stream_agent_response, send_event_and_poll_yielding, @@ -102,9 +103,10 @@ async def test_multi_agent_workflow_complete(self, client: AsyncAgentex, agent_i ): messages.append(message) # Print messages as they arrive to show real-time progress - if message.content and message.content.content: + msg_text = getattr(message.content, "content", None) if message.content else None + if isinstance(msg_text, str) and msg_text: # Track agent participation as messages arrive - content = message.content.content.lower() + content = msg_text.lower() if "starting content workflow" in content: workflow_markers["orchestrator_started"] = True @@ -178,47 +180,54 @@ async def test_multi_agent_workflow_streaming(self, client: AsyncAgentex, agent_ creator_iterations = 0 critic_feedback_count = 0 + async def stream_messages() -> None: + nonlocal creator_iterations, critic_feedback_count + async for event in stream_agent_response( + client=client, + task_id=task.id, + timeout=120, + ): + # Handle different event types + if event.get("type") == "full": + content = event.get("content", {}) + if content.get("type") == "text" and content.get("author") == "agent": + message_text = content.get("content", "") + all_messages.append(message_text) + + # Track agent participation + content_lower = message_text.lower() + + if "starting content workflow" in content_lower: + workflow_markers["orchestrator_started"] = True + + if "creator output" in content_lower: + creator_iterations += 1 + workflow_markers["creator_called"] = True + + if "critic feedback" in content_lower or "content approved by critic" in content_lower: + if "critic feedback" in content_lower: + critic_feedback_count += 1 + workflow_markers["critic_called"] = True + + if "calling formatter agent" in content_lower: + workflow_markers["formatter_called"] = True + + if "workflow complete" in content_lower or "content creation complete" in content_lower: + workflow_markers["workflow_completed"] = True + + if event.get("type") == "done": + break + + # Check if all agents have participated + if all(workflow_markers.values()): + break + + stream_task = asyncio.create_task(stream_messages()) + # Send the event to trigger the agent workflow event_content = TextContentParam(type="text", author="user", content=json.dumps(request_json)) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - async for event in stream_agent_response( - client=client, - task_id=task.id, - timeout=120, - ): - # Handle different event types - if event.get("type") == "full": - content = event.get("content", {}) - if content.get("type") == "text" and content.get("author") == "agent": - message_text = content.get("content", "") - all_messages.append(message_text) - - # Track agent participation - content_lower = message_text.lower() - - if "starting content workflow" in content_lower: - workflow_markers["orchestrator_started"] = True - - if "creator output" in content_lower: - creator_iterations += 1 - workflow_markers["creator_called"] = True - - if "critic feedback" in content_lower or "content approved by critic" in content_lower: - if "critic feedback" in content_lower: - critic_feedback_count += 1 - workflow_markers["critic_called"] = True - - if "calling formatter agent" in content_lower: - workflow_markers["formatter_called"] = True - - if "workflow complete" in content_lower or "content creation complete" in content_lower: - workflow_markers["workflow_completed"] = True - - # Check if all agents have participated - all_agents_done = all(workflow_markers.values()) - if all_agents_done: - break + await stream_task # Validate we got streaming responses assert len(all_messages) > 0, "No messages received from streaming" diff --git a/examples/tutorials/10_async/10_temporal/000_hello_acp/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/000_hello_acp/tests/test_agent.py index 9150afaa2..65204ac36 100644 --- a/examples/tutorials/10_async/10_temporal/000_hello_acp/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/000_hello_acp/tests/test_agent.py @@ -72,8 +72,8 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) task = task_response.result assert task is not None + task_creation_found = False - # Poll for the initial task creation message async for message in poll_messages( client=client, task_id=task.id, @@ -86,10 +86,12 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): task_creation_found = True break - assert task_creation_found, "Task creation message not found in poll" + assert task_creation_found, "Task creation message not found" await asyncio.sleep(1.5) + # Send an event and poll for response user_message = "Hello, this is a test message!" + agent_response_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -100,8 +102,11 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): ): if message.content and message.content.type == "text" and message.content.author == "agent": assert "Hello! I've received your message" in message.content.content + agent_response_found = True break + assert agent_response_found, "Agent response not found" + class TestStreamingEvents: """Test streaming event sending.""" @@ -126,7 +131,7 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): task_creation_found = True break - assert task_creation_found, "Task creation message not found in poll" + assert task_creation_found, "Task creation message not found" user_message = "Hello, this is a test message!" @@ -137,9 +142,9 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): user_echo_found = False agent_response_found = False stream_timeout = 30 - async def collect_stream_events(): #noqa: ANN101 - nonlocal user_echo_found, agent_response_found + async def stream_messages() -> None: + nonlocal user_echo_found, agent_response_found async for event in stream_agent_response( client=client, task_id=task.id, @@ -161,29 +166,24 @@ async def collect_stream_events(): #noqa: ANN101 # Check for user message echo if content.get("content") == user_message: user_echo_found = True + elif event_type == "done": + break # Exit early if we've found all expected messages if user_echo_found and agent_response_found: break - # Start streaming task - stream_task = asyncio.create_task(collect_stream_events()) + + stream_task = asyncio.create_task(stream_messages()) # Send the event event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for the stream to complete (with timeout) - try: - await asyncio.wait_for(stream_task, timeout=stream_timeout) - except asyncio.TimeoutError: - pytest.fail(f"Stream timed out after {stream_timeout}s waiting for expected messages") + await stream_task # Verify all expected messages were received (fail if stream ended without finding them) assert user_echo_found, "User message echo not found in stream" assert agent_response_found, "Agent response not found in stream" - # Wait for streaming to complete - await stream_task if __name__ == "__main__": pytest.main([__file__, "-v"]) \ No newline at end of file diff --git a/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py index 6eb03f728..6e7f46e74 100644 --- a/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/010_agent_chat/tests/test_agent.py @@ -87,6 +87,7 @@ async def test_send_event_and_poll_simple_query(self, client: AsyncAgentex, agen # Send a simple message that shouldn't require tool use user_message = "Hello! Please introduce yourself briefly." messages = [] + user_message_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -98,14 +99,17 @@ async def test_send_event_and_poll_simple_query(self, client: AsyncAgentex, agen assert isinstance(message, TaskMessage) messages.append(message) - if len(messages) == 1: + if message.content and message.content.author == "user": assert message.content == TextContent( author="user", content=user_message, type="text", ) + user_message_found = True break + assert user_message_found, "User message not found" + @pytest.mark.asyncio async def test_send_event_and_poll_with_calculator(self, client: AsyncAgentex, agent_id: str): """Test sending an event that triggers calculator tool usage and polling for the response.""" @@ -120,7 +124,6 @@ async def test_send_event_and_poll_with_calculator(self, client: AsyncAgentex, a # Send a message that could trigger the calculator tool (though with reasoning, it may not need it) user_message = "What is 15 multiplied by 37?" has_final_agent_response = False - async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -151,6 +154,7 @@ async def test_multi_turn_conversation(self, client: AsyncAgentex, agent_id: str # First turn user_message_1 = "My favorite color is blue." + first_turn_found = False async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, @@ -166,8 +170,11 @@ async def test_multi_turn_conversation(self, client: AsyncAgentex, agent_id: str and message.content.author == "agent" and message.content.content ): + first_turn_found = True break + assert first_turn_found, "First turn response not found" + # Wait a bit for state to update await asyncio.sleep(2) @@ -216,8 +223,7 @@ async def test_send_event_and_stream_with_reasoning(self, client: AsyncAgentex, user_message_found = False agent_response_found = False reasoning_found = False - - async def stream_messages() -> None: # noqa: ANN101 + async def stream_messages() -> None: nonlocal user_message_found, agent_response_found, reasoning_found async for event in stream_agent_response( client=client, @@ -249,9 +255,9 @@ async def stream_messages() -> None: # noqa: ANN101 break elif msg_type == "done": - task_message_update = StreamTaskMessageDone.model_validate(event) - if task_message_update.parent_task_message and task_message_update.parent_task_message.id: - finished_message = await client.messages.retrieve(task_message_update.parent_task_message.id) + task_message_update_done = StreamTaskMessageDone.model_validate(event) + if task_message_update_done.parent_task_message and task_message_update_done.parent_task_message.id: + finished_message = await client.messages.retrieve(task_message_update_done.parent_task_message.id) if finished_message.content and finished_message.content.type == "reasoning": reasoning_found = True elif ( @@ -260,22 +266,12 @@ async def stream_messages() -> None: # noqa: ANN101 and finished_message.content.author == "agent" ): agent_response_found = True - - # Exit early if we have what we need - if user_message_found and agent_response_found: - break + break stream_task = asyncio.create_task(stream_messages()) - event_content = TextContentParam(type="text", author="user", content=user_message) await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # Wait for streaming to complete with timeout - try: - await asyncio.wait_for(stream_task, timeout=120) # Overall timeout for CI - except asyncio.TimeoutError: - stream_task.cancel() - pytest.fail("Test timed out waiting for streaming response") + await stream_task assert user_message_found, "User message not found in stream" assert agent_response_found, "Agent response not found in stream" diff --git a/examples/tutorials/10_async/10_temporal/020_state_machine/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/020_state_machine/tests/test_agent.py index 5c458fe89..fac8605aa 100644 --- a/examples/tutorials/10_async/10_temporal/020_state_machine/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/020_state_machine/tests/test_agent.py @@ -94,6 +94,7 @@ async def test_send_event_and_poll_simple_query(self, client: AsyncAgentex, agen timeout=30, sleep_interval=1.0, ): + messages.append(message) ## we should expect to get a question from the agent if message.content.type == "text" and message.content.author == "agent": found_agent_message = True @@ -135,7 +136,8 @@ async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): assert task is not None found_agent_message = False - async def poll_message_in_background() -> None: + user_message = "Hello! Please tell me the latest news about AI and AI startups." + async def stream_first_turn() -> None: nonlocal found_agent_message async for message in stream_task_messages( client=client, @@ -146,19 +148,19 @@ async def poll_message_in_background() -> None: found_agent_message = True break - assert found_agent_message, "Did not find an agent message" - - poll_task = asyncio.create_task(poll_message_in_background()) - # create the first - user_message = "Hello! Please tell me the latest news about AI and AI startups." - await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": TextContentParam(type="text", author="user", content=user_message)}) - - await poll_task + stream_task = asyncio.create_task(stream_first_turn()) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": TextContentParam(type="text", author="user", content=user_message)}, + ) + await stream_task + assert found_agent_message, "Did not find an agent message" await asyncio.sleep(2) starting_deep_research_message = False uses_tool_requests = False - async def poll_message_in_background_2() -> None: + next_user_message = "I want to know what viral news came up and which startups failed, got acquired, or became very successful or popular in the last 3 months" + async def stream_second_turn() -> None: nonlocal starting_deep_research_message, uses_tool_requests async for message in stream_task_messages( client=client, @@ -173,14 +175,18 @@ async def poll_message_in_background_2() -> None: uses_tool_requests = True break - assert starting_deep_research_message, "Did not start deep research" - assert uses_tool_requests, "Did not use tool requests" - - poll_task_2 = asyncio.create_task(poll_message_in_background_2()) + stream_task = asyncio.create_task(stream_second_turn()) + await client.agents.send_event( + agent_id=agent_id, + params={ + "task_id": task.id, + "content": TextContentParam(type="text", author="user", content=next_user_message), + }, + ) + await stream_task - next_user_message = "I want to know what viral news came up and which startups failed, got acquired, or became very successful or popular in the last 3 months" - await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": TextContentParam(type="text", author="user", content=next_user_message)}) - await poll_task_2 + assert starting_deep_research_message, "Did not start deep research" + assert uses_tool_requests, "Did not use tool requests" if __name__ == "__main__": diff --git a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py index c6d2f11ff..b54c8fade 100644 --- a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, override from mcp import StdioServerParameters +from agents import ModelSettings, RunContextWrapper from dotenv import load_dotenv # Simple guardrail output model for this example @@ -15,12 +16,10 @@ from agentex.lib import adk from agentex.lib.types.acp import SendEventParams, CreateTaskParams -from agentex.lib.adk.models import ModelSettings from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.types.text_content import TextContent from agentex.lib.utils.model_utils import BaseModel -from agentex.lib.core.base.run_context import RunContextWrapper from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.types.workflow import SignalName from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow @@ -36,6 +35,7 @@ class GuardrailFunctionOutput(BaseModel): """Output from a guardrail function.""" + output_info: Dict[str, Any] tripwire_triggered: bool @@ -99,10 +99,7 @@ async def calculator(context: RunContextWrapper, args: str) -> str: # noqa: ARG b = parsed_args.get("b") if operation is None or a is None or b is None: - return ( - "Error: Missing required parameters. " - "Please provide 'operation', 'a', and 'b'." - ) + return "Error: Missing required parameters. Please provide 'operation', 'a', and 'b'." # Convert to numbers try: @@ -124,10 +121,7 @@ async def calculator(context: RunContextWrapper, args: str) -> str: # noqa: ARG result = a / b else: supported_ops = "add, subtract, multiply, divide" - return ( - f"Error: Unknown operation '{operation}'. " - f"Supported operations: {supported_ops}." - ) + return f"Error: Unknown operation '{operation}'. Supported operations: {supported_ops}." # Format the result nicely if result == int(result): @@ -160,9 +154,7 @@ async def calculator(context: RunContextWrapper, args: str) -> str: # noqa: ARG # Define the spaghetti guardrail function async def check_spaghetti_guardrail( - ctx: RunContextWrapper[None], - agent: Agent, - input: str | list + ctx: RunContextWrapper[None], agent: Agent, input: str | list ) -> GuardrailFunctionOutput: """ A simple guardrail that checks if 'spaghetti' is mentioned in the input. @@ -185,25 +177,22 @@ async def check_spaghetti_guardrail( return GuardrailFunctionOutput( output_info={ "contains_spaghetti": contains_spaghetti, - "checked_text": ( - input_text[:200] + "..." - if len(input_text) > 200 else input_text - ), + "checked_text": (input_text[:200] + "..." if len(input_text) > 200 else input_text), "rejection_message": ( "I'm sorry, but I cannot process messages about spaghetti. " "This guardrail was put in place for demonstration purposes. " "Please ask me about something else!" - ) if contains_spaghetti else None + ) + if contains_spaghetti + else None, }, - tripwire_triggered=contains_spaghetti + tripwire_triggered=contains_spaghetti, ) # Define soup input guardrail function async def check_soup_guardrail( - ctx: RunContextWrapper[None], - agent: Agent, - input: str | list + ctx: RunContextWrapper[None], agent: Agent, input: str | list ) -> GuardrailFunctionOutput: """ A guardrail that checks if 'soup' is mentioned in the input. @@ -226,44 +215,33 @@ async def check_soup_guardrail( return GuardrailFunctionOutput( output_info={ "contains_soup": contains_soup, - "checked_text": ( - input_text[:200] + "..." - if len(input_text) > 200 else input_text - ), + "checked_text": (input_text[:200] + "..." if len(input_text) > 200 else input_text), "rejection_message": ( "I'm sorry, but I cannot process messages about soup. " "This is a demonstration guardrail for testing purposes. " "Please ask about something other than soup!" - ) if contains_soup else None + ) + if contains_soup + else None, }, - tripwire_triggered=contains_soup + tripwire_triggered=contains_soup, ) # Create the input guardrails -SPAGHETTI_GUARDRAIL = TemporalInputGuardrail( - guardrail_function=check_spaghetti_guardrail, - name="spaghetti_guardrail" -) +SPAGHETTI_GUARDRAIL = TemporalInputGuardrail(guardrail_function=check_spaghetti_guardrail, name="spaghetti_guardrail") -SOUP_GUARDRAIL = TemporalInputGuardrail( - guardrail_function=check_soup_guardrail, - name="soup_guardrail" -) +SOUP_GUARDRAIL = TemporalInputGuardrail(guardrail_function=check_soup_guardrail, name="soup_guardrail") # Define pizza output guardrail function -async def check_pizza_guardrail( - ctx: RunContextWrapper[None], - agent: Agent, - output: str -) -> GuardrailFunctionOutput: +async def check_pizza_guardrail(ctx: RunContextWrapper[None], agent: Agent, output: str) -> GuardrailFunctionOutput: """ An output guardrail that prevents mentioning pizza. """ output_text = output.lower() if isinstance(output, str) else "" contains_pizza = "pizza" in output_text - + return GuardrailFunctionOutput( output_info={ "contains_pizza": contains_pizza, @@ -271,24 +249,22 @@ async def check_pizza_guardrail( "I cannot provide this response as it mentions pizza. " "Due to content policies, I need to avoid discussing pizza. " "Let me provide a different response." - ) if contains_pizza else None + ) + if contains_pizza + else None, }, - tripwire_triggered=contains_pizza + tripwire_triggered=contains_pizza, ) # Define sushi output guardrail function -async def check_sushi_guardrail( - ctx: RunContextWrapper[None], - agent: Agent, - output: str -) -> GuardrailFunctionOutput: +async def check_sushi_guardrail(ctx: RunContextWrapper[None], agent: Agent, output: str) -> GuardrailFunctionOutput: """ An output guardrail that prevents mentioning sushi. """ output_text = output.lower() if isinstance(output, str) else "" contains_sushi = "sushi" in output_text - + return GuardrailFunctionOutput( output_info={ "contains_sushi": contains_sushi, @@ -296,29 +272,23 @@ async def check_sushi_guardrail( "I cannot mention sushi in my response. " "This guardrail prevents discussions about sushi for demonstration purposes. " "Please let me provide information about other topics." - ) if contains_sushi else None + ) + if contains_sushi + else None, }, - tripwire_triggered=contains_sushi + tripwire_triggered=contains_sushi, ) # Create the output guardrails -PIZZA_GUARDRAIL = TemporalOutputGuardrail( - guardrail_function=check_pizza_guardrail, - name="pizza_guardrail" -) +PIZZA_GUARDRAIL = TemporalOutputGuardrail(guardrail_function=check_pizza_guardrail, name="pizza_guardrail") -SUSHI_GUARDRAIL = TemporalOutputGuardrail( - guardrail_function=check_sushi_guardrail, - name="sushi_guardrail" -) +SUSHI_GUARDRAIL = TemporalOutputGuardrail(guardrail_function=check_sushi_guardrail, name="sushi_guardrail") # Example output guardrail function (kept for reference) async def check_output_length_guardrail( - ctx: RunContextWrapper[None], - agent: Agent, - output: str + ctx: RunContextWrapper[None], agent: Agent, output: str ) -> GuardrailFunctionOutput: """ A simple output guardrail that checks if the response is too long. @@ -326,7 +296,7 @@ async def check_output_length_guardrail( # Check the length of the output max_length = 1000 # Maximum allowed characters is_too_long = len(output) > max_length if isinstance(output, str) else False - + return GuardrailFunctionOutput( output_info={ "output_length": len(output) if isinstance(output, str) else 0, @@ -336,9 +306,11 @@ async def check_output_length_guardrail( f"I'm sorry, but my response is too long ({len(output)} characters). " f"Please ask a more specific question so I can provide a concise answer " f"(max {max_length} characters)." - ) if is_too_long else None + ) + if is_too_long + else None, }, - tripwire_triggered=is_too_long + tripwire_triggered=is_too_long, ) @@ -353,10 +325,7 @@ async def check_output_length_guardrail( # Create the calculator tool CALCULATOR_TOOL = FunctionTool( name="calculator", - description=( - "Performs basic arithmetic operations (add, subtract, multiply, " - "divide) on two numbers." - ), + description=("Performs basic arithmetic operations (add, subtract, multiply, divide) on two numbers."), params_json_schema={ "type": "object", "properties": { @@ -390,16 +359,13 @@ def __init__(self): @workflow.signal(name=SignalName.RECEIVE_EVENT) @override async def on_task_event_send(self, params: SendEventParams) -> None: - if not params.event.content: return if params.event.content.type != "text": raise ValueError(f"Expected text message, got {params.event.content.type}") if params.event.content.author != "user": - raise ValueError( - f"Expected user message, got {params.event.content.author}" - ) + raise ValueError(f"Expected user message, got {params.event.content.author}") if self._state is None: raise ValueError("State is not initialized") @@ -407,9 +373,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # Increment the turn number self._state.turn_number += 1 # Add the new user message to the message history - self._state.input_list.append( - {"role": "user", "content": params.event.content.content} - ) + self._state.input_list.append({"role": "user", "content": params.event.content.content}) async with adk.tracing.span( trace_id=params.task.id, @@ -475,7 +439,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None: input_guardrails=[SPAGHETTI_GUARDRAIL, SOUP_GUARDRAIL], output_guardrails=[PIZZA_GUARDRAIL, SUSHI_GUARDRAIL], ) - + # Update state with the final input list from result if self._state and result: final_list = getattr(result, "final_input_list", None) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml index 773d5ba44..b339542d5 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml @@ -104,7 +104,6 @@ agent: # Optional: Set Environment variables for running your agent locally as well # as for deployment later on env: - OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" OPENAI_ORG_ID: "" diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py index d571e0e7a..73cf60db9 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py @@ -69,6 +69,7 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert task is not None # Poll for the initial task creation message + task_creation_found = False async for message in poll_messages( client=client, task_id=task.id, @@ -80,11 +81,13 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): # Check for the Haiku Assistant welcome message assert "Haiku Assistant" in message.content.content assert "Temporal" in message.content.content + task_creation_found = True break + assert task_creation_found, "Task creation message not found" + # Send event and poll for response with streaming updates user_message = "Hello how is life?" - print(f"[DEBUG 060 POLL] Sending message: '{user_message}'") # Use yield_updates=True to get all streaming chunks as they're written final_message = None @@ -98,15 +101,10 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): yield_updates=True, # Get updates as streaming writes chunks ): if message.content and message.content.type == "text" and message.content.author == "agent": - print( - f"[DEBUG 060 POLL] Received update - Status: {message.streaming_status}, " - f"Content length: {len(message.content.content)}" - ) final_message = message # Stop polling once we get a DONE message if message.streaming_status == "DONE": - print(f"[DEBUG 060 POLL] Streaming complete!") break # Verify the final message has content (the haiku) @@ -114,10 +112,6 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert final_message.content is not None, "Final message should have content" assert len(final_message.content.content) > 0, "Final message should have haiku content" - print(f"[DEBUG 060 POLL] ✅ Successfully received haiku response!") - print(f"[DEBUG 060 POLL] Final haiku:\n{final_message.content.content}") - pass - class TestStreamingEvents: """Test streaming event sending (backend verification via polling).""" diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml index 40bf9e78a..d28da57b6 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml @@ -102,7 +102,6 @@ agent: # Optional: Set Environment variables for running your agent locally as well # as for deployment later on env: - OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" OPENAI_ORG_ID: "" diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py index d6fdc6ff8..8bf59cafc 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py @@ -70,7 +70,8 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert task is not None # Poll for the initial task creation message - print(f"[DEBUG 070 POLL] Polling for initial task creation message...") + + task_creation_found = False async for message in poll_messages( client=client, task_id=task.id, @@ -80,61 +81,57 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): assert isinstance(message, TaskMessage) if message.content and message.content.type == "text" and message.content.author == "agent": # Check for the initial acknowledgment message - print(f"[DEBUG 070 POLL] Initial message: {message.content.content[:100]}") assert "task" in message.content.content.lower() or "received" in message.content.content.lower() + task_creation_found = True break + assert task_creation_found, "Task creation message not found" + # Send an event asking about the weather in NYC and poll for response with streaming user_message = "What is the weather in New York City?" - print(f"[DEBUG 070 POLL] Sending message: '{user_message}'") # Track what we've seen to ensure tool calls happened seen_tool_request = False seen_tool_response = False final_message = None - async for message in send_event_and_poll_yielding( client=client, agent_id=agent_id, task_id=task.id, user_message=user_message, timeout=60, - sleep_interval=1.0 - ): + sleep_interval=1.0, + ): assert isinstance(message, TaskMessage) - print(f"[DEBUG 070 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}") # Track tool_request messages (agent calling get_weather) if message.content and message.content.type == "tool_request": - print(f"[DEBUG 070 POLL] ✅ Saw tool_request - agent is calling get_weather tool") seen_tool_request = True # Track tool_response messages (get_weather result) if message.content and message.content.type == "tool_response": - print(f"[DEBUG 070 POLL] ✅ Saw tool_response - get_weather returned result") seen_tool_response = True # Track agent text messages and their streaming updates if message.content and message.content.type == "text" and message.content.author == "agent": - content_length = len(message.content.content) if message.content.content else 0 - print(f"[DEBUG 070 POLL] Agent text update - Status: {message.streaming_status}, Length: {content_length}") + agent_text = getattr(message.content, "content", "") or "" + content_length = len(str(agent_text)) final_message = message # Stop when we get DONE status if message.streaming_status == "DONE" and content_length > 0: - print(f"[DEBUG 070 POLL] ✅ Streaming complete!") break # Verify we got all the expected pieces assert seen_tool_request, "Expected to see tool_request message (agent calling get_weather)" assert seen_tool_response, "Expected to see tool_response message (get_weather result)" assert final_message is not None, "Expected to see final agent text message" - assert final_message.content is not None and len(final_message.content.content) > 0, "Final message should have content" + final_text = getattr(final_message.content, "content", None) if final_message.content else None + assert isinstance(final_text, str) and len(final_text) > 0, "Final message should have content" # Check that the response contains the temperature (22 degrees) # The get_weather activity returns "The weather in New York City is 22 degrees Celsius" - print(f"[DEBUG 070 POLL] Final response: {final_message.content.content}") - assert "22" in final_message.content.content, "Expected weather response to contain temperature (22 degrees)" + assert "22" in final_text, "Expected weather response to contain temperature (22 degrees)" class TestStreamingEvents: diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml index 09f49c3e5..f6fc7e9ca 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml @@ -104,7 +104,6 @@ agent: # Optional: Set Environment variables for running your agent locally as well # as for deployment later on env: - OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" OPENAI_ORG_ID: "" diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py index 5b0c2f74c..3377c1ea8 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py @@ -87,7 +87,7 @@ async def test_send_event_and_poll_with_human_approval(self, client: AsyncAgente assert task is not None # Poll for the initial task creation message - print(f"[DEBUG 080 POLL] Polling for initial task creation message...") + task_creation_found = False async for message in poll_messages( client=client, task_id=task.id, @@ -97,91 +97,67 @@ async def test_send_event_and_poll_with_human_approval(self, client: AsyncAgente assert isinstance(message, TaskMessage) if message.content and message.content.type == "text" and message.content.author == "agent": # Check for the initial acknowledgment message - print(f"[DEBUG 080 POLL] Initial message: {message.content.content[:100]}") assert "task" in message.content.content.lower() or "received" in message.content.content.lower() + task_creation_found = True break + assert task_creation_found, "Task creation message not found" + # Send an event asking to confirm an order (triggers human-in-the-loop) user_message = "Please confirm my order" - print(f"[DEBUG 080 POLL] Sending message: '{user_message}'") # Track what we've seen to ensure human-in-the-loop flow happened seen_tool_request = False seen_tool_response = False found_final_response = False - child_workflow_detected = False - - # Start polling for messages in the background - async def poll_and_detect(): - nonlocal seen_tool_request, seen_tool_response, found_final_response, child_workflow_detected - - async for message in send_event_and_poll_yielding( - client=client, - agent_id=agent_id, - task_id=task.id, - user_message=user_message, - timeout=120, # Longer timeout for human-in-the-loop - sleep_interval=1.0, - yield_updates=True, # Get all streaming chunks - ): - assert isinstance(message, TaskMessage) - print(f"[DEBUG 080 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}") - - # Track tool_request messages (agent calling wait_for_confirmation) - if message.content and message.content.type == "tool_request": - print(f"[DEBUG 080 POLL] ✅ Saw tool_request - agent is calling wait_for_confirmation tool") - print(f"[DEBUG 080 POLL] 🔔 Child workflow should be spawned - will signal it to approve") - seen_tool_request = True - child_workflow_detected = True - - # Track tool_response messages (child workflow completion) - if message.content and message.content.type == "tool_response": - print(f"[DEBUG 080 POLL] ✅ Saw tool_response - child workflow completed after approval") - seen_tool_response = True - - # Track agent text messages and their streaming updates - if message.content and message.content.type == "text" and message.content.author == "agent": - content_length = len(message.content.content) if message.content.content else 0 - print(f"[DEBUG 080 POLL] Agent text update - Status: {message.streaming_status}, Length: {content_length}") - - # Stop when we get DONE status with actual content - if message.streaming_status == "DONE" and content_length > 0: - print(f"[DEBUG 080 POLL] ✅ Streaming complete!") - found_final_response = True - break - - # Start polling task - polling_task = asyncio.create_task(poll_and_detect()) - - # Wait a bit for the child workflow to be created - print(f"[DEBUG 080 POLL] Waiting for child workflow to spawn...") - await asyncio.sleep(5) - - # Send signal to child workflow to approve the order - # The child workflow ID is fixed as "child-workflow-id" (see tools.py) - try: - print(f"[DEBUG 080 POLL] Sending approval signal to child workflow...") - handle = temporal_client.get_workflow_handle("child-workflow-id") - await handle.signal("fulfill_order_signal", True) - print(f"[DEBUG 080 POLL] ✅ Approval signal sent successfully!") - except Exception as e: - print(f"[DEBUG 080 POLL] ⚠️ Warning: Could not send signal to child workflow: {e}") - print(f"[DEBUG 080 POLL] This may be expected if workflow completed before signal could be sent") - - # Wait for polling to complete - try: - await asyncio.wait_for(polling_task, timeout=60) - except asyncio.TimeoutError: - print(f"[DEBUG 080 POLL] ⚠️ Polling timed out - workflow may still be waiting") - polling_task.cancel() + approval_signal_sent = False + + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message=user_message, + timeout=120, # Longer timeout for human-in-the-loop + sleep_interval=1.0, + yield_updates=True, # Get all streaming chunks + ): + assert isinstance(message, TaskMessage) + + # Track tool_request messages (agent calling wait_for_confirmation) + if message.content and message.content.type == "tool_request": + seen_tool_request = True + + if not approval_signal_sent: + # Send signal to child workflow to approve the order + # The child workflow ID is fixed as "child-workflow-id" (see tools.py) + # Give Temporal a brief moment to materialize the child workflow + await asyncio.sleep(1) + try: + handle = temporal_client.get_workflow_handle("child-workflow-id") + await handle.signal("fulfill_order_signal", True) + approval_signal_sent = True + except Exception as e: + # It's okay if the workflow completed before we could signal it. + _ = e + + # Track tool_response messages (child workflow completion) + if message.content and message.content.type == "tool_response": + seen_tool_response = True + + # Track agent text messages and their streaming updates + if message.content and message.content.type == "text" and message.content.author == "agent": + content_length = len(message.content.content) if message.content.content else 0 + + # Stop when we get DONE status with actual content + if message.streaming_status == "DONE" and content_length > 0: + found_final_response = True + break # Verify that we saw the complete flow: tool_request -> human approval -> tool_response -> final answer assert seen_tool_request, "Expected to see tool_request message (agent calling wait_for_confirmation)" assert seen_tool_response, "Expected to see tool_response message (child workflow completion after approval)" assert found_final_response, "Expected to see final text response after human approval" - print(f"[DEBUG 080 POLL] ✅ Human-in-the-loop workflow completed successfully!") - class TestStreamingEvents: """Test streaming event sending (backend verification via polling).""" diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/tests/test_agent.py index aae1cb916..9b93b1b76 100644 --- a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/tests/test_agent.py @@ -18,7 +18,7 @@ # Configuration from environment variables AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") -AGENT_NAME = os.environ.get("AGENT_NAME", "claude_") +AGENT_NAME = os.environ.get("AGENT_NAME", "claude-mvp-agent") @pytest_asyncio.fixture diff --git a/examples/tutorials/test_utils/async_utils.py b/examples/tutorials/test_utils/async_utils.py index effe87789..2187e98d8 100644 --- a/examples/tutorials/test_utils/async_utils.py +++ b/examples/tutorials/test_utils/async_utils.py @@ -8,6 +8,7 @@ import json import time import asyncio +import contextlib from typing import Optional, AsyncGenerator from datetime import datetime, timezone @@ -94,7 +95,7 @@ async def poll_messages( # Keep track of messages we've already yielded seen_message_ids = set() # Track message content hashes to detect updates (for streaming) - message_content_hashes = {} + message_content_hashes: dict[str, int] = {} start_time = datetime.now() # Poll continuously until timeout @@ -120,6 +121,10 @@ async def poll_messages( if msg_timestamp < messages_created_after: continue + # Some message objects may not have an ID; skip them since we use IDs for dedupe. + if not message.id: + continue + # Check if this is a new message or an update to existing message is_new_message = message.id not in seen_message_ids @@ -177,18 +182,45 @@ async def send_event_and_stream( Raises: Exception: If streaming fails """ - # Send the event - event_content = TextContentParam(type="text", author="user", content=user_message) + queue: asyncio.Queue[dict[str, object] | None] = asyncio.Queue() + stream_exc: BaseException | None = None + + async def consume_stream() -> None: + nonlocal stream_exc + try: + async for event in stream_agent_response( + client=client, + task_id=task_id, + timeout=timeout, + ): + await queue.put(event) + if event.get("type") == "done": + break + except BaseException as e: # noqa: BLE001 - propagate after draining + stream_exc = e + finally: + await queue.put(None) + + # Start consuming the stream *before* sending the event, so we don't block waiting for the first message. + stream_task = asyncio.create_task(consume_stream()) - await client.agents.send_event(agent_id=agent_id, params={"task_id": task_id, "content": event_content}) + try: + event_content = TextContentParam(type="text", author="user", content=user_message) + await client.agents.send_event(agent_id=agent_id, params={"task_id": task_id, "content": event_content}) - # Stream the response using stream_agent_response and yield events up the stack - async for event in stream_agent_response( - client=client, - task_id=task_id, - timeout=timeout, - ): - yield event + while True: + item = await queue.get() + if item is None: + break + yield item + + if stream_exc is not None: + raise stream_exc + finally: + if not stream_task.done(): + stream_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await stream_task async def stream_agent_response( @@ -220,9 +252,9 @@ async def stream_agent_response( yield event except asyncio.TimeoutError: - print(f"[DEBUG] Stream timed out after {timeout}s") + raise except Exception as e: - print(f"[DEBUG] Stream error: {e}") + raise async def stream_task_messages( diff --git a/uv.lock b/uv.lock index 0a0242aed..391297102 100644 --- a/uv.lock +++ b/uv.lock @@ -8,7 +8,7 @@ resolution-markers = [ [[package]] name = "agentex-sdk" -version = "0.6.7" +version = "0.8.0" source = { editable = "." } dependencies = [ { name = "aiohttp" },