
clearly's Issues

Support custom Event exchange / multiple exchanges

Hi, we are using a custom-named event exchange (set by the event_exchange option in Celery), but from what I can see there is currently no way to configure event_exchange in clearly. Could you add support for configuring it?

Also, we have multiple subapplications, and we are considering splitting the event exchange on a per-subapplication basis. Would it be possible to support multiple event exchanges in clearly (so that we only need one clearly instance)?

Thank you,
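
For reference, the Celery side of such a setup might look like the sketch below. event_exchange is the Celery setting named above; the helper event_settings, the subapplication names, and the "<name>.celeryev" naming scheme are hypothetical, and nothing here is an existing clearly option.

```python
# Sketch: one event exchange name per subapplication (the naming scheme is an assumption).
SUBAPPS = ("billing", "reports")  # hypothetical subapplication names


def event_settings(subapp):
    """Return Celery settings that give `subapp` its own event exchange."""
    return {
        "event_exchange": "{}.celeryev".format(subapp),  # Celery's default is "celeryev"
        "worker_send_task_events": True,  # same effect as starting the worker with -E
    }


# Each subapplication would apply its own settings,
# for example app.conf.update(**event_settings("billing")).
for name in SUBAPPS:
    print(name, event_settings(name))
```

A monitor that wants to see every subapplication would then need to consume from each of these exchanges, which is what the second half of the request asks for.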

Possible to gracefully recover from StatusCode.RESOURCE_EXHAUSTED?

To give a little context, I have a couple of Celery tasks. One that processes and aggregates chunks of data and another that gets called when all of the data has been aggregated (my callback task). When my callback task gets called with all of the data as the task payload, Clearly chokes and stops listening for events. Are there any settings or flags I can pass in to gracefully ignore or recover and continue monitoring the tasks?

19:52:52.926  SUCCESS 0 v2.tasks.process_data.create_data_processors d4add4a5-1396-4697-8ff2-52cc3c21e681
19:52:52.926  STARTED 0 v2.tasks.process_data.chunk_processor ba383a3d-a7be-4044-b2bd-e0f63bd510b4
19:52:52.928 RECEIVED 0 v2.tasks.process_data.chunk_processor ab7741dc-cd3e-494d-929d-169bbc6c164b
19:52:52.929 RECEIVED 0 v2.tasks.process_data.chunk_processor ddae671f-ccbf-4b18-9b29-c69c20e9617a
19:52:53.012  STARTED 0 v2.tasks.process_data.chunk_processor 405ffbb1-4316-4107-8386-4cf6f53879fc
19:52:53.014 RECEIVED 0 v2.tasks.process_data.chunk_processor d4cc2c42-e5ca-4309-b533-5e65a5980ee7
19:52:53.114  STARTED 0 v2.tasks.process_data.chunk_processor 1bb463af-74de-46d7-ba25-a85bca290914
19:52:53.116 RECEIVED 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:53.222  STARTED 0 v2.tasks.process_data.chunk_processor ab7741dc-cd3e-494d-929d-169bbc6c164b
19:52:53.223  STARTED 0 v2.tasks.process_data.chunk_processor ddae671f-ccbf-4b18-9b29-c69c20e9617a
19:52:53.224  STARTED 0 v2.tasks.process_data.chunk_processor d4cc2c42-e5ca-4309-b533-5e65a5980ee7
19:52:53.228  STARTED 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:56.302  SUCCESS 0 v2.tasks.process_data.chunk_processor b7b0ddd3-a767-4cc2-ad76-a08c1a969147
19:52:59.080  SUCCESS 0 v2.tasks.process_data.chunk_processor ba383a3d-a7be-4044-b2bd-e0f63bd510b4
19:52:59.136  SUCCESS 0 v2.tasks.process_data.chunk_processor 405ffbb1-4316-4107-8386-4cf6f53879fc

# --> my callback task happened here

Server communication error: Received message larger than max (4764885 vs. 4194304) (StatusCode.RESOURCE_EXHAUSTED)
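
The 4194304-byte limit in the error is gRPC's default maximum receive size (4 MiB). In a generic gRPC Python program that limit is raised through channel or server options, as in the sketch below. Whether clearly exposes such a setting is not stated in this thread; the helper message_size_options and the 16 MiB value are illustrative assumptions.

```python
DEFAULT_MAX = 4 * 1024 * 1024  # 4194304 bytes, the limit reported in the error above


def message_size_options(max_bytes):
    """Build gRPC channel/server arguments that raise the message size limits."""
    return [
        ("grpc.max_receive_message_length", max_bytes),
        ("grpc.max_send_message_length", max_bytes),
    ]


options = message_size_options(16 * 1024 * 1024)
# With grpcio installed, these would be passed as, for example:
#   grpc.insecure_channel("localhost:12223", options=options)
#   grpc.server(executor, options=options)
print(options)
```

Raising the limit only moves the ceiling. A payload larger than the new value would fail the same way, so truncating very large task arguments before they are sent is the other option.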

Exception in thread clearly-listener: task.worker.sw_ver is None

Hi,

I'm hitting an error in the clearly listener that stops new tasks from being processed. When using cli.workers(), some workers show an empty sw: value.

Here's the traceback:

  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 103, in __run_listener
    self._celery_receiver.capture(limit=None, timeout=None, wakeup=True)
  File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 93, in capture
    return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
  File "/usr/local/lib/python3.7/site-packages/kombu/mixins.py", line 197, in consume
    conn.drain_events(timeout=safety_interval)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 321, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 979, in drain_events
    get(self._deliver, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 370, in get
    ret = self.handle_event(fileno, event)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 352, in handle_event
    return self.on_readable(fileno), self
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 348, in on_readable
    chan.handlers[type]()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 679, in _receive
    ret.append(self._receive_one(c))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/redis.py", line 709, in _receive_one
    message, self._fanout_to_queue[exchange])
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 999, in _deliver
    callback(message)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
    return callback(message)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 590, in receive
    [callback(body, message) for callback in callbacks]
  File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 590, in <listcomp>
    [callback(body, message) for callback in callbacks]
  File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 132, in _receive
    self.process(*self.event_from_message(body))
  File "/usr/local/lib/python3.7/site-packages/celery/events/receiver.py", line 71, in process
    handler and handler(event)
  File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 110, in _process_event
    data = self._process_task_event(event)
  File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 126, in _process_task_event
    task.result = EventListener.compile_task_result(task)
  File "/usr/local/lib/python3.7/site-packages/clearly/event_core/event_listener.py", line 144, in compile_task_result
    if task.worker.sw_ver < '4':
TypeError: '<' not supported between instances of 'NoneType' and 'str'

Do you know why our workers might be reporting an empty sw_ver?

I'm using:
clearly: 0.7.0
celery: 4.4.0rc3
python: 3.7

Thanks.
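
The TypeError comes from comparing None with a string. A None-safe version check could look like the sketch below. The function name is_pre_celery4 is hypothetical, and treating an unknown version as "not pre-4" is an assumption, not clearly's actual fix.

```python
def is_pre_celery4(sw_ver):
    """Return True only when a version string is known and its major part is < 4."""
    if not sw_ver:  # None or empty: the worker has not reported a version
        return False
    major = sw_ver.split(".", 1)[0]
    return major.isdigit() and int(major) < 4


print(is_pre_celery4(None))        # False, instead of raising TypeError
print(is_pre_celery4("3.1.20"))    # True
print(is_pre_celery4("4.4.0rc3"))  # False
```

Comparing the parsed major number also avoids the string-ordering trap in sw_ver < '4', where a version such as "10.0" would sort before "4".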

Frontend?

Any interest in making a front end for this?
I tend to be a very visual person, and would really like a better alternative to Flower.
Would that make sense?

3.7 Support

I'm working on some polishing of clearly in my fork and noticed errors in the tests under Python 3.7.

e.g.

___________________________________________________ test_expected_states_task[STARTED-STARTED-expected21] ____________________________________________________

self = <clearly.expected_state.ExpectedStateHandler object at 0x7f5b41078a20>, pre = 'STARTED', post = 'STARTED'

    def states_through(self, pre, post):
        if pre == post:
>           raise StopIteration
E           StopIteration

clearly/expected_state.py:28: StopIteration
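
If states_through is a generator function, this matches PEP 479: from Python 3.7 on, a StopIteration raised inside a generator body is converted to RuntimeError, and the usual fix is a plain return. The sketch below shows the pattern; the linear path of states is a hypothetical stand-in, not clearly's actual state machine.

```python
def states_through(pre, post, path=("RECEIVED", "STARTED", "SUCCESS")):
    """Yield the states strictly after `pre` up to and including `post`."""
    if pre == post:
        return  # ends the generator; `raise StopIteration` becomes RuntimeError on 3.7+
    start = path.index(pre) + 1
    stop = path.index(post) + 1
    for state in path[start:stop]:
        yield state


print(list(states_through("STARTED", "STARTED")))   # []
print(list(states_through("RECEIVED", "SUCCESS")))  # ['STARTED', 'SUCCESS']
```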

TypeError: expected string or buffer on txt = NON_PRINTABLE_PATTERN.sub(_encode_to_hex, txt)

Hey bud, found a bug with latest version :)

# clearly server $BROKER_URL 
2020-02-04 21:58:57,871 clearly.core.event_listener INFO Creating EventListener: max_tasks=10000; max_workers=100
2020-02-04 21:58:57,871 clearly.core.event_listener INFO Celery broker=<url>; backend=None; using_result_backend=False
2020-02-04 21:58:57,872 clearly.core.event_listener INFO Starting listener: <Thread(clearly-listener, started daemon 140312796280576)>
2020-02-04 21:58:57,898 clearly.core.streaming_dispatcher INFO Creating StreamingDispatcher
2020-02-04 21:58:57,899 clearly.core.streaming_dispatcher INFO Starting dispatcher: <Thread(clearly-dispatcher, started daemon 140312785483520)>
2020-02-04 21:58:57,900 clearly.server INFO Creating ClearlyServer
2020-02-04 21:58:57,900 clearly.server INFO Initiating gRPC server: port=12223
2020-02-04 21:58:57,911 clearly.server INFO gRPC server ok
2020-02-04 21:58:58,058 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10801 seconds.  [orig: 2020-02-04 18:58:57.925612 recv: 2020-02-04 21:58:58.058112]

2020-02-04 21:58:58,060 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10801 seconds.  [orig: 2020-02-04 18:58:57.944250 recv: 2020-02-04 21:58:58.060834]

2020-02-04 21:58:58,062 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10801 seconds.  [orig: 2020-02-04 18:58:57.945517 recv: 2020-02-04 21:58:58.061910]

2020-02-04 21:58:58,064 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10801 seconds.  [orig: 2020-02-04 18:58:57.971172 recv: 2020-02-04 21:58:58.064185]

2020-02-04 21:58:58,065 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10801 seconds.  [orig: 2020-02-04 18:58:57.998381 recv: 2020-02-04 21:58:58.065216]

2020-02-04 21:58:58,065 celery.events.state WARNING Substantial drift from <serverX> may mean clocks are out of sync.  Current drift is
10800 seconds.  [orig: 2020-02-04 18:58:58.000780 recv: 2020-02-04 21:58:58.065810]

2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,066 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,067 amqp WARNING Received method (60, 60) during closing channel 1. This method will be ignored
2020-02-04 21:58:58,068 amqp WARNING Received method (60, 31) during closing channel 1. This method will be ignored
Exception in thread clearly-listener:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 112, in __run_listener
    self._celery_receiver.capture(limit=None, timeout=None, wakeup=True)
  File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 93, in capture
    return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
  File "/usr/local/lib/python2.7/dist-packages/kombu/mixins.py", line 197, in consume
    conn.drain_events(timeout=safety_interval)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 505, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 511, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 79, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 518, in on_inbound_method
    method_sig, payload, content,
  File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 145, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1615, in _on_basic_deliver
    fun(msg)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 590, in receive
    [callback(body, message) for callback in callbacks]
  File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 130, in _receive
    [process(*from_message(event)) for event in body]
  File "/usr/local/lib/python2.7/dist-packages/celery/events/receiver.py", line 71, in process
    handler and handler(event)
  File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 119, in _process_event
    data = self._process_task_event(event)
  File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 135, in _process_task_event
    task.result = EventListener.compile_task_result(task)
  File "/usr/local/lib/python2.7/dist-packages/clearly/event_core/event_listener.py", line 157, in compile_task_result
    safe_compile_text(result, raises=True)
  File "/usr/local/lib/python2.7/dist-packages/clearly/safe_compiler.py", line 77, in safe_compile_text
    txt = NON_PRINTABLE_PATTERN.sub(_encode_to_hex, txt)
TypeError: expected string or buffer

Hugs :)
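
The traceback shows a regex substitution receiving something that is not a string. One way to guard against that is to coerce the value before substituting, as sketched below. The pattern and both helper functions are hypothetical stand-ins for clearly's own, and the code uses Python 3 semantics although the traceback above is from Python 2.7.

```python
import re

# Hypothetical stand-in for clearly's NON_PRINTABLE_PATTERN.
NON_PRINTABLE_PATTERN = re.compile(r"[^\x20-\x7e]")


def _encode_to_hex(match):
    """Replace one matched character with a \\xNN escape."""
    return "\\x{:02x}".format(ord(match.group()))


def escape_non_printable(value):
    """Escape non-printable characters, coercing non-text input first."""
    if isinstance(value, bytes):
        value = value.decode("utf-8", errors="replace")
    elif not isinstance(value, str):
        value = repr(value)  # e.g. None, numbers, dicts
    return NON_PRINTABLE_PATTERN.sub(_encode_to_hex, value)


print(escape_non_printable(None))    # None
print(escape_non_printable("a\tb"))  # a\x09b
```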

Got this Attribute Error

Hello,

I can't get the clearly server to start on Ubuntu 16.04 / Python 3.6.8. I get this error:

 from google.protobuf import symbol_database as _symbol_database
  File "/var/www/deddd/lib/python3.6/site-packages/google/protobuf/symbol_database.py", line 193, in <module>
    _DEFAULT = SymbolDatabase(pool=descriptor_pool.Default())
AttributeError: module 'google.protobuf.descriptor_pool' has no attribute 'Default'

I used this command to start the server:

clearly server amqp://localhost

Unable to see tasks with clearly client

Hello - I have been trying out the Docker version of clearly with our instance of Celery, backed by RabbitMQ. I made the recommended change to the worker invocation to enable events. The server starts up with the following output:

2020-04-13 14:54:17,376 clearly.event_core.event_listener INFO Creating EventListener: max_tasks=10000; max_workers=100
Celery broker=amqp://production:**@169.254.255.254:5672; backend=None; using_result_backend=False
2020-04-13 14:54:17,378 clearly.event_core.event_listener INFO Starting listener: <Thread(clearly-listener, started daemon 140432189708032)>
2020-04-13 14:54:17,415 clearly.event_core.streaming_dispatcher INFO Creating StreamingDispatcher
2020-04-13 14:54:17,418 clearly.event_core.streaming_dispatcher INFO Starting dispatcher: <Thread(clearly-dispatcher, started daemon 140432180209408)>
2020-04-13 14:54:17,419 clearly.server INFO Creating ClearlyServer
2020-04-13 14:54:17,419 clearly.server INFO Initiating gRPC server: port=12223
2020-04-13 14:54:17,428 clearly.server INFO gRPC server ok

After I start the client, I can query the workers and the output seems legit:

In [3]: clearlycli.workers()
celery@10-65-70-179-useast1aprod 6
sw: Linux py-celery 3.1.20
load: [1.34, 1.22, 1.25] processed: 84998
fetched: 1 in 974.91us (1025.73/s)

However, when I just call clearlycli.capture(), it seems to hang and never produces any output. When I query tasks already captured by the server, I get nothing back, even though our application logs show new output and new processing results:

In [4]: clearlycli.tasks()
fetched: 0 in 431.52us (-)

Also the server output doesn't indicate any connection by the client, which seems strange to me, but maybe that's normal.

I'd be grateful for any advice you have. Clearly looks like it could be helpful.

Best regards,

Steve
