Giter Club home page Giter Club logo

cadence-python's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

cadence-python's Issues

Calling signal or query methods from new_workflow_stub_from_workflow_id is invoking always invoking WorkflowMethod again

From examples:


import time

from cadence.exceptions import QueryFailureException
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, signal_method, Workflow, WorkflowClient, query_method

TASK_LIST = "TestStubWorkflowId"
DOMAIN = "sample2"


class GreetingException(Exception):
    pass


class TestStubWorkflowId:

    @query_method()
    async def get_message(self) -> str:
        raise NotImplementedError

    @query_method()
    async def get_message_fail(self) -> str:
        raise NotImplementedError

    @signal_method()
    async def put_message(self, message):
        raise NotImplementedError

    @workflow_method(task_list=TASK_LIST)
    async def get_greetings(self) -> list:
        raise NotImplementedError


class TestStubWorkflowIdImpl(TestStubWorkflowId):

    def __init__(self):
        self.message = ""

    async def get_message(self) -> str:
        return self.message

    async def get_message_fail(self) -> str:
        raise GreetingException("error from query")

    async def put_message(self, message):
        self.message = message

    async def get_greetings(self) -> list:
        print("invoked!!")
        self.message = "initial-message"
        await Workflow.await_till(lambda: self.message == "done")
        return "finished"


if __name__ == "__main__":
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_workflow_implementation_type(TestStubWorkflowIdImpl)
    factory.start()

    client = WorkflowClient.new_client(domain=DOMAIN)
    workflow: TestStubWorkflowId = client.new_workflow_stub(TestStubWorkflowId)
    context = WorkflowClient.start(workflow.get_greetings)

    stub: TestStubWorkflowId = client.new_workflow_stub_from_workflow_id(TestStubWorkflowId,
                                                                         workflow_id=context.workflow_execution.workflow_id)
    stub.put_message("abc")
    assert stub.get_message() == "abc"

    stub.put_message("done")
    assert client.wait_for_close_with_workflow_id(context.workflow_execution.workflow_id) == "finished"

    print("Stopping workers")
    worker.stop()

output:

/Users/tsharma/PycharmProjects/twilio_whatsapp/venv/bin/python /Users/tsharma/PycharmProjects/twilio_whatsapp/cadence_wf/test_id_again.py
invoked!!
invoked!!
invoked!!
invoked!!
Stopping workers

WorkflowMethod get_greetings is being invoked for every subsequent call to signal or query method on stub from workflow_id. not sure whether this should be happening.

Handling of ActivityTaskTimeoutException, schedule_to_start_timeout_seconds

It seems ActivityTaskTimeoutException is being caught in the decision_loop.py/workflow.py, but it doesn't propagate up to the caller. The Java Client catches both of these exceptions using a similar setup so the behavior seems different.

Using this sample code below, if you set the schedule_to_start_timeout_seconds shorter than the execution_start_to_close_timeout, you see the following, but it doesn't propagate up. If you set the schedule_to_start_timeout_seconds longer than the execution timeout, the client will get the WorkflowExecutionTimeoutException.

  1. Running with schedule_to_start_timeout_seconds = 1 (we see the exception, it just never makes it up to the caller)
 ERROR:cadence.decision_loop:Workflow GreetingWorkflow::get_greeting('Python') failed
Traceback (most recent call last):
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 233, in workflow_main
    self.ret_value = await workflow_proc(self.workflow_instance, *self.workflow_input)
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/activity_method.py", line 62, in stub_activity_fn
    return await decision_context.schedule_activity_task(parameters=parameters)
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 342, in schedule_activity_task
    await future
cadence.exceptions.ActivityTaskTimeoutException
...

  1. Running with execution_start_to_close_timeout=1 and schedule_to_start_timeout_seconds=5 we get the exception
Workflow Excution Timeout 
Stopping workers....
Workers stopped...

Appears that somewhere in the code, it isn't adding the ActivityTimeout to the GetWorkflowExecutionHistoryResponse. But I could be way off, but that's where the trail seemed to lead. Whether it's supposed to or not is also a question, but the Java Client does throw this exception up to the caller (although it's thrown up as WorkflowException, which I think is correct).

# Activities Interface
class GreetingActivities:
    @activity_method(task_list=TASK_LIST,
                     schedule_to_close_timeout_seconds=100000,
                     schedule_to_start_timeout_seconds=5)
    def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_list=TASK_LIST,
                     execution_start_to_close_timeout_seconds=1)

    def get_greeting(self, name: str) -> str:
        raise NotImplementedError

# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    #async
    def get_greeting(self, name):
        return self.greeting_activities.compose_greeting("Aloha", name)


if __name__ == '__main__':
    try:
        factory = WorkerFactory("localhost", 7933, DOMAIN)
        worker = factory.new_worker(TASK_LIST)
        worker.register_workflow_implementation_type(GreetingWorkflowImpl)
        factory.start()

        client = WorkflowClient.new_client(domain=DOMAIN)
        greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
        name = "foo"
        result = greeting_workflow.get_greeting("{}".format(name))
        print(result)
    except ActivityTaskTimeoutException as ex:
        print("Activity Task Timeout {}".format(ex))
    except WorkflowExecutionTimedOutException as ex2:
        print("Workflow Excution Timeout {}".format(ex2))


    print("Stopping workers....")
    worker.stop()
    print("Workers stopped...")
    sys.exit(0)

Question: Picking a running workflow to trigger a signal method

Hi,
I am evaluating using cadence (python) for a workflow project. An important part of the workflow is to call a 3rd party service and wait for a result to come back (with timeout). This seems like a good use for the signal method. The 3rd party service message will be received asynchronously from SQS. In this case, how do I get the right workflow instance to trigger the signal on assuming that there are several workflows running with different ids? The pattern I was thinking of using in the sqs handler is:

  • Retrieve a id from the message that maps to a running workflow id via some mechanism
  • Retrieve a running workflow with that id if one exists
  • If a workflow exists with that specific id, trigger the signal method on it.

Please let me know if this is possible, I was unable to find it from reading the code, but I might have missed something.

Can't invoke activity from WorkflowMethod

import logging
import time

from cadence.activity_method import activity_method
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, Workflow, WorkflowClient, signal_method, query_method

logging.basicConfig(level=logging.DEBUG)

TASK_LIST = "SubsActivity-python-tasklistd3sasa4552asddasdsa1sda31"
DOMAIN = "sample2"
QUOTA = 5


class SubscriptionActivity:
    @activity_method(task_list=TASK_LIST)
    def cancel(self):
        return "Cancel"


class SubscriptionActivityImpl(SubscriptionActivity):

    def cancel(self):
        return "Cancel"


class SubscriptionWfInterface:
    @workflow_method(task_list=TASK_LIST)
    async def manage_subscription(self, name):
        raise NotImplementedError

    @signal_method()
    async def decrement(self):
        raise NotImplementedError

    @signal_method()
    async def renew_subscription(self):
        raise NotImplementedError

    @query_method()
    async def get_quota(self):
        raise NotImplementedError


class SubscriptionWf(SubscriptionWfInterface):
    def __init__(self):
        self.subscription_activities: SubscriptionActivity = Workflow.new_activity_stub(SubscriptionActivity)
        self.quota = QUOTA

    async def manage_subscription(self, name):
        self.name = name
        print(f"Started workflow for {self.name}")
        print(Workflow.get_workflow_id())

        await Workflow.await_till(lambda: self.quota <= 0)
        print("Condition met..")
        # return await self.subscription_activities.cancel()

    def renew_subscription(self):
        self.quota = QUOTA

    async def decrement(self):
        self.quota = self.quota - 1

    async def get_quota(self):
        return self.quota


if __name__ == '__main__':
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_activities_implementation(SubscriptionActivityImpl(), "SubscriptionActivity")
    worker.register_workflow_implementation_type(SubscriptionWf)
    factory.start()

    client = WorkflowClient.new_client(domain=DOMAIN)
    subs_workflow: SubscriptionWf = client.new_workflow_stub(SubscriptionWfInterface)

    wf_execution = client.start(subs_workflow.manage_subscription, "tddd3wsdsasdassdaeqsdsaw")
    time.sleep(5)
    subs_workflow2: SubscriptionWf = client.new_workflow_stub_from_workflow_id(SubscriptionWfInterface,
                                                                               wf_execution.workflow_execution.workflow_id)
    subs_workflow2.decrement()
    time.sleep(5)
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.decrement()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    # subs_workflow2.decrement()
    # time.sleep(1)
    # worker.stop()
    # print("Workers stopped...")
    # sys.exit(0)

In the above dummy code , If I try to invoke return await self.subscription_activities.cancel(), that throws errors such as:

DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Created
DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Running
INFO:cadence.decision_loop:Invoking signal SubscriptionWfInterface::decrement()
INFO:cadence.decision_loop:Signal SubscriptionWfInterface::decrement() returned None
Started workflow for tddd3wsdsasdassdaeqsdsaw
3b1cd958-4af4-4f7b-9446-af7d489be482
Condition met..
DEBUG:cadence.decision_loop:RespondDecisionTaskCompleted: RespondDecisionTaskCompletedResponse(decision_task=None)
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved```

not sure what's happening here, I've tried multiple ways but still stuck,

Issue with re-running workflows

Hi,
I have a simple workfow defined with 4 steps and no signals. I set the reuse id policy to allow duplicates and after running the workflow the first time, if I attempt to restart the workflow with the cadence cli, it doesn't work. This is the exception trace on the worker thread:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 725, in run
    decisions = self.process_task(decision_task)
  File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 762, in process_task
    decisions: List[Decision] = decider.decide(decision_task.history.events)
  File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 462, in decide
    self.process_decision_events(decision_events)
  File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 471, in process_decision_events
    self.process_event(event)
  File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 488, in process_event
    raise Exception(f"No event handler for event type {event.event_type.name}")
Exception: No event handler for event type DecisionTaskFailed

The command I used was
docker run --rm ubercadence/cli:master --address host.docker.internal:7933 --domain samples-domain workflow reset -w OR:9949881e-3c94-11ea-bc33-acde48001122 -r 8092a6e6-5ef4-4429-b54b-794de5fff680 --reset_type FirstDecisionCompleted --reason "some"
Is re-running workflows a supported use case for the python version and if so are there any other fixes to attempt to allow workflows to be executed again ?

IO Error not handeled in the code.

File "/usr/src/app/utils/cadence_workflows.py", line 97, in send_email
return exec_workflow(
File "/usr/local/lib/python3.8/site-packages/cadence/workflow.py", line 209, in exec_workflow
start_response, err = workflow_client.service.start_workflow(start_request)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 72, in start_workflow
return self.call_return("StartWorkflowExecution", request, StartWorkflowExecutionResponse)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 59, in call_return
response = self.thrift_call(method_name, request)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 54, in thrift_call
response = self.connection.call_function(call)
File "/usr/local/lib/python3.8/site-packages/cadence/connection.py", line 351, in call_function
self.write_frame(frame)
File "/usr/local/lib/python3.8/site-packages/cadence/connection.py", line 335, in write_frame
frame.write(self.wrapper)
File "/usr/local/lib/python3.8/site-packages/cadence/frames.py", line 76, in write
self.write_header(wrapper)
File "/usr/local/lib/python3.8/site-packages/cadence/frames.py", line 80, in write_header
fp.write_short(self.get_size())
File "/usr/local/lib/python3.8/site-packages/cadence/ioutils.py", line 51, in write_short
self.io_stream.write(v.to_bytes(2, byteorder='big', signed=False))
File "/usr/local/lib/python3.8/socket.py", line 687, in write
return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe

Fetching completed workflow ID

Is there any way to fetch the completed activity/workflow in the logger?

The DEBUG logger is returning the status of the workflow, created and running is returning with the workflow ID, but the completed workflow does not return the workflow ID.

I am using async in the client to hit the request, and fetching the statuses from the logs whether the workflow is created, running, completed or failed.

Thanks

[Question] Behaviour when receiving a struct / dataclass

We are considering implementing activities in Python, and workflows in Go. When calling activities in Go, it is possible to pass a struct to the activity (e.g. https://github.com/uber-common/cadence-samples/blob/56c4ffffd38e8596075c56f03f82c5a80ae77b92/cmd/samples/fileprocessing/activities.go). We are curious about what the behaviour is like when the activity is implemented using this Python client. Judging from

def json_to_args(jsonb: bytes) -> typing.Optional[list]:
, it seems like the argument will simply be a Python dictionary. If this is the case, are there any plans for automatically converting this to say a Python dataclass?

The use-case for using a single parameter (much like a single message) activity is for versioning. It has been mentioned several times, and the solution seems to always point to this approach for dealing with an activity's contract.

[question] Behaviour when Activity.do_not_complete_on_return() is called

Registered n number of activities which is part of one workflow. Workflow execution is started and activity 1 is called. I donot want activity2 to start until activity 1 completes, so used Activity.do_not_complete_on_return() inside activity implementation code. Later this activity is completed by external services.

But until activity 1 completes the process thread is blocked.

Is there anyway to unblock the thread so that it can process other events ?
and still activity 2 can be dependent on completion of activity 1.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.