prefecthq / controlflow Goto Github PK
View Code? Open in Web Editor NEW🦾 Take control of your AI agents
Home Page: https://controlflow.ai
License: Apache License 2.0
🦾 Take control of your AI agents
Home Page: https://controlflow.ai
License: Apache License 2.0
The following fails due to errors serialization the playwright_agent
function and passing it to the scraping_agent
:
from controlflow import flow as CFFlow, Task as CFTask, Agent
from playwright import sync_api
def playwright_agent(url: str) -> sync_api.Page:
"""
Uses Playwright to scrape the site at the given URL and returns the playwright page object.
"""
with sync_api.sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url)
return page
scraping_agent = Agent(name="WesleyScrapes", tools=[playwright_agent])
@CFFlow
def scrape_details(url: str) -> str:
"""
Given a url, this task scrapes the site for details about the tool.
"""
return CFTask(
objective="Scrape the site for details about the tool, following links to get more information if necessary.",
result_type=str,
agents=[scraping_agent],
context={"url": url},
).run()
if __name__ == "__main__":
scrape_details("https://prefect.io")
with the following:
16:23:26.558 | ERROR | Flow run 'zealous-baboon' - Finished in state Failed("Flow run encountered an exception: PydanticSerializationError: Error calling function `_serialize_tools`: PydanticSchemaGenerationError: Unable to generate pydantic-core schema for <class 'playwright.sync_api._generated.Page'>. Set `arbitrary_types_allowed=True` in the model_config to ignore this error or implement `__get_pydantic_core_schema__` on your type to fully support it.\n\nIf you got this error by calling handler(<some type>) within `__get_pydantic_core_schema__` then you likely need to call `handler.generate_schema(<some type>)` since we do not call `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.\n\nFor further information visit https://errors.pydantic.dev/2.7/u/schema-for-unknown-type")
Traceback (most recent call last):
File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 217, in __init__
validator = _getattr_no_parents(type, '__pydantic_validator__')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 98, in _getattr_no_parents
raise AttributeError(attribute)
AttributeError: __pydantic_validator__
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/nicholas/projects/flows/workflow-webscraper/cf_bug.py", line 31, in <module>
scrape_details("https://prefect.io")
File "/Users/nicholas/projects/prefect/src/prefect/flows.py", line 1326, in __call__
return run_flow(
^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 798, in run_flow
return run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 678, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 243, in result
raise self._raised
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 635, in run_context
yield self
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 676, in run_flow_sync
engine.call_flow_fn()
File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 655, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/decorators.py", line 110, in wrapper
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/flows/workflow-webscraper/cf_bug.py", line 28, in scrape_details
).run()
^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
return run_task(
^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
raise self._raised
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
yield self
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 360, in run
controller.run()
File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
return run_task(
^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
raise self._raised
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
yield self
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 340, in run
new_messages = self.run_once()
^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
return run_task(
^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
raise self._raised
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
yield self
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 270, in run_once
payload = self._setup_run()
^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 192, in _setup_run
tools.extend(task.get_tools())
^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 526, in get_tools
tools.extend([self._create_fail_tool(), self._create_success_tool()])
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 468, in _create_fail_tool
return Tool.from_function(
^^^^^^^^^^^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/llm/tools.py", line 83, in from_function
parameters = TypeAdapter(fn).json_schema()
^^^^^^^^^^^^^^^
File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 223, in __init__
core_schema, type, module, str(type), 'TypeAdapter', core_config, config_wrapper.plugin_settings
^^^^^^^^^
File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 175, in __repr__
serialized = self.model_dump()
^^^^^^^^^^^^^^^^^
File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
return self.__pydantic_serializer__.to_python(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `_serialize_tools`: PydanticSchemaGenerationError: Unable to generate pydantic-core schema for <class 'playwright.sync_api._generated.Page'>. Set `arbitrary_types_allowed=True` in the model_config to ignore this error or implement `__get_pydantic_core_schema__` on your type to fully support it.
If you got this error by calling handler(<some type>) within `__get_pydantic_core_schema__` then you likely need to call `handler.generate_schema(<some type>)` since we do not call `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.
For further information visit https://errors.pydantic.dev/2.7/u/schema-for-unknown-type
Prior to launch this worked without issue so it's likely some tool serialization logic has changed.
Ran script in #222 in a fresh environment.
Errors with AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'
Version below changes how agents are passed to avoid deprecation warning (ran it both ways).
# uv pip install langchain-community, duckduckgo-search
import controlflow as cf
from langchain_community.tools import DuckDuckGoSearchRun
summarizer = cf.Agent(
name="Headline Summarizer",
description="An AI agent that fetches and summarizes current events",
tools=[DuckDuckGoSearchRun()],
)
extractor = cf.Agent(
name="Entity Extractor",
description="An AI agent that does named entity recognition",
)
@cf.flow
def get_headlines():
summarizer_task = cf.Task(
"Retrieve and summarize today's two top business headlines",
agent=summarizer,
result_type=list[str],
)
extractor_task = cf.Task(
"Extract any fortune 500 companies mentioned in the headlines and whether the sentiment is positive, neutral, or negative",
agent=extractor,
depends_on=[summarizer_task],
)
return summarizer_task, extractor_task
if __name__ == "__main__":
headlines, entity_sentiment = get_headlines()
print(headlines, entity_sentiment)
Same package versions in working and non-working cases other than controlflow:
controflow 0.8.3.dev54+gc90dfbb
duckduckgo-search 6.1.12
langchain 0.2.7 pypi_0 pypi
langchain-anthropic 0.1.19 pypi_0 pypi
langchain-community 0.2.7 pypi_0 pypi
langchain-core 0.2.16 pypi_0 pypi
langchain-openai 0.1.15 pypi_0 pypi
langchain-text-splitters 0.2.2 pypi_0 pypi
09:49:07.241 | ERROR | Flow run 'cerulean-coyote' - Encountered exception during execution: AttributeError("'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'")
Traceback (most recent call last):
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 635, in run_context
yield self
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
engine.call_flow_fn()
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/decorators.py", line 120, in wrapper
flow_obj.run()
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/flows/flow.py", line 134, in run
orchestrator.run(steps=steps)
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/orchestration/orchestrator.py", line 114, in run
agent._run(context=context)
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 202, in _run
for event in self._run_model(messages=messages, tools=context.tools):
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 227, in _run_model
model = self.get_model(tools=tools)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 159, in get_model
model = model.bind_tools([t.to_lc_tool() for t in tools])
^^^^^^^^^^^^
AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'
09:49:07.370 | ERROR | Flow run 'cerulean-coyote' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'")
Traceback (most recent call last):
File "/Users/jeffhale/Desktop/prefect/demos/cf/cf_example.py", line 38, in <module>
headlines, entity_sentiment = get_headlines()
^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flows.py", line 1326, in __call__
return run_flow(
^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 798, in run_flow
return run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 243, in result
raise self._raised
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 635, in run_context
yield self
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
engine.call_flow_fn()
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/decorators.py", line 120, in wrapper
flow_obj.run()
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/flows/flow.py", line 134, in run
orchestrator.run(steps=steps)
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/orchestration/orchestrator.py", line 114, in run
agent._run(context=context)
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 202, in _run
for event in self._run_model(messages=messages, tools=context.tools):
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 227, in _run_model
model = self.get_model(tools=tools)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 159, in get_model
model = model.bind_tools([t.to_lc_tool() for t in tools])
^^^^^^^^^^^^
AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'
Trying to import the module without OPENAI_API_KEY set throws an error.
controlflow==0.7.5
windows (yuck)
powershell (super yuck)
Ideal resolution:
Temporary Resolution:
export OPENAI_API_KEY=nonsense
to make the code happy.example script:
import os
from pydantic import BaseModel
from langchain_mycustommodel import MyCustomLLM #type: langchain LLM
llm = MyCustomLLM(**kwargs)
import controlflow as cf
cf.default_model = llm
# also tried:
author = cf.Agent(
model=llm,
name="Deep Thought",
instructions="Use a formal tone and clear language",
)
throws error:
openai/gpt-4o None
Traceback (most recent call last):
File "example\cf.py", line 5, in <module>
import controlflow as cf
File ".control\Lib\site-packages\controlflow\__init__.py", line 18, in <module>
default_model = model_from_string(controlflow.settings.llm_model)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".control\Lib\site-packages\controlflow\llm\models.py", line 63, in model_from_string
return cls(model=model, temperature=temperature, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".control\Lib\site-packages\pydantic\v1\main.py", line 341, in __init__
raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for ChatOpenAI
__root__
Did not find openai_api_key, please add an environment variable `OPENAI_API_KEY` which contains it, or pass `openai_api_key` as a named parameter. (type=value_error)
caused by the top level import itself: https://github.com/PrefectHQ/ControlFlow/blob/main/src/controlflow/__init__.py#L18
Trying the agent example from the getting started.
Firstly - you don't actually mention configuring Prefect (i.e. starting Prefect server) - that might trip some folks up if they're coming to this for the first time.
Secondly, I'm getting the following error when I run the example:
(prefect-test) chrisgoddard@Good-Studio ~/C/g/p/prefect-test (main) [1]> python src/prefect_test/test2.py (base)
14:04:37.302 | ERROR | Flow run 'lemon-narwhal' - Encountered exception during execution: 'NoneType' object is not iterable
Traceback (most recent call last):
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 662, in run_generator_flow_sync
yield gen_result
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/flows/flow.py", line 95, in create_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable
14:04:37.338 | ERROR | Flow run 'lemon-narwhal' - Finished in state Failed("Flow run encountered an exception: TypeError: 'NoneType' object is not iterable")
14:04:37.339 | ERROR | Task run 'Run LLM Controller' - Task run failed with exception TypeError("'NoneType' object is not iterable") - Retries are exhausted
14:04:37.355 | ERROR | Task run 'Run LLM Controller' - Finished in state Failed("Task run encountered an exception TypeError: 'NoneType' object is not iterable")
14:04:37.356 | ERROR | Task run 'Run Task 83ae0 ("Review the edited paragraph to see if it meets the...")' - Task run failed with exception TypeError("'NoneType' object is not iterable") - Retries are exhausted
14:04:37.371 | ERROR | Task run 'Run Task 83ae0 ("Review the edited paragraph to see if it meets the...")' - Finished in state Failed("Task run encountered an exception TypeError: 'NoneType' object is not iterable")
14:04:37.372 | ERROR | Flow run 'zealous-quokka' - Encountered exception during execution: 'NoneType' object is not iterable
Traceback (most recent call last):
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 612, in run_flow_sync
engine.call_flow_fn()
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 591, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/decorators.py", line 110, in wrapper
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 50, in writing_flow
approved = approval_task.run()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/tasks/task.py", line 354, in run
controller.run()
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable
14:04:37.394 | ERROR | Flow run 'zealous-quokka' - Finished in state Failed("Flow run encountered an exception: TypeError: 'NoneType' object is not iterable")
Traceback (most recent call last):
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 55, in <module>
approved, draft = writing_flow()
^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1286, in __call__
return run_flow(
^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 734, in run_flow
return run_flow_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 614, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 211, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 612, in run_flow_sync
engine.call_flow_fn()
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 591, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/decorators.py", line 110, in wrapper
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 50, in writing_flow
approved = approval_task.run()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/tasks/task.py", line 354, in run
controller.run()
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable
I have my OpenAPI key set.
What am I missing?
Would show a user how to add common pre-built tools.
It took me a bit to get this to work.
Say a user wants to get up-to-date information into their app. You can do this with the DuckDuckGoSearchRun
tool.
Create an example that shows how to use the `DuckDuckGoSearchRun` LangChain tool.
I can see the agents/tasks writing to console, but I'd like to be able to output those streams to the user if relevant, perhaps with a user_stream=True parameter
The user can them see whats happening via other places other than console, such as if wrapped in a HTTP endpoint and choice steps are returned so they know what is happening
coder = cf.Task(
"Answer the question using code. If you can't answer the question with code then mark the task complete but do not supply an answer",
user_stream=True, # stream this answer out to the user
agents=[code_executor]
)
Currently, if you define a Prefect logger inside of a ControlFlow flow, the logs will not appear in the Prefect flow run logs natively.
This issue tracks adding either the Prefect logger natively or some sort of wrapper to allow for the Prefect logger logs to appear in flow run logs when run inside of a ControlFlow flow.
from controlflow import flow
from prefect import get_run_logger
@flow
def example_controlflow():
logger = get_run_logger()
logger.info("Starting Baseball Control Flow")
example_controlflow()
No response
Running the example below throws this error. Please advise.
File "/Users/*****/Library/Application Support/hatch/env/virtual/agent/D2qYGUjn/agent/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
RuntimeError: cannot reuse already awaited coroutine
import asyncio
from controlflow import flow
@flow
async def main_flow():
return Task(
"understand what the user wants and answer accordingly.",
result_type=str,
)
async def runner():
result = await main_flow()
asyncio.run(runner())
controlflow version = 0.8.2
No response
Attempting to run a Google example makes this error.
I also reported it here: langchain-ai/langchain-google#376
Full stack trace:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 635, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/controlflow/decorators.py", line 110, in wrapper
result = fn(*args, **kwargs)
File "/app/flow.py", line 27, in team_flow
who = task_master.run()
File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
return run_task(
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
raise self._raised
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/controlflow/tasks/task.py", line 360, in run
controller.run()
File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
return run_task(
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
raise self._raised
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/controlflow/controllers/controller.py", line 340, in run
new_messages = self.run_once()
File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
return run_task(
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
raise self._raised
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/controlflow/controllers/controller.py", line 291, in run_once
for _ in response_gen:
File "/usr/local/lib/python3.10/site-packages/controlflow/llm/completions.py", line 307, in _handle_events
for event in generator:
File "/usr/local/lib/python3.10/site-packages/controlflow/llm/completions.py", line 125, in _completion_generator
model = model.bind_tools([t.to_lc_tool() for t in as_tools(tools)])
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/chat_models.py", line 1000, in bind_tools
genai_tools = [
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/chat_models.py", line 1001, in <listcomp>
tool_to_dict(convert_to_genai_function_declarations(tool)) for tool in tools
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 103, in convert_to_genai_function_declarations
return GoogleTool(function_declarations=_convert_dict_to_genai_functions(tool)) # type: ignore
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 180, in _convert_dict_to_genai_functions
return [_convert_dict_to_genai_function(d)]
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 204, in _convert_dict_to_genai_function
"properties": {
File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 206, in <dictcomp>
"type_": TYPE_ENUM[v["type"]],
KeyError: 'type'
No response
controlflow 0.8.2
langchain 0.2.6
langchain-community 0.2.6
langchain-core 0.2.11
langchain-experimental 0.0.62
langchain-google-genai 1.0.7
langchain-google-vertexai 1.0.6
No response
Here's the entire Jupyter notebook:
# -*- coding: utf-8 -*-
"""controlflow
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1rPD07Gu1DeqNOO9R1DEOacVM7LqS4VRT
"""
# Commented out IPython magic to ensure Python compatibility.
# %pip install -qU controlflow langchain_fireworks
from google.colab import userdata
import controlflow as cf
from pydantic import BaseModel
from langchain_fireworks import ChatFireworks
cf.settings.max_iterations = 10
cf.settings.eager_mode = False
cfm = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')
cf.default_model = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')
class ResearchTopic(BaseModel):
title: str
keywords: list[str]
# create an agent to write a research report
author = cf.Agent(
name="Deep Thought",
instructions="Use a formal tone and clear language",
model=cfm)
@cf.flow
def test_flow():
test_task = cf.Task("Generate 10 names for a business", user_access=False, result_type=list)
test_task.run_once(agents=[author])
return test_task.result
test_flow()
The output is (forgive the formatting):
│ │
│ I'm ready to start working on the tasks. I see that there is one ready task, Task 27d10, which │
│ has the objective of generating 10 names for a business. I will use the provided tools to │
│ complete this task. │
│ │
│ I will start by using my language generation capabilities to generate a list of 10 names for a │
│ business. I will make sure to use a formal tone and clear language, as per the instructions. │
│ │
│ Once I have generated the list of names, I will use the provided tool to mark the task as │
│ successful and provide the result. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:25 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I have completed the task objective. I generated a list of 10 names for a business. Here is │
│ the result: │
│ │
│ ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex │
│ Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward'] │
│ │
│ I will use the provided tool to mark the task as successful and provide the result. │
│ │
│ Task 27d10: Successful │
│ │
│ Result: ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex │
│ Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward'] │
│ │
│ I am ready to work on the next task. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I see that there are no more ready tasks available for me to work on. I will wait for further │
│ instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:28 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:29 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:30 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:31 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:32 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│ │
│ I will continue to wait for further instructions or tasks to be assigned to me. │
│ │
╰──────────────────────────────────────────────────────────────────────────────────── 2:44:34 AM ─╯
02:44:35.080 | ERROR | Task run 'Run LLM' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.145 | ERROR | Task run 'Run LLM' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.158 | ERROR | Flow run 'brave-whale' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
yield self
File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 662, in run_generator_flow_sync
yield gen_result
File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 95, in create_context
yield self
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
new_messages = self.run_once()
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
return run_task(
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
payload = self._setup_run()
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.252 | ERROR | Flow run 'brave-whale' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.258 | ERROR | Task run 'Run LLM Controller' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.329 | ERROR | Task run 'Run LLM Controller' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.342 | ERROR | Flow run 'monumental-squirrel' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
yield self
File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 612, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 591, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/controlflow/decorators.py", line 121, in wrapper
flow_obj.run()
File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 139, in run
controller.run()
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
return run_task(
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
new_messages = self.run_once()
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
return run_task(
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
return get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
return _get_state_result(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
return call.result()
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
result = await coro
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
return await task
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
yield self
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
payload = self._setup_run()
File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.421 | ERROR | Flow run 'monumental-squirrel' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
[<ipython-input-17-8475e9fd26aa>](https://localhost:8080/#) in <cell line: 1>()
----> 1 test_flow()
58 frames
[/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py](https://localhost:8080/#) in _setup_run(self)
151 """
152 if self._iteration >= (self.max_iterations or math.inf):
--> 153 raise ValueError(
154 f"Controller has exceeded maximum iterations of {self.max_iterations}."
155 )
ValueError: Controller has exceeded maximum iterations of 10.
Had I not defined a max iterations, it seems like it would have run forever.
Am I missing a step or something?
Tracking issue for vocabulary updates
run_agent
(became run_ai_task
in #10)system_access
delegation_strategy
so far we're havin a lot of fun over here, had a question - I'd imagine there's probably an easy-ish way to do this, but I haven't done a ton w/ prefect so bear with me.
I'm interested to know not just when tool calls finish, but also when they start, and what params they were started with. This helps a lot during development as I'm trying to document the functions/tools I'm building and make sure the LLM can use them well.
I did manage to spin up the prefect web UI and start a server in a another shell, and trigger a deployment from a third shell, but wondering if there's a way to just up the verbosity of the logs a little bit without leaving the CLI / IDE
Here's a subset of the code, minus the tools impls
researcher = cf.Agent(
name="Researcher",
instructions="Use a formal tone and clear language",
tools=[
list_meetings,
list_zoom_meeting_recordings,
transcribe_recording,
download_zoom_file,
split_local_file,
get_file_size,
]
)
person = "<redacted>"
@cf.flow
def research() -> str:
zoomid = cf.Task(
f"""
tell me the zoom meeting ID for my call w/ {person}?
""",
agents=[researcher]
)
downloadURL = cf.Task(
"""
What is the download URL for the m4a audio file from that meeting?
""",
context={"meeting_id": zoomid},
agents=[researcher]
)
summary = cf.Task(
"""
download the file to recording.m4a
""",
context={"download_url": downloadURL},
agents=[researcher]
)
summary = cf.Task(
"""
transcribe the file and then tell me the highlights of the meeting.
You can split the file in half if its too big. The size limit for whisper transcription is 25MB.
""",
context={"filename": "recording.m4a"},
agents=[researcher]
)
return summary
def main():
result = research()
print(result)
if __name__ == "__main__":
main()
I'd be looking for a way to opt-in to something like showing the params in, and maybe the first 200 chars of the response if it's real big
Tool call: list_zoom_meeting_recordings(meeting_id=123456789)
Tool call: ✅ w/ response `[{"meeting_id":123456789, "downloadable_files":[{"type":"video", "url": ....
---------------
Tool call: download_file(download_url=..., filename="dex meeting from 2024-06-24.m4a")
Tool call: ❌ w/ response `{status: 503, message: "something is broken try back later"}`
just riffing here - let me know what y'all think or if I missed a flag/option
I am running into this error with first task from the tutorial.
Environment/System
I am on Windows 11 pro
pip_list is installed
using an anaconda prompt to run the script in a virtual environment
Here is my task
import controlflow as cf
hello_task = cf.Task("say hello")
hello_task.run()
Here is the error
pip_list.txt
00:04:20.859 | ERROR | Task run 'Run LLM' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:20.957 | ERROR | Task run 'Run LLM' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
00:04:20.967 | ERROR | Flow run 'devious-whale' - Encountered exception during execution: Invalid format string
Traceback (most recent call last):
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\flow_engine.py", line 571, in run_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\flow_engine.py", line 662, in run_generator_flow_sync
yield gen_result
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\flows\flow.py", line 95, in create_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 340, in run
new_messages = self.run_once()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 291, in run_once
for _ in response_gen:
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 307, in _handle_events
for event in generator:
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 167, in _completion_generator
yield from handle_delta_events(
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 78, in handle_delta_events
yield CompletionEvent(
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 310, in _handle_events
handler.on_event(event)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\handlers.py", line 25, in on_event
method(**event.payload)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 78, in on_tool_call_delta
self.update_live()
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 60, in update_live
content.append(format_message(message, tool_results=tool_results))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 155, in format_message
subtitle=f"[italic]{format_timestamp(message.timestamp)}[/]",
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 107, in format_timestamp
return local_timestamp.strftime("%l:%M:%S %p")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid format string
00:04:21.135 | ERROR | Flow run 'devious-whale' - Finished in state Failed('Flow run encountered an exception: ValueError: Invalid format string')
00:04:21.140 | ERROR | Task run 'Run LLM Controller' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:21.231 | ERROR | Task run 'Run LLM Controller' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
00:04:21.237 | ERROR | Task run 'Run Task a0cf6 ("say hello")' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:21.376 | ERROR | Task run 'Run Task a0cf6 ("say hello")' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
Traceback (most recent call last):
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\firsttask.py", line 5, in <module>
hello_task.run()
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\tasks\task.py", line 354, in run
controller.run()
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 340, in run
new_messages = self.run_once()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
return run_task(
^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
return engine.state if return_type == "state" else engine.result()
^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
return get_state_result(
^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
yield self
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
engine.call_task_fn(txn)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 291, in run_once
for _ in response_gen:
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 307, in _handle_events
for event in generator:
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 167, in _completion_generator
yield from handle_delta_events(
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 78, in handle_delta_events
yield CompletionEvent(
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 310, in _handle_events
handler.on_event(event)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\handlers.py", line 25, in on_event
method(**event.payload)
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 78, in on_tool_call_delta
self.update_live()
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 60, in update_live
content.append(format_message(message, tool_results=tool_results))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 155, in format_message
subtitle=f"[italic]{format_timestamp(message.timestamp)}[/]",
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 107, in format_timestamp
return local_timestamp.strftime("%l:%M:%S %p")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid format string
@ai_task
def generate_image(prompt: str) -> Image:
"""Generate an image based on the given prompt."""
If a task is annotated with image, handle it appropriately.
I've looked through the docs but its not clear to me if its possible to assign tasks to run in parrallel, or if its an anti-pattern to try to mark tasks that are complete as not complete anymore to get the behaviour I'm looking for. I'm looking for a self-improvement loop where the same tasks are completed by agents but with each iteration with a new follow up question derived from the conversation so far.
I have this flow:
import controlflow as cf
from typing import Annotated, List, Optional
from pydantic import BaseModel, Field
from agents import (
code_executor,
web_researcher,
database_researcher,
compere
)
class FastThoughts(BaseModel):
inital_response: str
can_answer: bool
@cf.flow()
def fast_n_slow_flow(question, vector_name="control_flow", chat_history=[]):
agents = ["code_executor", "web_researcher", "database_researcher"]
agents_obj = [code_executor, web_researcher, database_researcher]
task_master = cf.Task(
"""
Frame the question from the user in language that all your co-assistants can understand and ask which agents on your team will be able to answer it.
They should reply with thier first initial reactions, but will start a slower more considered response that should come later.
""",
agents=[compere],
result_type=str,
)
task_master.run()
print(task_master.result)
slow_thought_agents = []
for agent in agents_obj:
fast_thoughts = cf.Task(
"Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
agents=[agent],
context=dict(user_question=task_master),
result_type=FastThoughts
)
fast_answer = fast_thoughts.run()
print(fast_answer)
if fast_answer.can_answer:
print(f"Adding {agent.name} to slow thoughts")
slow_thought_agents.append(agent)
who_speaks_next = cf.Task(
"From the conversation so far and the assistants recent reactions, decide who should speak next",
context=dict(assistant_reactions=fast_thoughts),
agents=[compere],
result_type=agents
)
who_speaks_next.run()
print(who_speaks_next.result) # only result available if you do not supply a result_type
slow_thoughts = []
for agent in slow_thought_agents:
try:
slow_thought = cf.Task(
"You indicated you could contribute more to the conversation with your skills - please now answer the question fully using all tools at your disposal.",
agents=[agent]
)
except ValueError:
print("{agent.name} could not help with a slow thought")
continue
slow_thought.run()
print(slow_thought.result)
slow_thoughts.append(slow_thought)
summary = cf.Task(
"""
Compile all the answers so far and summarise the conversation so far. Highlight what each team member has brought to the discussion.
Consider the initial question and the answers so far, reframe a new question that will help tease out any unexplored areas.
""",
agents = [compere],
context=dict(fast_thoughts=fast_thoughts, slow_thoughts=slow_thoughts, who_speaks_next=who_speaks_next)
)
summary.run()
print(summary.result)
shall_we_end = cf.Task(
"If you believe the conversation has reached its natural conclusion, mark this task complete, otherwise keep the conversation going",
agents=[compere],
context=dict(summary=summary),
result=bool)
shall_we_end.run()
print(shall_we_end)
if shall_we_end.result:
return slow_thoughts, summary.result
it works, but I'd like for it to loop back and use the follow up question the compere adds at the last step, but it is not clear to me how to do this. I'd also like to avoid the for loops or to make them async or parrallel somehow, since they don't depend on each other.
Maybe the below is already possible :) but how I'd theoretically like it is below.
I'd prefer instead of this for loop:
for agent in agents_obj:
fast_thoughts = cf.Task(
"Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
agents=[agent],
context=dict(user_question=task_master),
result_type=FastThoughts
)
fast_answer = fast_thoughts.run()
print(fast_answer)
if fast_answer.can_answer:
print(f"Adding {agent.name} to slow thoughts")
slow_thought_agents.append(agent)
I just add the agents and then have the results in a list:
# this task is done in parrallel
fast_thoughts = cf.Task(
"Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
agents=[code_executor, web_researcher, database_researcher], #
context=dict(user_question=task_master),
result_type=[FastThoughts] # a list of the output, one per passed in agent
)
fast_answers = fast_thoughts.run()
for answer in fast_answers:
if answer.can_answer:
print(f"Adding {agent.name} to slow thoughts")
slow_thought_agents.append(agent)
Then I am also unclear how to mark tasks as incomplete or if this is an anti-pattern, but something like
fast_thoughts.set_incomplete()
or perhaps create a new task with subtasks for the next loop?
I did not see groq support in the docs
call @task-decorated functions and add anything returned to the instructions
Generally, when an agent uses a tool we need to show it the result. However, for some tools we know the turn should end -- almost all builtin tools work this way (like marking a task successful). However, today they must issue the EndTurn
event before the ToolResult
event. By adding a flag to the tool, the tool call handler could emit the end turn event automatically.
No response
No response
Probably will address #219
Instead of having human input only via agent making a tool call and the user input in the response, instruct the agents that user input may come at any time in the thread, but they must use a tool to specifically request a response from the user.
The pattern the agent would see is:
...
<agent used human input tool: 'hello, whats your name?'>
<tool response successful>
<human message>
The key is we have to show the tool call so that agents don't conclude from history that they can converse with users by posting "regular" messages.
This will also facilitate adding user messages at any point, perhaps with a mechanism like
agent.send_message()
or
task.send_message()
sending a global message doesn't seem necessary at the moment.
The task dependency example in the ControlFlow documentation under the "Hello, tasks" section isn't functioning as expected. Here are the issues:
import controlflow as cf
from pydantic import BaseModel
class Name(BaseModel):
first: str
last: str
name = cf.Task("Get the user's name", user_access=True, result_type=Name)
poem = cf.Task("Write a personalized poem", context=dict(name=name))
poem.run()
Outcome : The process hangs indefinitely, requiring a manual interrupt.
I'm not sure if this is the correct way to run these outside of a flow...
name.run()
poem.run()
Outcome : The name
task runs successfully, but the poem
task throws an error:
RuntimeError: dictionary changed size during iteration
This contradicts the docs, which state:
"ControlFlow will automatically run any dependencies before executing the task you requested. In the above example, we only ran the
poem
task, but ControlFlow automatically ran thename
task first, then passed its result to thepoem
task's context. We can see that both tasks were successfully completed and haveresult
values."
ControlFlow version : 0.8.1
Python version : 3.12.2
Prefect version : 3.0.0rc9
The controller.graph
property raises a Runtime: dictionary changed size during iteration
exception.
I've only been able to replicate this with task B depending on task A. However, I suspect that if the flow adds subtasks while iterating over the current tasks, this error is raised.
A simple solution may be to caste the self.flow.tasks.values()
to a list before calling Graph.from_tasks
.
Patch will be submitted soon.
I'm look to understand the best way to bring the User into a conversation where the interaction is not a CLI / CLI tool. I can think of a few workaround-ish ideas that might work:
tell_user
or ask_user_for_clarification
that handles the IO via a websocket or somethinguser_input=True
on a task, capture/forward stdin/stdout 😬Building autonomous agents for data engineering and data product management -
The interaction paradigms I'd like to be able to support include web-app chat via websockets or other async layer, in addition to more "outer loop" type channels like email, slack, sms, etc. For example, an agent might discover something and want to alert a user, or might complete a long-running task for a user and want to get the user's input.
These sorts of workflows fit nicely into a D(A)G-y sort of state machine that prefect enables, but I'm trying to wrap my head around the best way to fit together these sorts of async and multi-player workflows, or even just some workarounds / patterns that have worked well for applications that have access to LLMs
# open to brainstorming but nothing off the top of my head
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.