firdaus / cadence-python
Python framework for Cadence Workflow Service
Home Page: https://cadenceworkflow.io
License: MIT License
From examples:
import time

from cadence.exceptions import QueryFailureException
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, signal_method, Workflow, WorkflowClient, query_method

TASK_LIST = "TestStubWorkflowId"
DOMAIN = "sample2"


class GreetingException(Exception):
    pass


class TestStubWorkflowId:
    @query_method()
    async def get_message(self) -> str:
        raise NotImplementedError

    @query_method()
    async def get_message_fail(self) -> str:
        raise NotImplementedError

    @signal_method()
    async def put_message(self, message):
        raise NotImplementedError

    @workflow_method(task_list=TASK_LIST)
    async def get_greetings(self) -> list:
        raise NotImplementedError


class TestStubWorkflowIdImpl(TestStubWorkflowId):
    def __init__(self):
        self.message = ""

    async def get_message(self) -> str:
        return self.message

    async def get_message_fail(self) -> str:
        raise GreetingException("error from query")

    async def put_message(self, message):
        self.message = message

    async def get_greetings(self) -> list:
        print("invoked!!")
        self.message = "initial-message"
        await Workflow.await_till(lambda: self.message == "done")
        return "finished"


if __name__ == "__main__":
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_workflow_implementation_type(TestStubWorkflowIdImpl)
    factory.start()
    client = WorkflowClient.new_client(domain=DOMAIN)
    workflow: TestStubWorkflowId = client.new_workflow_stub(TestStubWorkflowId)
    context = WorkflowClient.start(workflow.get_greetings)
    stub: TestStubWorkflowId = client.new_workflow_stub_from_workflow_id(
        TestStubWorkflowId,
        workflow_id=context.workflow_execution.workflow_id)
    stub.put_message("abc")
    assert stub.get_message() == "abc"
    stub.put_message("done")
    assert client.wait_for_close_with_workflow_id(context.workflow_execution.workflow_id) == "finished"
    print("Stopping workers")
    worker.stop()
output:
/Users/tsharma/PycharmProjects/twilio_whatsapp/venv/bin/python /Users/tsharma/PycharmProjects/twilio_whatsapp/cadence_wf/test_id_again.py
invoked!!
invoked!!
invoked!!
invoked!!
Stopping workers
The workflow method get_greetings is invoked again on every subsequent signal or query call made through the stub created from the workflow ID. I am not sure whether this is expected.
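One possible explanation, offered as an assumption rather than a confirmed diagnosis: Cadence workers normally rebuild workflow state by replaying the event history, so if the worker keeps no cached state, each decision task caused by a signal or query would run the workflow method again from the top. The toy model below is plain Python, not cadence-python code. ToyWorker, its event tuples, and the invocations counter are hypothetical names, used only to show why a print at the top of the workflow method could fire once per task.

```python
class ToyWorker:
    """Toy model of a worker that keeps no workflow state between tasks."""

    def __init__(self):
        self.history = []      # recorded events, oldest first
        self.invocations = 0   # how many times the workflow body started

    def _replay(self):
        # Rebuild state from scratch by running the workflow body over history.
        self.invocations += 1  # stands in for print("invoked!!")
        message = "initial-message"
        for kind, payload in self.history:
            if kind == "signal":
                message = payload
        return message

    def start(self):
        self.history.append(("started", None))
        self._replay()

    def signal(self, payload):
        self.history.append(("signal", payload))
        self._replay()

    def query(self):
        # A query adds nothing to history but still needs the rebuilt state.
        return self._replay()


worker = ToyWorker()
worker.start()
worker.signal("abc")
current = worker.query()
worker.signal("done")
print(current, worker.invocations)
```

In this model one start, two signals, and one query give four invocations, which matches the four "invoked!!" lines in the output above.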
It seems ActivityTaskTimeoutException is caught in decision_loop.py/workflow.py but does not propagate up to the caller. The Java client catches both of these exceptions with a similar setup, so the behavior differs.
With the sample code below, if you set schedule_to_start_timeout_seconds shorter than execution_start_to_close_timeout_seconds, the worker logs the following but nothing propagates up. If you set schedule_to_start_timeout_seconds longer than the execution timeout, the client gets the WorkflowExecutionTimedOutException.
ERROR:cadence.decision_loop:Workflow GreetingWorkflow::get_greeting('Python') failed
Traceback (most recent call last):
File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 233, in workflow_main
self.ret_value = await workflow_proc(self.workflow_instance, *self.workflow_input)
File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/activity_method.py", line 62, in stub_activity_fn
return await decision_context.schedule_activity_task(parameters=parameters)
File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 342, in schedule_activity_task
await future
cadence.exceptions.ActivityTaskTimeoutException
...
Workflow Excution Timeout
Stopping workers....
Workers stopped...
It appears that somewhere in the code the activity timeout is not being added to the GetWorkflowExecutionHistoryResponse. I could be way off, but that is where the trail seemed to lead. Whether it is supposed to be there is also a question, but the Java client does propagate this exception to the caller (wrapped in a WorkflowException, which I think is correct).
# Activities Interface
class GreetingActivities:
    @activity_method(task_list=TASK_LIST,
                     schedule_to_close_timeout_seconds=100000,
                     schedule_to_start_timeout_seconds=5)
    def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_list=TASK_LIST,
                     execution_start_to_close_timeout_seconds=1)
    def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    # async
    def get_greeting(self, name):
        return self.greeting_activities.compose_greeting("Aloha", name)


if __name__ == '__main__':
    try:
        factory = WorkerFactory("localhost", 7933, DOMAIN)
        worker = factory.new_worker(TASK_LIST)
        worker.register_workflow_implementation_type(GreetingWorkflowImpl)
        factory.start()
        client = WorkflowClient.new_client(domain=DOMAIN)
        greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
        name = "foo"
        result = greeting_workflow.get_greeting("{}".format(name))
        print(result)
    except ActivityTaskTimeoutException as ex:
        print("Activity Task Timeout {}".format(ex))
    except WorkflowExecutionTimedOutException as ex2:
        print("Workflow Excution Timeout {}".format(ex2))
    print("Stopping workers....")
    worker.stop()
    print("Workers stopped...")
    sys.exit(0)
Hi,
I am evaluating Cadence (Python) for a workflow project. An important part of the workflow is calling a third-party service and waiting, with a timeout, for a result to come back. This seems like a good use for a signal method. The third-party service's message will arrive asynchronously via SQS. In that case, how do I get the right workflow instance to send the signal to, assuming several workflows are running with different IDs? The pattern I was thinking of using in the SQS handler is:
Please let me know if this is possible, I was unable to find it from reading the code, but I might have missed something.
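One way this might look, assuming each SQS message body carries the ID of the workflow it belongs to: an example elsewhere on this page calls client.new_workflow_stub_from_workflow_id(...) to get a stub for a running workflow, and the handler below relies on that same call. The message layout (workflow_id and result keys), the on_result signal name, and the FakeClient used to run the handler without a Cadence server are all hypothetical.

```python
import json


class ResultWorkflow:
    """Hypothetical workflow interface with one signal method."""

    async def on_result(self, result):
        raise NotImplementedError


def handle_sqs_message(client, body: str) -> str:
    """Route one SQS message body to the workflow instance it names."""
    payload = json.loads(body)
    workflow_id = payload["workflow_id"]  # assumed message field
    stub = client.new_workflow_stub_from_workflow_id(ResultWorkflow, workflow_id=workflow_id)
    stub.on_result(payload["result"])     # the real client would send this as a signal
    return workflow_id


class FakeStub:
    def __init__(self, sink, workflow_id):
        self.sink = sink
        self.workflow_id = workflow_id

    def on_result(self, result):
        self.sink.append((self.workflow_id, result))


class FakeClient:
    """Stand-in for WorkflowClient so the handler can run locally."""

    def __init__(self):
        self.signals = []

    def new_workflow_stub_from_workflow_id(self, cls, workflow_id):
        return FakeStub(self.signals, workflow_id)


client = FakeClient()
handled = handle_sqs_message(client, json.dumps({"workflow_id": "wf-42", "result": "ok"}))
print(handled, client.signals)
```

The workflow ID has to reach the third-party service and come back in its reply, for example by choosing the ID yourself when starting the workflow and including it in the outgoing request.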
import logging
import time

from cadence.activity_method import activity_method
from cadence.workerfactory import WorkerFactory
from cadence.workflow import workflow_method, Workflow, WorkflowClient, signal_method, query_method

logging.basicConfig(level=logging.DEBUG)

TASK_LIST = "SubsActivity-python-tasklistd3sasa4552asddasdsa1sda31"
DOMAIN = "sample2"
QUOTA = 5


class SubscriptionActivity:
    @activity_method(task_list=TASK_LIST)
    def cancel(self):
        return "Cancel"


class SubscriptionActivityImpl(SubscriptionActivity):
    def cancel(self):
        return "Cancel"


class SubscriptionWfInterface:
    @workflow_method(task_list=TASK_LIST)
    async def manage_subscription(self, name):
        raise NotImplementedError

    @signal_method()
    async def decrement(self):
        raise NotImplementedError

    @signal_method()
    async def renew_subscription(self):
        raise NotImplementedError

    @query_method()
    async def get_quota(self):
        raise NotImplementedError


class SubscriptionWf(SubscriptionWfInterface):
    def __init__(self):
        self.subscription_activities: SubscriptionActivity = Workflow.new_activity_stub(SubscriptionActivity)
        self.quota = QUOTA

    async def manage_subscription(self, name):
        self.name = name
        print(f"Started workflow for {self.name}")
        print(Workflow.get_workflow_id())
        await Workflow.await_till(lambda: self.quota <= 0)
        print("Condition met..")
        # return await self.subscription_activities.cancel()

    def renew_subscription(self):
        self.quota = QUOTA

    async def decrement(self):
        self.quota = self.quota - 1

    async def get_quota(self):
        return self.quota


if __name__ == '__main__':
    factory = WorkerFactory("localhost", 7933, DOMAIN)
    worker = factory.new_worker(TASK_LIST)
    worker.register_activities_implementation(SubscriptionActivityImpl(), "SubscriptionActivity")
    worker.register_workflow_implementation_type(SubscriptionWf)
    factory.start()
    client = WorkflowClient.new_client(domain=DOMAIN)
    subs_workflow: SubscriptionWf = client.new_workflow_stub(SubscriptionWfInterface)
    wf_execution = client.start(subs_workflow.manage_subscription, "tddd3wsdsasdassdaeqsdsaw")
    time.sleep(5)
    subs_workflow2: SubscriptionWf = client.new_workflow_stub_from_workflow_id(
        SubscriptionWfInterface,
        wf_execution.workflow_execution.workflow_id)
    subs_workflow2.decrement()
    time.sleep(5)
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.decrement()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    subs_workflow2.decrement()
    subs_workflow2.get_quota()
    # subs_workflow2.decrement()
    # time.sleep(1)
    # worker.stop()
    # print("Workers stopped...")
    # sys.exit(0)
In the dummy code above, if I invoke return await self.subscription_activities.cancel(), I get errors such as:
DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Created
DEBUG:cadence.decision_loop:[signal-task-WorkflowExecution(workflow_id='3b1cd958-4af4-4f7b-9446-af7d489be482', run_id='e9c472e6-693c-4529-8265-e8c204a9a6e2')-SubscriptionWfInterface::decrement] Running
INFO:cadence.decision_loop:Invoking signal SubscriptionWfInterface::decrement()
INFO:cadence.decision_loop:Signal SubscriptionWfInterface::decrement() returned None
Started workflow for tddd3wsdsasdassdaeqsdsaw
3b1cd958-4af4-4f7b-9446-af7d489be482
Condition met..
DEBUG:cadence.decision_loop:RespondDecisionTaskCompleted: RespondDecisionTaskCompletedResponse(decision_task=None)
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<SignalMethodTask.signal_main() done, defined at /Users/tsharma/PycharmProjects/twilio_whatsapp/venv/lib/python3.7/site-packages/cadence/decision_loop.py:320> exception=CancelledError()>
concurrent.futures._base.CancelledError
ERROR:asyncio:Task exception was never retrieved
I am not sure what is happening here. I have tried several approaches but am still stuck.
Python 3.8
ImportError: cannot import name 'CancelledError' from 'asyncio.base_futures'
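A possible workaround sketch, not a confirmed fix for the library: the error message points at an import of CancelledError from the private asyncio.base_futures module, which no longer provides that name on Python 3.8. The top-level asyncio package exports CancelledError on both 3.7 and 3.8, so importing it from there should work on either version. The cancel_and_catch helper is only a hypothetical check that the imported class is the one raised on cancellation.

```python
import asyncio
from asyncio import CancelledError  # public location, available on 3.7 and 3.8+


async def cancel_and_catch() -> bool:
    """Cancel a sleeping task and report whether CancelledError was raised."""
    task = asyncio.ensure_future(asyncio.sleep(60))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except CancelledError:
        return True
    return False


caught = asyncio.run(cancel_and_catch())
print(caught)
```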
Hi,
I have a simple workflow defined with 4 steps and no signals. I set the workflow ID reuse policy to allow duplicates. After the workflow has run once, attempting to reset it with the Cadence CLI does not work. This is the exception trace on the worker thread:
Exception in thread Thread-4:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 725, in run
decisions = self.process_task(decision_task)
File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 762, in process_task
decisions: List[Decision] = decider.decide(decision_task.history.events)
File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 462, in decide
self.process_decision_events(decision_events)
File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 471, in process_decision_events
self.process_event(event)
File "/Users/ganeshan.jayaraman/.local/share/virtualenvs/cadence_test-wYdWkn-S/lib/python3.7/site-packages/cadence/decision_loop.py", line 488, in process_event
raise Exception(f"No event handler for event type {event.event_type.name}")
Exception: No event handler for event type DecisionTaskFailed
The command I used was
docker run --rm ubercadence/cli:master --address host.docker.internal:7933 --domain samples-domain workflow reset -w OR:9949881e-3c94-11ea-bc33-acde48001122 -r 8092a6e6-5ef4-4429-b54b-794de5fff680 --reset_type FirstDecisionCompleted --reason "some"
Is re-running workflows a supported use case for the Python client? If so, are there any fixes I could try to allow workflows to be executed again?
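The traceback suggests the decider looks up a handler by event type and raises when none is registered. Below is a minimal sketch of a dispatch that tolerates events needing no action. It is not the library's code: HANDLERS, IGNORED_EVENTS, and this version of process_event are hypothetical, and whether DecisionTaskFailed can safely be skipped after a reset is an assumption that would need checking against the other clients.

```python
from typing import Callable, Dict, List

log: List[str] = []

# Hypothetical handler table keyed by event type name.
HANDLERS: Dict[str, Callable[[dict], None]] = {
    "WorkflowExecutionStarted": lambda event: log.append("started"),
    "DecisionTaskCompleted": lambda event: log.append("decision-completed"),
}

# Event types assumed (not verified) to need no decider action.
IGNORED_EVENTS = {"DecisionTaskFailed", "DecisionTaskTimedOut"}


def process_event(event: dict) -> None:
    """Dispatch one history event, skipping types listed in IGNORED_EVENTS."""
    name = event["event_type"]
    if name in IGNORED_EVENTS:
        log.append(f"skipped {name}")
        return
    handler = HANDLERS.get(name)
    if handler is None:
        raise Exception(f"No event handler for event type {name}")
    handler(event)


for event_type in ("WorkflowExecutionStarted", "DecisionTaskFailed", "DecisionTaskCompleted"):
    process_event({"event_type": event_type})
print(log)
```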
File "/usr/src/app/utils/cadence_workflows.py", line 97, in send_email
return exec_workflow(
File "/usr/local/lib/python3.8/site-packages/cadence/workflow.py", line 209, in exec_workflow
start_response, err = workflow_client.service.start_workflow(start_request)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 72, in start_workflow
return self.call_return("StartWorkflowExecution", request, StartWorkflowExecutionResponse)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 59, in call_return
response = self.thrift_call(method_name, request)
File "/usr/local/lib/python3.8/site-packages/cadence/workflowservice.py", line 54, in thrift_call
response = self.connection.call_function(call)
File "/usr/local/lib/python3.8/site-packages/cadence/connection.py", line 351, in call_function
self.write_frame(frame)
File "/usr/local/lib/python3.8/site-packages/cadence/connection.py", line 335, in write_frame
frame.write(self.wrapper)
File "/usr/local/lib/python3.8/site-packages/cadence/frames.py", line 76, in write
self.write_header(wrapper)
File "/usr/local/lib/python3.8/site-packages/cadence/frames.py", line 80, in write_header
fp.write_short(self.get_size())
File "/usr/local/lib/python3.8/site-packages/cadence/ioutils.py", line 51, in write_short
self.io_stream.write(v.to_bytes(2, byteorder='big', signed=False))
File "/usr/local/lib/python3.8/socket.py", line 687, in write
return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe
Is there any way to get the completed activity or workflow from the logger?
The DEBUG logger reports the workflow status: the created and running messages include the workflow ID, but the completed message does not.
I start workflows asynchronously from the client and read the logs to determine whether each workflow was created, is running, completed, or failed.
Thanks
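Until the library logs the ID itself, one option is to log completion from your own code with the workflow ID attached. Here is a minimal sketch using only the standard logging module. The logger name, the log_completed helper, and the message format are hypothetical, and in a real workflow the ID would have to come from the library, for example via Workflow.get_workflow_id() as used in another example on this page.

```python
import io
import logging

stream = io.StringIO()  # captured here only so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:[%(workflow_id)s] %(message)s"))

base = logging.getLogger("myapp.workflows")  # hypothetical logger name
base.setLevel(logging.INFO)
base.addHandler(handler)
base.propagate = False


def log_completed(workflow_id: str, result) -> None:
    """Emit a completion line that carries the workflow ID."""
    adapter = logging.LoggerAdapter(base, {"workflow_id": workflow_id})
    adapter.info("Workflow completed with result %r", result)


log_completed("wf-123", "finished")
print(stream.getvalue(), end="")
```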
As a reference, the equivalent code in the cadence-java-client is in
com.uber.cadence.internal.common.WorkflowExecutionUtils#getInstanceCloseEvent()
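Judging only by its name, that Java helper returns the event that closed a workflow run. Below is a rough Python sketch of the idea, which could also be the place where a client turns a timed-out or failed run into an exception. The dict-based events and the function as written here are hypothetical; only the close event type names are taken from Cadence's history event types.

```python
from typing import Iterable, Optional

# History event types that end a workflow run.
CLOSE_EVENT_TYPES = {
    "WorkflowExecutionCompleted",
    "WorkflowExecutionFailed",
    "WorkflowExecutionTimedOut",
    "WorkflowExecutionCanceled",
    "WorkflowExecutionTerminated",
    "WorkflowExecutionContinuedAsNew",
}


def get_instance_close_event(history: Iterable[dict]) -> Optional[dict]:
    """Return the last event if it closes the run, otherwise None."""
    last = None
    for event in history:
        last = event
    if last is not None and last["event_type"] in CLOSE_EVENT_TYPES:
        return last
    return None


history = [
    {"event_id": 1, "event_type": "WorkflowExecutionStarted"},
    {"event_id": 2, "event_type": "DecisionTaskScheduled"},
    {"event_id": 3, "event_type": "WorkflowExecutionTimedOut"},
]
close_event = get_instance_close_event(history)
still_open = get_instance_close_event(history[:2])
print(close_event["event_type"], still_open)
```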
We are considering implementing activities in Python and workflows in Go. When calling activities from Go, it is possible to pass a struct to the activity (e.g. https://github.com/uber-common/cadence-samples/blob/56c4ffffd38e8596075c56f03f82c5a80ae77b92/cmd/samples/fileprocessing/activities.go). We are curious how this behaves when the activity is implemented with this Python client. Judging from
cadence-python/cadence/conversions.py
Line 125 in 729420c
The use case for an activity that takes a single parameter (much like a single message) is versioning. This has come up several times, and the recommended way to manage an activity's contract always seems to be this approach.
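To illustrate the versioning argument: with one message-like parameter, fields can be added later without changing the activity's signature. The sketch below assumes the payload travels as a JSON object. The ProcessFileRequest dataclass, its fields, and process_file are hypothetical, and how cadence-python itself converts arguments is not shown here.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class ProcessFileRequest:
    """Hypothetical single-parameter request for an activity."""
    file_id: str
    # Added in a later version; the default keeps older callers working.
    priority: int = 0

    @classmethod
    def from_json(cls, raw: str) -> "ProcessFileRequest":
        data = json.loads(raw)
        known = {f.name for f in fields(cls)}
        # Ignore unknown keys so newer callers do not break older workers.
        return cls(**{key: value for key, value in data.items() if key in known})


def process_file(raw_request: str) -> str:
    """Activity body that takes one JSON-encoded request."""
    request = ProcessFileRequest.from_json(raw_request)
    return f"{request.file_id}:{request.priority}"


old_caller = process_file(json.dumps({"file_id": "a.csv"}))
new_caller = process_file(json.dumps({"file_id": "b.csv", "priority": 5, "owner": "x"}))
print(old_caller, new_caller)
```

A caller that predates the priority field and a caller that sends an extra owner field both go through the same activity signature.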
I registered n activities that are part of one workflow. The workflow execution starts and activity 1 is called. I do not want activity 2 to start until activity 1 completes, so I call Activity.do_not_complete_on_return() inside the activity implementation. The activity is later completed by an external service.
However, until activity 1 completes, the processing thread is blocked.
Is there any way to unblock the thread so that it can process other events, while activity 2 still depends on the completion of activity 1?