Giter Club home page Giter Club logo

controlflow's People

Contributors

aaazzam avatar discdiver avatar djsauble avatar eltociear avatar emilrex avatar jlowin avatar seanpwlms avatar zhen0 avatar znicholasbrown avatar zzstoatzz avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

controlflow's Issues

Bug: Pydantic model errors during tool serialization

The following fails due to errors serialization the playwright_agent function and passing it to the scraping_agent:

from controlflow import flow as CFFlow, Task as CFTask, Agent
from playwright import sync_api

def playwright_agent(url: str) -> sync_api.Page:
    """
    Uses Playwright to scrape the site at the given URL and returns the playwright page object.
    """
    with sync_api.sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url)

        return page 


scraping_agent = Agent(name="WesleyScrapes", tools=[playwright_agent])

@CFFlow
def scrape_details(url: str) -> str:
    """
    Given a url, this task scrapes the site for details about the tool.
    """
    return CFTask(
        objective="Scrape the site for details about the tool, following links to get more information if necessary.",
        result_type=str,
        agents=[scraping_agent],
        context={"url": url},
    ).run()

if __name__ == "__main__":
  scrape_details("https://prefect.io")

with the following:

16:23:26.558 | ERROR   | Flow run 'zealous-baboon' - Finished in state Failed("Flow run encountered an exception: PydanticSerializationError: Error calling function `_serialize_tools`: PydanticSchemaGenerationError: Unable to generate pydantic-core schema for <class 'playwright.sync_api._generated.Page'>. Set `arbitrary_types_allowed=True` in the model_config to ignore this error or implement `__get_pydantic_core_schema__` on your type to fully support it.\n\nIf you got this error by calling handler(<some type>) within `__get_pydantic_core_schema__` then you likely need to call `handler.generate_schema(<some type>)` since we do not call `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.\n\nFor further information visit https://errors.pydantic.dev/2.7/u/schema-for-unknown-type")
Traceback (most recent call last):
  File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 217, in __init__
    validator = _getattr_no_parents(type, '__pydantic_validator__')
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 98, in _getattr_no_parents
    raise AttributeError(attribute)
AttributeError: __pydantic_validator__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/nicholas/projects/flows/workflow-webscraper/cf_bug.py", line 31, in <module>
    scrape_details("https://prefect.io")
  File "/Users/nicholas/projects/prefect/src/prefect/flows.py", line 1326, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 798, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 678, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 243, in result
    raise self._raised
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 635, in run_context
    yield self
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 676, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/nicholas/projects/prefect/src/prefect/flow_engine.py", line 655, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/decorators.py", line 110, in wrapper
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/flows/workflow-webscraper/cf_bug.py", line 28, in scrape_details
    ).run()
      ^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 360, in run
    controller.run()
  File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
                   ^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/tasks.py", line 845, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/nicholas/projects/prefect/src/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/prefect/src/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 270, in run_once
    payload = self._setup_run()
              ^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/controllers/controller.py", line 192, in _setup_run
    tools.extend(task.get_tools())
                 ^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 526, in get_tools
    tools.extend([self._create_fail_tool(), self._create_success_tool()])
                  ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 468, in _create_fail_tool
    return Tool.from_function(
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/llm/tools.py", line 83, in from_function
    parameters = TypeAdapter(fn).json_schema()
                 ^^^^^^^^^^^^^^^
  File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/type_adapter.py", line 223, in __init__
    core_schema, type, module, str(type), 'TypeAdapter', core_config, config_wrapper.plugin_settings
                               ^^^^^^^^^
  File "/Users/nicholas/projects/control-flow/src/controlflow/tasks/task.py", line 175, in __repr__
    serialized = self.model_dump()
                 ^^^^^^^^^^^^^^^^^
  File "/Users/nicholas/.pyenv/versions/workflow-webscraper/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `_serialize_tools`: PydanticSchemaGenerationError: Unable to generate pydantic-core schema for <class 'playwright.sync_api._generated.Page'>. Set `arbitrary_types_allowed=True` in the model_config to ignore this error or implement `__get_pydantic_core_schema__` on your type to fully support it.

If you got this error by calling handler(<some type>) within `__get_pydantic_core_schema__` then you likely need to call `handler.generate_schema(<some type>)` since we do not call `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.

For further information visit https://errors.pydantic.dev/2.7/u/schema-for-unknown-type

Prior to launch this worked without issue so it's likely some tool serialization logic has changed.

LangChain tool errors when running with controlflow built from main (works fine with 0.8.2)

Description

Ran script in #222 in a fresh environment.
Errors with AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'

Example Code

Version below changes how agents are passed to avoid deprecation warning (ran it both ways).


# uv pip install langchain-community, duckduckgo-search

import controlflow as cf
from langchain_community.tools import DuckDuckGoSearchRun


summarizer = cf.Agent(
    name="Headline Summarizer",
    description="An AI agent that fetches and summarizes current events",
    tools=[DuckDuckGoSearchRun()],
)

extractor = cf.Agent(
    name="Entity Extractor",
    description="An AI agent that does named entity recognition",
)


@cf.flow
def get_headlines():

    summarizer_task = cf.Task(
        "Retrieve and summarize today's two top business headlines",
        agent=summarizer,
        result_type=list[str],
    )

    extractor_task = cf.Task(
        "Extract any fortune 500 companies mentioned in the headlines and whether the sentiment is positive, neutral, or negative",
        agent=extractor,
        depends_on=[summarizer_task],
    )

    return summarizer_task, extractor_task


if __name__ == "__main__":
    headlines, entity_sentiment = get_headlines()
    print(headlines, entity_sentiment)

Version Information

Same package versions in working and non-working cases other than controlflow:
controflow         0.8.3.dev54+gc90dfbb  

duckduckgo-search         6.1.12 
langchain                 0.2.7                    pypi_0    pypi
langchain-anthropic       0.1.19                   pypi_0    pypi
langchain-community       0.2.7                    pypi_0    pypi
langchain-core            0.2.16                   pypi_0    pypi
langchain-openai          0.1.15                   pypi_0    pypi
langchain-text-splitters  0.2.2                    pypi_0    pypi

Additional Context

09:49:07.241 | ERROR   | Flow run 'cerulean-coyote' - Encountered exception during execution: AttributeError("'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'")
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 635, in run_context
    yield self
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
    engine.call_flow_fn()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/decorators.py", line 120, in wrapper
    flow_obj.run()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/flows/flow.py", line 134, in run
    orchestrator.run(steps=steps)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/orchestration/orchestrator.py", line 114, in run
    agent._run(context=context)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 202, in _run
    for event in self._run_model(messages=messages, tools=context.tools):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 227, in _run_model
    model = self.get_model(tools=tools)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 159, in get_model
    model = model.bind_tools([t.to_lc_tool() for t in tools])
                              ^^^^^^^^^^^^
AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'
09:49:07.370 | ERROR   | Flow run 'cerulean-coyote' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'")
Traceback (most recent call last):
  File "/Users/jeffhale/Desktop/prefect/demos/cf/cf_example.py", line 38, in <module>
    headlines, entity_sentiment = get_headlines()
                                  ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flows.py", line 1326, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 798, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 243, in result
    raise self._raised
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 635, in run_context
    yield self
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
    engine.call_flow_fn()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/decorators.py", line 120, in wrapper
    flow_obj.run()
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/flows/flow.py", line 134, in run
    orchestrator.run(steps=steps)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/orchestration/orchestrator.py", line 114, in run
    agent._run(context=context)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 202, in _run
    for event in self._run_model(messages=messages, tools=context.tools):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 227, in _run_model
    model = self.get_model(tools=tools)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/cfmain/lib/python3.12/site-packages/controlflow/agents/agent.py", line 159, in get_model
    model = model.bind_tools([t.to_lc_tool() for t in tools])
                              ^^^^^^^^^^^^
AttributeError: 'DuckDuckGoSearchRun' object has no attribute 'to_lc_tool'

default_model requires OPENAI_API_KEY to be set even with custom LLMs

Trying to import the module without OPENAI_API_KEY set throws an error.

controlflow==0.7.5

windows (yuck)
powershell (super yuck)

Ideal resolution:

  • not have to set an OPENAI_API_KEY environment variable if i'm not using OpenAI

Temporary Resolution:

  • export OPENAI_API_KEY=nonsense to make the code happy.

example script:

import os
from pydantic import BaseModel

from langchain_mycustommodel import MyCustomLLM #type: langchain LLM

llm = MyCustomLLM(**kwargs)

import controlflow as cf
cf.default_model = llm

# also tried: 
author = cf.Agent(
    model=llm,
    name="Deep Thought",
    instructions="Use a formal tone and clear language",
)

throws error:

openai/gpt-4o None
Traceback (most recent call last):
  File "example\cf.py", line 5, in <module>
    import controlflow as cf
  File ".control\Lib\site-packages\controlflow\__init__.py", line 18, in <module>
    default_model = model_from_string(controlflow.settings.llm_model)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".control\Lib\site-packages\controlflow\llm\models.py", line 63, in model_from_string
    return cls(model=model, temperature=temperature, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".control\Lib\site-packages\pydantic\v1\main.py", line 341, in __init__
    raise validation_error
pydantic.v1.error_wrappers.ValidationError: 1 validation error for ChatOpenAI
__root__
  Did not find openai_api_key, please add an environment variable `OPENAI_API_KEY` which contains it, or pass `openai_api_key` as a named parameter. (type=value_error)

caused by the top level import itself: https://github.com/PrefectHQ/ControlFlow/blob/main/src/controlflow/__init__.py#L18

Trying to get quickstart example working

Trying the agent example from the getting started.

Firstly - you don't actually mention configuring Prefect (i.e. starting Prefect server) - that might trip some folks up if they're coming to this for the first time.

Secondly, I'm getting the following error when I run the example:

(prefect-test) chrisgoddard@Good-Studio ~/C/g/p/prefect-test (main) [1]> python src/prefect_test/test2.py                                                                   (base) 
14:04:37.302 | ERROR   | Flow run 'lemon-narwhal' - Encountered exception during execution: 'NoneType' object is not iterable
Traceback (most recent call last):
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 662, in run_generator_flow_sync
    yield gen_result
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/flows/flow.py", line 95, in create_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
    messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable
14:04:37.338 | ERROR   | Flow run 'lemon-narwhal' - Finished in state Failed("Flow run encountered an exception: TypeError: 'NoneType' object is not iterable")
14:04:37.339 | ERROR   | Task run 'Run LLM Controller' - Task run failed with exception TypeError("'NoneType' object is not iterable") - Retries are exhausted
14:04:37.355 | ERROR   | Task run 'Run LLM Controller' - Finished in state Failed("Task run encountered an exception TypeError: 'NoneType' object is not iterable")
14:04:37.356 | ERROR   | Task run 'Run Task 83ae0 ("Review the edited paragraph to see if it meets the...")' - Task run failed with exception TypeError("'NoneType' object is not iterable") - Retries are exhausted
14:04:37.371 | ERROR   | Task run 'Run Task 83ae0 ("Review the edited paragraph to see if it meets the...")' - Finished in state Failed("Task run encountered an exception TypeError: 'NoneType' object is not iterable")
14:04:37.372 | ERROR   | Flow run 'zealous-quokka' - Encountered exception during execution: 'NoneType' object is not iterable
Traceback (most recent call last):
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 612, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 591, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/decorators.py", line 110, in wrapper
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 50, in writing_flow
    approved = approval_task.run()
               ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/tasks/task.py", line 354, in run
    controller.run()
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
    messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable
14:04:37.394 | ERROR   | Flow run 'zealous-quokka' - Finished in state Failed("Flow run encountered an exception: TypeError: 'NoneType' object is not iterable")
Traceback (most recent call last):
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 55, in <module>
    approved, draft = writing_flow()
                      ^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1286, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 734, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 614, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 211, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)  # type: ignore
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 612, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 591, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/decorators.py", line 110, in wrapper
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/src/prefect_test/test2.py", line 50, in writing_flow
    approved = approval_task.run()
               ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/tasks/task.py", line 354, in run
    controller.run()
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/.rye/py/[email protected]/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/chrisgoddard/Code/goodkiwi/projects/prefect-test/.venv/lib/python3.12/site-packages/controlflow/controllers/controller.py", line 340, in run
    messages.extend(new_messages)
TypeError: 'NoneType' object is not iterable

I have my OpenAPI key set.

What am I missing?

Add example of using a LangChain tool to the docs

Enhancement Description

Would show a user how to add common pre-built tools.
It took me a bit to get this to work.

Use Case

Say a user wants to get up-to-date information into their app. You can do this with the DuckDuckGoSearchRun tool.

Proposed Implementation

Create an example that shows how to use the `DuckDuckGoSearchRun` LangChain tool.

Stream intermediate results

Enhancement Description

I can see the agents/tasks writing to console, but I'd like to be able to output those streams to the user if relevant, perhaps with a user_stream=True parameter

Use Case

The user can them see whats happening via other places other than console, such as if wrapped in a HTTP endpoint and choice steps are returned so they know what is happening

Proposed Implementation

coder = cf.Task(
            "Answer the question using code. If you can't answer the question with code then mark the task complete but do not supply an answer",
            user_stream=True, # stream this answer out to the user
            agents=[code_executor]
        )

Native Prefect Logging in ControlFlow flow

Enhancement Description

Currently, if you define a Prefect logger inside of a ControlFlow flow, the logs will not appear in the Prefect flow run logs natively.

This issue tracks adding either the Prefect logger natively or some sort of wrapper to allow for the Prefect logger logs to appear in flow run logs when run inside of a ControlFlow flow.

Use Case

from controlflow import flow
from prefect import get_run_logger

@flow
def example_controlflow():
    logger = get_run_logger()
    logger.info("Starting Baseball Control Flow")

example_controlflow()

Proposed Implementation

No response

functional async flow within a async function -> RuntimeError: cannot reuse already awaited coroutine

Description

Running the example below throws this error. Please advise.

File "/Users/*****/Library/Application Support/hatch/env/virtual/agent/D2qYGUjn/agent/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
RuntimeError: cannot reuse already awaited coroutine

Example Code

import asyncio
from controlflow import flow


@flow
async def main_flow():
    return Task(
        "understand what the user wants and answer accordingly.",
        result_type=str,
    )

async def runner():
    result = await main_flow()

asyncio.run(runner())

Version Information

controlflow version = 0.8.2

Additional Context

No response

langchain_google_genai/_function_utils.py "type_": TYPE_ENUM[v["type"]] - KeyError: 'type'

Description

Attempting to run a Google example makes this error.

I also reported it here: langchain-ai/langchain-google#376

Full stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 635, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 676, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 655, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/controlflow/decorators.py", line 110, in wrapper
    result = fn(*args, **kwargs)
  File "/app/flow.py", line 27, in team_flow
    who = task_master.run()
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/controlflow/tasks/task.py", line 360, in run
    controller.run()
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 845, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 918, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 731, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 343, in result
    raise self._raised
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 673, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 729, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/site-packages/prefect/task_engine.py", line 702, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/controlflow/controllers/controller.py", line 291, in run_once
    for _ in response_gen:
  File "/usr/local/lib/python3.10/site-packages/controlflow/llm/completions.py", line 307, in _handle_events
    for event in generator:
  File "/usr/local/lib/python3.10/site-packages/controlflow/llm/completions.py", line 125, in _completion_generator
    model = model.bind_tools([t.to_lc_tool() for t in as_tools(tools)])
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/chat_models.py", line 1000, in bind_tools
    genai_tools = [
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/chat_models.py", line 1001, in <listcomp>
    tool_to_dict(convert_to_genai_function_declarations(tool)) for tool in tools
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 103, in convert_to_genai_function_declarations
    return GoogleTool(function_declarations=_convert_dict_to_genai_functions(tool))  # type: ignore
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 180, in _convert_dict_to_genai_functions
    return [_convert_dict_to_genai_function(d)]
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 204, in _convert_dict_to_genai_function
    "properties": {
  File "/usr/local/lib/python3.10/site-packages/langchain_google_genai/_function_utils.py", line 206, in <dictcomp>
    "type_": TYPE_ENUM[v["type"]],
KeyError: 'type'

Example Code

No response

Version Information

controlflow                    0.8.2
langchain                      0.2.6
langchain-community            0.2.6
langchain-core                 0.2.11
langchain-experimental         0.0.62
langchain-google-genai         1.0.7
langchain-google-vertexai      1.0.6

Additional Context

No response

Task never stops.

Here's the entire Jupyter notebook:

# -*- coding: utf-8 -*-
"""controlflow

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1rPD07Gu1DeqNOO9R1DEOacVM7LqS4VRT
"""

# Commented out IPython magic to ensure Python compatibility.
# %pip install -qU controlflow langchain_fireworks

from google.colab import userdata

import controlflow as cf
from pydantic import BaseModel

from langchain_fireworks import ChatFireworks

cf.settings.max_iterations = 10
cf.settings.eager_mode = False

cfm = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')
cf.default_model = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')

class ResearchTopic(BaseModel):
    title: str
    keywords: list[str]

# create an agent to write a research report
author = cf.Agent(
    name="Deep Thought",
    instructions="Use a formal tone and clear language",
    model=cfm)

@cf.flow
def test_flow():
    test_task = cf.Task("Generate 10 names for a business", user_access=False, result_type=list)
    test_task.run_once(agents=[author])

    return test_task.result

test_flow()

The output is (forgive the formatting):

│                                                                                                  │
│  I'm ready to start working on the tasks. I see that there is one ready task, Task 27d10, which  │
│  has the objective of generating 10 names for a business. I will use the provided tools to       │
│  complete this task.                                                                             │
│                                                                                                  │
│  I will start by using my language generation capabilities to generate a list of 10 names for a  │
│  business. I will make sure to use a formal tone and clear language, as per the instructions.    │
│                                                                                                  │
│  Once I have generated the list of names, I will use the provided tool to mark the task as       │
│  successful and provide the result.                                                              │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:25 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I have completed the task objective. I generated a list of 10 names for a business. Here is     │
│  the result:                                                                                     │
│                                                                                                  │
│  ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex            │
│  Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward']           │
│                                                                                                  │
│  I will use the provided tool to mark the task as successful and provide the result.             │
│                                                                                                  │
│  Task 27d10: Successful                                                                          │
│                                                                                                  │
│  Result: ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex    │
│  Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward']           │
│                                                                                                  │
│  I am ready to work on the next task.                                                            │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I see that there are no more ready tasks available for me to work on. I will wait for further   │
│  instructions or tasks to be assigned to me.                                                     │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:28 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:29 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:30 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:31 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:32 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:34 AM ─╯
02:44:35.080 | ERROR   | Task run 'Run LLM' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.145 | ERROR   | Task run 'Run LLM' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.158 | ERROR   | Flow run 'brave-whale' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 662, in run_generator_flow_sync
    yield gen_result
  File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 95, in create_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
    payload = self._setup_run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
    raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.252 | ERROR   | Flow run 'brave-whale' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.258 | ERROR   | Task run 'Run LLM Controller' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.329 | ERROR   | Task run 'Run LLM Controller' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.342 | ERROR   | Flow run 'monumental-squirrel' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 612, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 591, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/decorators.py", line 121, in wrapper
    flow_obj.run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 139, in run
    controller.run()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
    payload = self._setup_run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
    raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.421 | ERROR   | Flow run 'monumental-squirrel' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
[<ipython-input-17-8475e9fd26aa>](https://localhost:8080/#) in <cell line: 1>()
----> 1 test_flow()

58 frames
[/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py](https://localhost:8080/#) in _setup_run(self)
    151         """
    152         if self._iteration >= (self.max_iterations or math.inf):
--> 153             raise ValueError(
    154                 f"Controller has exceeded maximum iterations of {self.max_iterations}."
    155             )

ValueError: Controller has exceeded maximum iterations of 10.

Had I not defined a max iterations, it seems like it would have run forever.

Am I missing a step or something?

Vocabulary updates

Tracking issue for vocabulary updates

  • run_agent (became run_ai_task in #10)
  • system_access
  • delegation_strategy

More verbose output for tool calls / responses

so far we're havin a lot of fun over here, had a question - I'd imagine there's probably an easy-ish way to do this, but I haven't done a ton w/ prefect so bear with me.

I'm interested to know not just when tool calls finish, but also when they start, and what params they were started with. This helps a lot during development as I'm trying to document the functions/tools I'm building and make sure the LLM can use them well.

I did manage to spin up the prefect web UI and start a server in a another shell, and trigger a deployment from a third shell, but wondering if there's a way to just up the verbosity of the logs a little bit without leaving the CLI / IDE

Here's a subset of the code, minus the tools impls

researcher = cf.Agent(
    name="Researcher",
    instructions="Use a formal tone and clear language",
    tools=[
        list_meetings,
        list_zoom_meeting_recordings,
        transcribe_recording,
        download_zoom_file,
        split_local_file,
        get_file_size,
    ]
)

person = "<redacted>"

@cf.flow
def research() -> str:
    zoomid = cf.Task(
        f"""
        tell me the zoom meeting ID for my call w/ {person}? 
        """,
        agents=[researcher]
    )

    downloadURL = cf.Task(
        """
        What is the download URL for the m4a audio file from that meeting? 
        """,
        context={"meeting_id": zoomid},
        agents=[researcher]
    )

    summary = cf.Task(
        """
        download the file  to recording.m4a
        """,
        context={"download_url": downloadURL},
        agents=[researcher]
    )

    summary = cf.Task(
        """
        transcribe the file and then tell me the highlights of the meeting.
         
        You can split the file in half if its too big. The size limit for whisper transcription is 25MB.
        """,
        context={"filename": "recording.m4a"},
        agents=[researcher]
    )
    return summary


def main():
    result = research()
    print(result)


if __name__ == "__main__":
    main()

I'd be looking for a way to opt-in to something like showing the params in, and maybe the first 200 chars of the response if it's real big

Tool call: list_zoom_meeting_recordings(meeting_id=123456789)
Tool call: ✅  w/ response `[{"meeting_id":123456789, "downloadable_files":[{"type":"video", "url": ....
---------------
Tool call: download_file(download_url=..., filename="dex meeting from 2024-06-24.m4a")
Tool call: ❌   w/ response `{status: 503, message: "something is broken try back later"}`

just riffing here - let me know what y'all think or if I missed a flag/option

Task run failed with exception ValueError('Invalid format string')

I am running into this error with first task from the tutorial.

Environment/System
I am on Windows 11 pro
pip_list is installed
using an anaconda prompt to run the script in a virtual environment

Here is my task

import controlflow as cf
hello_task = cf.Task("say hello")
hello_task.run()

Here is the error
pip_list.txt


00:04:20.859 | ERROR   | Task run 'Run LLM' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:20.957 | ERROR   | Task run 'Run LLM' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
00:04:20.967 | ERROR   | Flow run 'devious-whale' - Encountered exception during execution: Invalid format string
Traceback (most recent call last):
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\flow_engine.py", line 571, in run_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\flow_engine.py", line 662, in run_generator_flow_sync
    yield gen_result
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\flows\flow.py", line 95, in create_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 340, in run
    new_messages = self.run_once()
                   ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 291, in run_once
    for _ in response_gen:
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 307, in _handle_events
    for event in generator:
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 167, in _completion_generator
    yield from handle_delta_events(
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 78, in handle_delta_events
    yield CompletionEvent(
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 310, in _handle_events
    handler.on_event(event)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\handlers.py", line 25, in on_event
    method(**event.payload)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 78, in on_tool_call_delta
    self.update_live()
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 60, in update_live
    content.append(format_message(message, tool_results=tool_results))
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 155, in format_message
    subtitle=f"[italic]{format_timestamp(message.timestamp)}[/]",
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 107, in format_timestamp
    return local_timestamp.strftime("%l:%M:%S %p")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid format string
00:04:21.135 | ERROR   | Flow run 'devious-whale' - Finished in state Failed('Flow run encountered an exception: ValueError: Invalid format string')
00:04:21.140 | ERROR   | Task run 'Run LLM Controller' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:21.231 | ERROR   | Task run 'Run LLM Controller' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
00:04:21.237 | ERROR   | Task run 'Run Task a0cf6 ("say hello")' - Task run failed with exception ValueError('Invalid format string') - Retries are exhausted
00:04:21.376 | ERROR   | Task run 'Run Task a0cf6 ("say hello")' - Finished in state Failed('Task run encountered an exception ValueError: Invalid format string')
Traceback (most recent call last):
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\firsttask.py", line 5, in <module>
    hello_task.run()
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\tasks\task.py", line 354, in run
    controller.run()
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 340, in run
    new_messages = self.run_once()
                   ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\tasks.py", line 826, in __call__
    return run_task(
           ^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\client\schemas\objects.py", line 262, in result
    return get_state_result(
           ^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 231, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 619, in run_context
    yield self
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\controllers\controller.py", line 291, in run_once
    for _ in response_gen:
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 307, in _handle_events
    for event in generator:
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 167, in _completion_generator
    yield from handle_delta_events(
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 78, in handle_delta_events
    yield CompletionEvent(
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\completions.py", line 310, in _handle_events
    handler.on_event(event)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\llm\handlers.py", line 25, in on_event
    method(**event.payload)
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 78, in on_tool_call_delta
    self.update_live()
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 60, in update_live
    content.append(format_message(message, tool_results=tool_results))
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 155, in format_message
    subtitle=f"[italic]{format_timestamp(message.timestamp)}[/]",
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\src\cagi\prefect\controlflow\myenv\Lib\site-packages\controlflow\handlers\print_handler.py", line 107, in format_timestamp
    return local_timestamp.strftime("%l:%M:%S %p")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid format string

Image + audio annotation

@ai_task 
def generate_image(prompt: str) -> Image:
    """Generate an image based on the given prompt."""

If a task is annotated with image, handle it appropriately.

Docs? How to run tasks in parallel and mark tasks that were done to be incomplete and/or loop executions

Enhancement Description

I've looked through the docs but its not clear to me if its possible to assign tasks to run in parrallel, or if its an anti-pattern to try to mark tasks that are complete as not complete anymore to get the behaviour I'm looking for. I'm looking for a self-improvement loop where the same tasks are completed by agents but with each iteration with a new follow up question derived from the conversation so far.

Use Case

I have this flow:

import controlflow as cf
from typing import Annotated, List, Optional
from pydantic import BaseModel, Field

from agents import (
    code_executor,
    web_researcher,
    database_researcher,
    compere
)

class FastThoughts(BaseModel):
    inital_response: str
    can_answer: bool

@cf.flow()
def fast_n_slow_flow(question, vector_name="control_flow", chat_history=[]):

    agents = ["code_executor", "web_researcher", "database_researcher"]
    agents_obj = [code_executor, web_researcher, database_researcher]

    task_master = cf.Task(
        """
Frame the question from the user in language that all your co-assistants can understand and ask which agents on your team will be able to answer it.
They should reply with thier first initial reactions, but will start a slower more considered response that should come later.
""",
        agents=[compere],
        result_type=str, 
    )
    task_master.run()
    print(task_master.result)

    slow_thought_agents = []
    

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)


    who_speaks_next = cf.Task(
        "From the conversation so far and the assistants recent reactions, decide who should speak next",
        context=dict(assistant_reactions=fast_thoughts),
        agents=[compere],
        result_type=agents
    )
    who_speaks_next.run()
    print(who_speaks_next.result) # only result available if you do not supply a result_type


    slow_thoughts = []
    for agent in slow_thought_agents:
        try:
            slow_thought = cf.Task(
                "You indicated you could contribute more to the conversation with your skills - please now answer the question fully using all tools at your disposal.",
                agents=[agent]
            )
        except ValueError:
            print("{agent.name} could not help with a slow thought")
            continue

        slow_thought.run()
        print(slow_thought.result)
        slow_thoughts.append(slow_thought)

    summary = cf.Task(
        """
        Compile all the answers so far and summarise the conversation so far.  Highlight what each team member has brought to the discussion.
        Consider the initial question and the answers so far, reframe a new question that will help tease out any unexplored areas.
        """,
        agents = [compere],
        context=dict(fast_thoughts=fast_thoughts, slow_thoughts=slow_thoughts, who_speaks_next=who_speaks_next)
    )
    summary.run()
    print(summary.result)

    shall_we_end = cf.Task(
        "If you believe the conversation has reached its natural conclusion, mark this task complete, otherwise keep the conversation going",
        agents=[compere],
        context=dict(summary=summary),
        result=bool)
    
    shall_we_end.run()
    print(shall_we_end)

    if shall_we_end.result:
        return slow_thoughts, summary.result

it works, but I'd like for it to loop back and use the follow up question the compere adds at the last step, but it is not clear to me how to do this. I'd also like to avoid the for loops or to make them async or parrallel somehow, since they don't depend on each other.

Proposed Implementation

Maybe the below is already possible :) but how I'd theoretically like it is below.

I'd prefer instead of this for loop:

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)

I just add the agents and then have the results in a list:

# this task is done in parrallel
fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[code_executor, web_researcher, database_researcher], # 
            context=dict(user_question=task_master),
            result_type=[FastThoughts] # a list of the output, one per passed in agent
        )

fast_answers = fast_thoughts.run()

for answer in fast_answers:
      if answer.can_answer:
              print(f"Adding {agent.name} to slow thoughts")
              slow_thought_agents.append(agent)

Then I am also unclear how to mark tasks as incomplete or if this is an anti-pattern, but something like

fast_thoughts.set_incomplete()

or perhaps create a new task with subtasks for the next loop?

Add an `end_turn` field to tools

Enhancement Description

Generally, when an agent uses a tool we need to show it the result. However, for some tools we know the turn should end -- almost all builtin tools work this way (like marking a task successful). However, today they must issue the EndTurn event before the ToolResult event. By adding a flag to the tool, the tool call handler could emit the end turn event automatically.

Use Case

No response

Proposed Implementation

No response

Adjust how human input works

Probably will address #219

Instead of having human input only via agent making a tool call and the user input in the response, instruct the agents that user input may come at any time in the thread, but they must use a tool to specifically request a response from the user.

The pattern the agent would see is:

...
<agent used human input tool: 'hello, whats your name?'>
<tool response successful>
<human message>

The key is we have to show the tool call so that agents don't conclude from history that they can converse with users by posting "regular" messages.

This will also facilitate adding user messages at any point, perhaps with a mechanism like

agent.send_message()
or 
task.send_message()

sending a global message doesn't seem necessary at the moment.

Docs: Task Dependency Example in Tutorial Not Working as Described

The task dependency example in the ControlFlow documentation under the "Hello, tasks" section isn't functioning as expected. Here are the issues:

When Running the Example Code as Provided:

import controlflow as cf
from pydantic import BaseModel

class Name(BaseModel):
    first: str
    last: str

name = cf.Task("Get the user's name", user_access=True, result_type=Name)
poem = cf.Task("Write a personalized poem", context=dict(name=name))

poem.run()

Outcome : The process hangs indefinitely, requiring a manual interrupt.

When Running the Tasks Explicitly in Sequence:

I'm not sure if this is the correct way to run these outside of a flow...

name.run()
poem.run()

Outcome : The name task runs successfully, but the poem task throws an error:

RuntimeError: dictionary changed size during iteration

This contradicts the docs, which state:

"ControlFlow will automatically run any dependencies before executing the task you requested. In the above example, we only ran the poem task, but ControlFlow automatically ran the name task first, then passed its result to the poem task's context. We can see that both tasks were successfully completed and have result values."

Environment:

  • ControlFlow version : 0.8.1

  • Python version : 3.12.2

  • Prefect version : 3.0.0rc9

Bug: Graph.from_tasks with flow.tasks.values() raises "RuntimeError: dictionary changed size during iteration"

The controller.graph property raises a Runtime: dictionary changed size during iteration exception.

I've only been able to replicate this with task B depending on task A. However, I suspect that if the flow adds subtasks while iterating over the current tasks, this error is raised.

A simple solution may be to caste the self.flow.tasks.values() to a list before calling Graph.from_tasks.

Patch will be submitted soon.

Hooks for user input?

Enhancement Description

I'm look to understand the best way to bring the User into a conversation where the interaction is not a CLI / CLI tool. I can think of a few workaround-ish ideas that might work:

  1. use prefect workflow pause/resume to go get the user input and resume with their input
  2. add a tool call like tell_user or ask_user_for_clarification that handles the IO via a websocket or something
  3. set user_input=True on a task, capture/forward stdin/stdout 😬

Use Case

Building autonomous agents for data engineering and data product management -

The interaction paradigms I'd like to be able to support include web-app chat via websockets or other async layer, in addition to more "outer loop" type channels like email, slack, sms, etc. For example, an agent might discover something and want to alert a user, or might complete a long-running task for a user and want to get the user's input.

These sorts of workflows fit nicely into a D(A)G-y sort of state machine that prefect enables, but I'm trying to wrap my head around the best way to fit together these sorts of async and multi-player workflows, or even just some workarounds / patterns that have worked well for applications that have access to LLMs

Proposed Implementation

# open to brainstorming but nothing off the top of my head

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.