rsalmei / clearly Goto Github PK
View Code? Open in Web Editor NEWClearly see and debug your celery cluster in real time!
License: MIT License
Clearly see and debug your celery cluster in real time!
License: MIT License
Hi, We are using a custom named event queue (set by the event_exchange option in celery), but from what I can see, there is no way of configuring event_exchange in clearly at this point. Can you support configuring the event_exchange?
Also, we have multiple subapplication, and we are considering splitting event exchange on a per-subapplication basis. Would it be possible to support multiple event exchanges in clearly (so that we only need one cleary instance?
Thank you,
As stated in PR #43
To give a little context, I have a couple of Celery tasks. One that processes and aggregates chunks of data and another that gets called when all of the data has been aggregated (my callback task). When my callback task gets called with all of the data as the task payload, Clearly chokes and stops listening for events. Are there any settings or flags I can pass in to gracefully ignore or recover and continue monitoring the tasks?
19:52:52.926 SUCCESS 0 v2.tasks.process_data.create_data_processors d4add4a5-1396-4697-8ff2-52cc3c21e681
19:52:52.926 STARTED 0 v2.tasks.process_data.chunk_processor ba383a3d-a7be-4044-b2bd-e0f63bd510b4
19:52:52.928 RECEIVED 0 v2.tasks.process_data.chunk_processor ab7741dc-cd3e-494d-929d-169bbc6c164b
19:52:52.929 RECEIVED 0 v2.tasks.process_data.chunk_processor ddae671f-ccbf-4b18-9b29-c69c20e9617a
19:52:53.012 STARTED 0 v2.tasks.process_data.chunk_processor 405ffbb1-4316-4107-8386-4cf6f53879fc
19:52:53.014 RECEIVED 0 v2.tasks.process_data.chunk_processor d4cc2c42-e5ca-4309-b533-5e65a5980ee7
19:52:53.114 STARTED 0 v2.tasks.process_data.chunk_processor 1bb463af-74de-46d7-ba25-a85bca290914
19:52:53.116 RECEIVED 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:53.222 STARTED 0 v2.tasks.process_data.chunk_processor ab7741dc-cd3e-494d-929d-169bbc6c164b
19:52:53.223 STARTED 0 v2.tasks.process_data.chunk_processor ddae671f-ccbf-4b18-9b29-c69c20e9617a
19:52:53.224 STARTED 0 v2.tasks.process_data.chunk_processor d4cc2c42-e5ca-4309-b533-5e65a5980ee7
19:52:53.228 STARTED 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:56.302 SUCCESS 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:59.080 SUCCESS 0 v2.tasks.process_data.chunk_processor ba383a3d-a7be-4044-b2bd-e0f63bd510b4
19:52:59.136 SUCCESS 0 v2.tasks.process_data.chunk_processor 405ffbb1-4316-4107-8386-4cf6f53879fc
# --> my callback task happened here
Server communication error: Received message larger than max (4764885 vs. 4194304) (StatusCode.RESOURCE_EXHAUSTED)
Hi,
I'm hitting an error in the clearly listener that stops new tasks from being processed. When using cli.workers()
some workers have sw:
.
Here's the traceback:
File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 103, in __run_listener
self._celery_receiver.capture(limit=None, timeout=None, wakeup=True)
File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 93, in capture
return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
File "/usr/local/lib/python3.7/site-packages/kombu/mixins.py", line 197, in consume
conn.drain_events(timeout=safety_interval)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 321, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 979, in drain_events
get(self._deliver, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 370, in get
ret = self.handle_event(fileno, event)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 352, in handle_event
return self.on_readable(fileno), self
File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 348, in on_readable
chan.handlers[type]()
File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 679, in _receive
ret.append(self._receive_one(c))
File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 709, in _receive_one
message, self._fanout_to_queue[exchange])
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 999, in _deliver
callback(message)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
return callback(message)
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 590, in <listcomp>
[callback(body, message) for callback in callbacks]
File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 132, in _receive
self.process(*self.event_from_message(body))
File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 71, in process
handler and handler(event)
File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 110, in _process_event
data = self._process_task_event(event)
File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 126, in _process_task_event
task.result = EventListener.compile_task_result(task)
File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 144, in compile_task_result
if task.worker.sw_ver < '4':
TypeError: '<' not supported between instances of 'NoneType' and 'str'
Do you know why our workers might be reporting an empty sw_ver
?
I'm using:
clearly: 0.7.0
celery: 4.4.0rc3
python: 3.7
Thanks.
Any interest making a front end for this?
I tend to be a very visual person, and would really like a better alternative to flower.
Would that makes sense?
Hello,
Is there any way to send these logs to Logstash with Clearly container?
I mean using something like these :
https://logz.io/blog/python-logs-elk-elastic-stack/
https://python-logstash-async.readthedocs.io/en/stable/
I'm working on some polishing of clearly in my fork and noticed errors with 3.7 in the tests?
e.g.
___________________________________________________ test_expected_states_task[STARTED-STARTED-expected21] ____________________________________________________
self = <clearly.expected_state.ExpectedStateHandler object at 0x7f5b41078a20>, pre = 'STARTED', post = 'STARTED'
def states_through(self, pre, post):
if pre == post:
> raise StopIteration
E StopIteration
clearly/expected_state.py:28: StopIteration
Ideas on how to test?
Tests should include:
https://readthedocs.org/ is the best I've seen. We can link click commands out to sphinx as I do in pyborg...
Hey bud, found a bug with latest version :)
# clearly server $BROKER_URL
2020-02-04 21:58:57,871 clearly.core.event_listener INFO Creating EventListener: max_tasks=10000; max_workers=100
2020-02-04 21:58:57,871 clearly.core.event_listener INFO Celery broker=<url>; backend=None; using_result_backend=False
2020-02-04 21:58:57,872 clearly.core.event_listener INFO Starting listener: <Thread(clearly-listener, started daemon 140312796280576)>
2020-02-04 21:58:57,898 clearly.core.streaming_dispatcher INFO Creating StreamingDispatcher
2020-02-04 21:58:57,899 clearly.core.streaming_dispatcher INFO Starting dispatcher: <Thread(clearly-dispatcher, started daemon 140312785483520)>
2020-02-04 21:58:57,900 clearly.server INFO Creating ClearlyServer
2020-02-04 21:58:57,900 clearly.server INFO Initiating gRPC server: port=12223
2020-02-04 21:58:57,911 clearly.server INFO gRPC server ok
2020-02-04 21:58:58,058 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10801 seconds. [orig: 2020-02-04 18:58:57.925612 recv: 2020-02-04 21:58:58.058112]
2020-02-04 21:58:58,060 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10801 seconds. [orig: 2020-02-04 18:58:57.944250 recv: 2020-02-04 21:58:58.060834]
2020-02-04 21:58:58,062 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10801 seconds. [orig: 2020-02-04 18:58:57.945517 recv: 2020-02-04 21:58:58.061910]
2020-02-04 21:58:58,064 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10801 seconds. [orig: 2020-02-04 18:58:57.971172 recv: 2020-02-04 21:58:58.064185]
2020-02-04 21:58:58,065 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10801 seconds. [orig: 2020-02-04 18:58:57.998381 recv: 2020-02-04 21:58:58.065216]
2020-02-04 21:58:58,065 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync. Current drift is
10800 seconds. [orig: 2020-02-04 18:58:58.000780 recv: 2020-02-04 21:58:58.065810]
2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,068 amqp WARNING Received method (60, 31) during closing channel 1. This method will be ignored
Exception in thread clearly-listener:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 112, in __run_listener
self._celery_receiver.capture(limit=None, timeout=None, wakeup=True)
File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 93, in capture
return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
File "/usr/local/lib/python2.7/dist-packages/kombu/mixins.py", line 197, in consume
conn.drain_events(timeout=safety_interval)
File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 323, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/kombu/transport/pyamqp.py", line 103, in drain_events
return connection.drain_events(**kwargs)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 505, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 511, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 79, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 518, in on_inbound_method
method_sig, payload, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 145, in dispatch_method
listener(*args)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1615, in _on_basic_deliver
fun(msg)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 130, in _receive
[process(*from_message(event)) for event in body]
File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 71, in process
handler and handler(event)
File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 119, in _process_event
data = self._process_task_event(event)
File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 135, in _process_task_event
task.result = EventListener.compile_task_result(task)
File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 157, in compile_task_result
safe_compile_text(result, raises=True)
File "/usr/local/lib/python2.7/dist-packages/clearly/safe_compiler.py", line 77, in safe_compile_text
txt = NON_PRINTABLE_PATTERN.sub(_encode_to_hex, txt)
TypeError: expected string or buffer
Hugs :)
Hello,
I can't get to start the clearly server on ubuntu 16.04 /python 3.6.8. I got this error:
from google.protobuf import symbol_database as _symbol_database
File "/var/www/deddd/lib/python3.6/site-packages/google/protobuf/symbol_database.py", line 193, in <module>
_DEFAULT = SymbolDatabase(pool=descriptor_pool.Default())
AttributeError: module 'google.protobuf.descriptor_pool' has no attribute 'Default'
I used this command to start the server:
clearly server amqp://localhost
I was interested in the status or possible roadmap for Celery 5 support
Hello - I have been trying out the docker version of clearly with our instance of celery, backed by rabbitmq. I made the recommended change to the worker invocation to enable events. The server starts up with the following output
2020-04-13 14:54:17,376 clearly.event_core.event_listener INFO Creating EventListener: max_tasks=10000; max_workers=100
Celery broker=amqp://production:**@169.254.255.254:5672; backend=None; using_result_backend=False
2020-04-13 14:54:17,378 clearly.event_core.event_listener INFO Starting listener: <Thread(clearly-listener, started daemon 140432189708032)>
2020-04-13 14:54:17,415 clearly.event_core.streaming_dispatcher INFO Creating StreamingDispatcher
2020-04-13 14:54:17,418 clearly.event_core.streaming_dispatcher INFO Starting dispatcher: <Thread(clearly-dispatcher, started daemon 140432180209408)>
2020-04-13 14:54:17,419 clearly.server INFO Creating ClearlyServer
2020-04-13 14:54:17,419 clearly.server INFO Initiating gRPC server: port=12223
2020-04-13 14:54:17,428 clearly.server INFO gRPC server ok
After I start the client, I can query the workers and the output seems legit
In [3]: clearlycli.workers()
celery@10-65-70-179-useast1aprod 6
sw: Linux py-celery 3.1.20
load: [1.34, 1.22, 1.25] processed: 84998
fetched: 1 in 974.91us (1025.73/s)
However, when I try to just clearlycli.capture()
it seems to hang and never provides any output. When I try to query tasks already captured by the server, I don't get anything, even though I can check our application logs and see new output and new results of processing:
In [4]: clearlycli.tasks()
fetched: 0 in 431.52us (-)
Also the server output doesn't indicate any connection by the client, which seems strange to me, but maybe that's normal.
I'd be grateful for any advice you have. Clearly looks like it could be helpful.
Best regards,
Steve
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.