
puka's Introduction

Puka - the opinionated RabbitMQ client

Puka is yet another Python client library for RabbitMQ. But unlike similar libraries, it does not try to expose a generic AMQP API. Instead, it takes an opinionated view on how the user should interact with RabbitMQ.

Puka is simple

Puka exposes a simple, easy to understand API. Take a look at the publisher example:

import puka

client = puka.Client("amqp://localhost/")

promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='test')
client.wait(promise)

promise = client.basic_publish(exchange='', routing_key='test',
                              body='Hello world!')
client.wait(promise)

Puka is asynchronous

Puka is fully asynchronous, although, as the example above shows, it can also behave synchronously. That's especially useful for simple tasks where you don't want to introduce callbacks.

Here's the same code written in an asynchronous way:

import puka

def on_connection(promise, result):
    client.queue_declare(queue='test', callback=on_queue_declare)

def on_queue_declare(promise, result):
    client.basic_publish(exchange='', routing_key='test',
                         body="Hello world!",
                         callback=on_basic_publish)

def on_basic_publish(promise, result):
    print(" [*] Message sent")
    client.loop_break()

client = puka.Client("amqp://localhost/")
client.connect(callback=on_connection)
client.loop()

You can mix synchronous and asynchronous programming styles if you want to.
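As a minimal sketch of mixing the two styles (the publish_then helper is illustrative, not part of Puka's API), setup can use blocking wait() calls while the final publish hands off to a callback:

```python
def publish_then(client, queue, body, callback):
    # `client` is expected to be a connected puka.Client.
    # Synchronous style: declare the queue and block until it's ready.
    promise = client.queue_declare(queue=queue)
    client.wait(promise)
    # Asynchronous style: fire the publish and let `callback` handle
    # the result from within client.loop().
    client.basic_publish(exchange='', routing_key=queue,
                         body=body, callback=callback)
```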

Puka never blocks

In the pure asynchronous programming style Puka never blocks your program waiting for the network. However, it is your responsibility to notify Puka when new data is available on the network socket. To allow that, Puka gives you access to the raw socket descriptor, with which you can construct your own event loop. Here's an event loop that may replace loop() from the previous example:

import select

fd = client.fileno()
while True:
    client.run_any_callbacks()

    r, w, e = select.select([fd],
                            [fd] if client.needs_write() else [],
                            [fd])
    if r or e:
        client.on_read()
    if w:
        client.on_write()

Puka is fast

Puka is asynchronous and has no trouble handling many requests at a time. This can be exploited to achieve a degree of parallelism. For example, this snippet creates 1000 queues in parallel:

promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
    client.wait(promise)

Puka also has a nicely optimized AMQP codec, but don't expect miracles: it can't go faster than Python.

Puka is sensible

Puka exposes only a sensible subset of AMQP, as judged by the author.

The major differences between Puka and normal AMQP libraries include:

  • Puka doesn't expose AMQP channels to the users.
  • Puka treats basic_publish as a synchronous method. You can wait on it and make sure that your data is delivered. Alternatively, you may ignore the promise and treat it as an asynchronous command.
  • Puka tries to cope with the AMQP exceptions and expose them to the user in a predictable way. Unlike other libraries it's possible (and recommended!) to recover from AMQP errors.
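For example, since wait() surfaces AMQP errors as catchable exceptions and the client stays usable afterwards, a caller can recover and retry on the same connection. A hedged sketch (the declare_or_redeclare helper and its fallback policy are illustrative, not part of Puka's API):

```python
def declare_or_redeclare(client, queue):
    # `client` is expected to be a connected puka.Client. Puka raises
    # AMQP errors (puka.spec_exceptions.*) from wait(); unlike many
    # libraries, recovering from them is possible and recommended.
    promise = client.queue_declare(queue=queue, durable=True)
    try:
        return client.wait(promise)
    except Exception:
        # E.g. the queue already exists with different properties.
        # Recover by redeclaring it without extra arguments.
        promise = client.queue_declare(queue=queue)
        return client.wait(promise)
```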

Puka is experimental

Puka is a side project, written mostly to explore whether it is possible to create a reasonable API on top of the AMQP protocol.

I like it! Show me more!

The best examples to start with are in the rabbitmq-tutorials repo.

More code can be found in the ./examples directory. Some interesting bits:

  • ./examples/send.py: sends one message
  • ./examples/receive_one.py: receives one message
  • ./examples/stress_amqp_consume.py: a script used to benchmark the throughput of the server

There is also a bunch of fairly complicated examples hidden in the tests (see the ./tests directory).

I want to install Puka

Puka works with Python 2.7 and 3.6+.

You can install Puka system-wide using pip:

sudo pip install puka

Alternatively, to install it in a local virtualenv environment:

virtualenv my_venv
my_venv/bin/pip install puka

Or if you need the code from trunk:

sudo pip install -e git+http://github.com/majek/puka.git#egg=puka

I want to run the examples

Great. Make sure you have a RabbitMQ server installed and follow these steps:

git clone https://github.com/majek/puka.git
cd puka
make
cd examples

Now you're ready to run the examples, start with:

python send.py

I want to see the API documentation

The easiest way to get started is to take a look at the examples and tweak them to your needs. Detailed documentation doesn't exist yet. If it existed, it would live here:

http://majek.github.com/puka/


puka's Issues

Doesn't work

Hi Marek,

I didn't find an issue tracker on your p0f SSL lib, so I'm asking here. It compiled and ran fine, and the output works too; however, the output returns null for the SSL data.

I exported everything to the API and to the p0f-client, and even added the SSL flag, so it's pretty obvious that the lib is not working at all, because the flag was null too.

At this stage, what would you suggest I do? Is this due to TLS version conflicts?

2012 to 2020 is a large time difference, and I'm not sure what has changed in TLS/SSL. Nevertheless, your lib is great.

parse_amqp_url doesn't handle empty vhost

puka.connection.parse_amqp_url violates RabbitMQ's AMQP URI
Specification (http://www.rabbitmq.com/uri-spec.html)

parsing amqp://localhost/ should result in an empty vhost, but puka
uses '/' as vhost.

There's a comment in parse_puka_connection that reads:

# We do not support empty vhost case. Empty vhost is treated as
# '/'. This is mostly for backwards compatibility, and the fact
# that empty vhost is not very useful.

I think it would be better if puka favoured interoperability with
other implementations over backwards compatibility. (puka is at
version 0.0.6)

In my case I'm trying to configure a clojure program and a python
program with the same .ini based config file. the clojure library
conforms to the above specification and consequently both programs
'disagree' on the connection parameters.

Unacknowledged messages building up

I have a program that runs multiple python processes consuming from the same durable rabbit queue like this:

_future = rabbit._client.basic_consume(
    queue=self.binding, prefetch_count=1, no_ack=False
)
while True:
    try:
        # message_wait_timeout is 300 seconds
        message = rabbit._client.wait(_future, timeout=message_wait_timeout)

        if message is not None:
            rabbit._client.basic_ack(message)
            task = json.loads(message['body'])
            #
            # Do some work here using the data in task
            #

    except Exception as e:
        Log(e)

Where rabbit._client is my rabbit connection to a remote rabbit server, which is filled with 1 million + messages a day. The number of python processes consuming increases and decreases using more/less AWS nodes, depending on the number of messages in the queue, so I have anywhere from 40-400 python processes consuming from the queue concurrently depending on time of day.

My problem is that I'm slowly accumulating unacknowledged messages every day (not sure the exact amount, but it's around 10-100). I can kill every python connection, severing all connections to the rabbit server, but the messages remain unacknowledged and do not jump back into the ready queue.

My theory is that the rabbit server thinks there is still an open channel associated with an old process, but I'm not sure, and I wouldn't even know how to flush them if that was the case. It's my understanding that even though I call basic_ack() immediately after wait(), the ack actually isn't sent until the next wait(), so it's possible that my processes get killed while doing work using the data in the rabbit message (takes between 1-4 seconds) before the next wait(). However, I would think that the unacknowledged messages would become ready again.

Do you have any insights? Hopefully I stated the problem clearly.

'Client' object has no attribute 'x_publish_promise'

I'm randomly seeing this exception in my logs when calling basic_publish. It seems to happen only once per several thousand publishes, so I've not yet been able to reproduce it myself.

I have recently upgraded to 0.0.3, and I haven't seen this exception prior to upgrading.

Implement a way for doing "Direct reply-to"

Hello,

In RabbitMQ v3.4.0 they added a method for doing fast RPC called "Direct reply-to"

For users, supposedly the server side of the RPC can remain the same (as long as it is publishing to the default exchange); the "magic" happens on the client side. However, with the current version of Puka (v0.0.7) I cannot get this to work.

I have tried various versions of the rpc client:

Leaving the rpc_client mostly the same and only waiting on the self.consume_promise causes the client to simply hang. If I wait on the publish first, then I get back an exception:

  File "/usr/local/lib/python2.7/dist-packages/puka/connection.py", line 329, in wait
    raise_errors=raise_errors)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 35, in run_callback
    return self._promises[number].run_callback(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 135, in run_callback
    raise result.exception
puka.spec_exceptions.PreconditionFailed: {'class_id': 60, 'method_id': 40, 'reply_code': 406, 'reply_text': 'PRECONDITION_FAILED - fast reply consumer does not exist'}

I suspect it is because, under the hood, Puka uses separate channels for publishing and consuming? Perhaps this feature of RabbitMQ requires the same channel to be used when doing this type of RPC? If that is the case, I wonder if there is an easy way to extend or modify Puka to support it?

which python versions are supported?

The test suite works with python 2.6, python 2.7 and pypy.

python 2.5 does not work (no collections.namedtuple) and simplebuffer contains a workaround for python 2.4!

basic_qos how to

It's a little unclear how basic_qos should be used. I currently have the following code:

def connect_and_receive(sub_to_process_queue, amqp_user, amqp_password, amqp_host, amqp_port, amqp_exchange_sub, amqp_queue_sub):
    # declare send and receive clients, both connecting to the same server on local machine
    subscribe_dsn = "amqp://{}:{}@{}:{}/".format(
                amqp_user, amqp_password, amqp_host, amqp_port
            )
    print("Subscribing to server:", subscribe_dsn)
    consumer = puka.Client(subscribe_dsn)

    # connect receiving party
    receive_promise = consumer.connect()
    consumer.wait(receive_promise)

    # Define receiving exchange and queue
    receive_promise = consumer.exchange_declare(exchange=amqp_exchange_sub, type='fanout', durable=True)
    consumer.wait(receive_promise)
    receive_promise = consumer.queue_declare(queue=amqp_queue_sub, durable=True, exclusive=False, auto_delete=False)
    consumer.wait(receive_promise)
    receive_promise = consumer.queue_bind(exchange=amqp_exchange_sub, queue=amqp_queue_sub)
    consumer.wait(receive_promise)

    # start waiting for messages
    receive_promise = consumer.basic_consume(queue=amqp_queue_sub, no_ack=False)
    consumer.basic_qos(receive_promise, prefetch_count=10)

    while True:
        message = consumer.wait(receive_promise)
        # print("Resc", end=" ")
        handle_message(message, sub_to_process_queue)
        consumer.basic_ack(message)

But when I run it I get an Exception:

transform_1             | Exception in thread Thread-1:
transform_1             | Traceback (most recent call last):
transform_1             |   File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
transform_1             |     self.run()
transform_1             |   File "/usr/local/lib/python3.8/threading.py", line 870, in run
transform_1             |     self._target(*self._args, **self._kwargs)
transform_1             |   File "/app/subscribe.py", line 29, in connect_and_receive
transform_1             |     message = consumer.wait(receive_promise)
transform_1             |   File "/app/src/puka/puka/connection.py", line 377, in wait
transform_1             |     self.on_read()
transform_1             |   File "/app/src/puka/puka/connection.py", line 191, in on_read_nohandshake
transform_1             |     self._handle_read(data, offset)
transform_1             |   File "/app/src/puka/puka/connection.py", line 224, in _handle_frame_read
transform_1             |     self.channels.channels[channel].inbound_method(frame)
transform_1             |   File "/app/src/puka/puka/channel.py", line 80, in inbound_method
transform_1             |     self._handle_inbound(frame)
transform_1             |   File "/app/src/puka/puka/channel.py", line 106, in _handle_inbound
transform_1             |     self.promise.recv_method(result)
transform_1             |   File "/app/src/puka/puka/promise.py", line 87, in recv_method
transform_1             |     callback = self.methods[result.method_id]
transform_1             | KeyError: 3932171
transform_1             | 
transform_1             | Thread stopped

How exactly should I be using the basic_qos function?
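As a hedged sketch of an alternative: other snippets in this document pass prefetch_count straight to basic_consume rather than calling basic_qos on the consume promise. The start_consuming helper below is illustrative, not part of Puka's API:

```python
def start_consuming(consumer, queue, prefetch=10):
    # `consumer` is expected to be a connected puka.Client. Passing
    # prefetch_count directly to basic_consume sidesteps the separate
    # basic_qos call on the consume promise shown above.
    return consumer.basic_consume(queue=queue, no_ack=False,
                                  prefetch_count=prefetch)
```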

Consume Async...

Is there some kind of async basic_consume?
I'm trying to use puka with Tornado.

Thanks.

puka should ignore WSAEWOULDBLOCK on windows

I just got the following error on windows:

  File "c:\users\ralf\home\fscrawler\_fscrawler\thread_helper.py", line 13, in run
    res = func(*args, **kwargs)
  File "c:\users\ralf\home\fscrawler\_fscrawler\connector.py", line 93, in

    call_methods(client, cmd_lst)
  File "c:\users\ralf\home\fscrawler\_fscrawler\connector.py", line 51, in

    client.wait_for_all(promises)
  File "c:\users\ralf\home\.buildout\cache-eggs\puka-0.0.6.2-py2.7.egg\puka\connection.py", line 270, in wait_for_all
    self.wait(promise, raise_errors=raise_errors)
  File "c:\users\ralf\home\.buildout\cache-eggs\puka-0.0.6.2-py2.7.egg\puka\connection.py", line 233, in wait
    self.on_write()
  File "c:\users\ralf\home\.buildout\cache-eggs\puka-0.0.6.2-py2.7.egg\puka\connection.py", line 193, in on_write
    r = self.sd.send(self.send_buf.read(128*1024))
socket.error: [Errno 10035] A non-blocking socket operation could not be completed immediately

10035 is WSAEWOULDBLOCK

http://msdn.microsoft.com/en-us/library/windows/desktop/ms740668%28v=vs.85%29.aspx
has the following to say on that:

Resource temporarily unavailable.

This error is returned from operations on nonblocking sockets that
cannot be completed immediately, for example recv when no data is
queued to be read from the socket. It is a nonfatal error, and the
operation should be retried later. It is normal for WSAEWOULDBLOCK to
be reported as the result from calling connect on a nonblocking
SOCK_STREAM socket, since some time must elapse for the connection to
be established.
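A sketch of the proposed handling: treat 10035 like EAGAIN/EWOULDBLOCK and retry later. Note that errno.WSAEWOULDBLOCK is only defined on Windows builds of Python, hence the getattr fallback; the is_retryable helper itself is illustrative, not part of Puka:

```python
import errno

# errno.WSAEWOULDBLOCK only exists on Windows; fall back to the raw
# value 10035 quoted above.
WSAEWOULDBLOCK = getattr(errno, 'WSAEWOULDBLOCK', 10035)

def is_retryable(err_no):
    """True for 'try again later' socket errors that the I/O loop
    should swallow: EAGAIN/EWOULDBLOCK on POSIX, WSAEWOULDBLOCK (10035)
    on Windows."""
    return err_no in (errno.EAGAIN, errno.EWOULDBLOCK, WSAEWOULDBLOCK)
```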

Consider blocking `connect`

For better usability we might consider making two changes:

  • don't allow getting a socket descriptor before calling connect
  • make connect block

This would allow connect to iterate over resolved host names to check which one actually has rmq running.

Function table.encode produces NameError if value cannot be encoded.

Ran into this by mistake. The variable "key" is not defined anywhere, so it produces a NameError while trying to format the assert message. I guess it's meant to be "pieces" instead.

From lines 176-177 of puka/table.py:

        assert False, "Unsupported field kind during encoding %r/%r" % (
            key, value)

Thanks for such a good implementation of an AMQP client!

Requeue option in basic_reject

It seems that basic_reject does not implement the requeue bit argument as specified by the AMQP protocol.

To be more precise, it is specified in the puka/spec.py file, but it is not accepted as an argument in puka/machine.py, where it has the hard-coded value True.

Also, in order to test, I tried to change this value to False; however, I get a KeyError exception in puka/machine.py, line 299, in basic_reject:
t = conn.promises.by_number(msg_result['promise_number'])

AttributeError when heartbeat is enabled

Hello all,

When I have heartbeat enabled in my client settings, I occasionally see AttributeError: 'NoneType' object has no attribute 'write' in my error logs.

Here's the stack trace:

Traceback (most recent call last):

 ... My Code ...

  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/client.py", line 19, in wrapper
    p = method(*args, **kwargs)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/machine.py", line 216, in queue_declare
    t = conn.promises.new(_queue_declare)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/promise.py", line 21, in new
    promise = Promise(self.conn, number, on_channel, **kwargs)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/promise.py", line 62, in __init__
    self.conn.channels.allocate(self, self._on_channel)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/channel.py", line 45, in allocate
    machine.channel_open(promise, on_channel)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/machine.py", line 207, in channel_open
    t.send_frames( spec.encode_channel_open('') )
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/promise.py", line 99, in send_frames
    self.conn._send_frames(self.channel.number, frames)
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/connection.py", line 185, in _send_frames
    for frame_type, payload in frames]) )
  File "/home/vagrant/.virtualenvs/thm/local/lib/python2.7/site-packages/puka/connection.py", line 177, in _send
    self.send_buf.write(data)
AttributeError: 'NoneType' object has no attribute 'write'

It looks to me like the way heartbeats work in Puka doesn't play well with blocking architectures. A simple fix is to check whether self.send_buf is None before writing, but I'm not sure what else that could break.

I've disabled heartbeats for now.

Release 0.0.7

It's been almost 2 years and 40+ commits since the last release, there are no bugs reported, and there is a single stale pull request waiting. It's time to release the next version; I'd rather update my dependencies from PyPI than from git and/or a custom pip repo.

'NoneType' object has no attribute 'send'

I use Puka with Django, and the following call:

client.wait(client.close())

sometimes causes the exception 'NoneType' object has no attribute 'send' (not often, though).

Here's the trace:

File "/home/ron/recondesk/recon/views.py" in recon_update
  1016.     client.wait(client.close())
File "/home/ron/virtualenvs/recondesk/local/lib/python2.7/site-packages/puka/connection.py" in wait
  218.                 self.on_write()
File "/home/ron/virtualenvs/recondesk/local/lib/python2.7/site-packages/puka/connection.py" in on_write
  151.             r = self.sd.send(self.send_buf.read(128*1024))

unify struct.pack params

~/puka/puka$ grep "pack(" *py|sed "s|.*pack(\(.*\)\$|\1|g"|cut -d "," -f 1|sort| uniq -c
     32 '!B'
      1 'B'
      1 '!BBBB'
      1 '!BHI'
      2 '>c'
      1 '>cB'
      2 '>cBi'
      1 '>cd'
      1 '>ci'
      2 '>cI'
      1 '>cq'
      1 '>cQ'
      2 '!HH'
      1 '!HHQH'
      1 '>I'
      4 '!I'
      8 '!IB'
     14 '!IHB'
      1 '!IHIH'
      1 '!II'
      1 '!IIHB'
      3 '!IQB'
      1 '!Q'
~/puka/puka$ grep "unpack_from(" *py|sed "s|.*unpack_from(\(.*\)\$|\1|g"|cut -d "," -f 1|sort| uniq -c
      2 '>B'
     26 '!B'
      2 'B'
      1 '!BB'
      1 '!BHI'
      1 '>d'
      1 '>f'
      1 '>h'
      1 '!H'
      3 '!HB'
      2 '!HH'
      1 '!HIH'
      1 '!HxxQ'
      2 '>i'
      3 '>I'
      8 '!I'
      1 '!II'
      1 '>q'
      1 '>Q'
      1 '!Q'
      1 '!QB'
      2 '!QBB'

We could drop '!' for plain 'B', stop using '>', and maybe use ord instead of 'B'. Or even build proper pack constructors for the more complex formats.
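For what it's worth, '!' (network order) and '>' (big-endian) are interchangeable here: both mean big-endian with standard sizes, so unifying the prefixes is purely cosmetic. A quick demonstration:

```python
import struct

# '!' and '>' encode identically; for a single byte even the native
# 'B' matches, since byte order doesn't apply to one-byte fields.
assert struct.pack('!I', 0xDEADBEEF) == struct.pack('>I', 0xDEADBEEF)
assert struct.pack('!B', 7) == struct.pack('B', 7)
assert struct.unpack_from('!Q', struct.pack('>Q', 42))[0] == 42
```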

Unexpected: synchronous publishing without pub confirms is slower than with confirms

Running both RabbitMQ 3.5.1 and the test app on my recent-model development MacBook Pro (OS X Yosemite), I encountered an unexpected, reproducible performance conundrum: with pubacks=False, synchronous publishing is consistently slower than with pubacks=True. This is counterintuitive and differs from all other AMQP clients that I profiled.

I used the apps from https://github.com/vitaly-krugl/amqp-perf to profile basic_publish performance. Below are a few examples of what I am talking about.

puka with pubacks=False

I verified that the channel IS NOT in confirm mode

time python puka_perf.py publish --impl=Client --exg="" --size 1024 --msgs=100000

real    0m22.728s
user    0m12.593s
sys 0m2.129s

puka with pubacks=True

I verified that the channel IS in confirm mode

time python puka_perf.py publish --impl=Client --exg="" --size 1024 --msgs=100000 --delivery-confirmation

real    0m17.368s
user    0m9.054s
sys 0m2.112s

Here, you can see that both real and user execution times are smaller for pubacks=True. This is unexpected, because with pubacks=True the broker needs to check whether the exchange exists before it can Ack, whereas with pubacks=False, the broker can Ack right away.

Below are a couple of comparable tests and corresponding execution times for haigha:

haigha with pubacks=False

time python haigha_perf.py publish --impl=SocketTransport --exg="" --size 1024 --msgs=100000

real    0m10.010s
user    0m7.772s
sys 0m2.231s

haigha with pubacks=True

time python haigha_perf.py publish --impl=SocketTransport --exg="" --size 1024 --msgs=100000 --delivery-confirmation

real    0m21.344s
user    0m13.371s
sys 0m3.531s

basic_ack Is Not Properly Flushed

Calling basic_ack on a client does not send the ack to the server until either client.close() or client.needs_write() is called. Example:

Broken Example:

import time

import puka

client = puka.Client('amqp://...')
promise = client.connect()
client.wait(promise)

promise = client.exchange_declare('e')
client.wait(promise)
promise = client.queue_declare('q')
client.wait(promise)
promise = client.queue_bind('q', 'e', 'r')
client.wait(promise)

promise = client.basic_publish(exchange='e', routing_key='r', body='asdf')
client.wait(promise)

consume_promise = client.basic_consume('q')
message = client.wait(consume_promise)

print message

client.basic_ack(message)
time.sleep(60)

# Sleep so that one has time to go look at the server. You will see that
# there remains one unack'ed message for the whole 60 seconds.
# No proper close...
# The message now returns to the queue.

Output:

{'body': 'asdf', 'exchange': 'e', 'consumer_tag': 'amq.ctag-gVQNG9ru98B8KP1HW9FSQA==', 'promise_number': 7, 'routing_key': 'r', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}

Expected result: The message should have been acknowledged and removed from the queue.

Working Example:

import puka

client = puka.Client('amqp://...')
promise = client.connect()
client.wait(promise)

promise = client.exchange_declare('e')
client.wait(promise)
promise = client.queue_declare('q')
client.wait(promise)
promise = client.queue_bind('q', 'e', 'r')
client.wait(promise)

promise = client.basic_publish(exchange='e', routing_key='r', body='asdf')
client.wait(promise)

consume_promise = client.basic_consume('q')
message = client.wait(consume_promise)

print message

client.basic_ack(message)

#no need to sleep or even properly close the connection. Just call needs_write()
print client.needs_write()

Output:

{'body': 'asdf', 'exchange': 'e', 'consumer_tag': 'amq.ctag-gVQNG9ru98B8KP1HW9FSQA==', 'promise_number': 7, 'routing_key': 'r', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
False

Proposed fix: perhaps puka is buffering the commands for a reason, but in that case there should be an explicit flush() for long lived connections that may otherwise go idle for long periods of time (or disconnect unexpectedly!)
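A sketch of the proposed explicit flush, built only from methods this document already shows (needs_write() and on_write()); the flush helper itself is illustrative, not part of Puka's API:

```python
def flush(client):
    # Drain the outgoing buffer: on_write() pushes buffered bytes to
    # the socket, needs_write() reports whether anything is left.
    while client.needs_write():
        client.on_write()
```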

None return value of "wait" method on timeout

To illustrate the issue quickly, here's a snippet of code:

...
consume_promise = client.basic_consume('queue', prefetch_count=1)
message = client.wait(consume_promise, timeout=1.0)
...

The content of "message" would be None if there is nothing on the queue and the wait times out after 1 second. In effect this could be used to probe if the queue has been empty for the duration of the timeout.

The issue occurs when the "valid" messages passed around include None itself. In that case, I don't believe we have a robust way of distinguishing between a valid None message fetched from the queue and a None object returned due to timeout.

Currently, I do timing before and after the wait, and decide a None is due to an empty queue if the elapsed time is longer than the timeout. Wouldn't there be a better way to indicate an empty queue (like raising an exception, as Queue.get does, for example)?

I realize it probably is a bad design to pass around None as a message to begin with, but legacy code often cannot be touched effectively.

intermittent error in TestBasic.test_close

I see intermittent errors in test_basic. I've seen this in test_close and test_basic_qos, when the code is waiting on the client.close() promise:

=============================================================== FAILURES ================================================================
_________________________________________________________ TestBasic.test_close __________________________________________________________

self = <test_basic.TestBasic testMethod=test_close>

    def test_close(self):
    client = puka.Client(self.amqp_url)
    promise = client.connect()
    client.wait(promise)

    promise = client.queue_declare(queue=self.name)
    client.wait(promise)

    promise = client.basic_publish(exchange='', routing_key=self.name,
                       body=self.msg)
    client.wait(promise)

    consume_promise = client.basic_consume(queue=self.name)
    msg_result = client.wait(consume_promise)

    promise = client.queue_delete(self.name)
    client.wait(promise)

    promise = client.close()
>       client.wait(promise)

client     = <puka.client.Client object at 0x2eb0fd0>
consume_promise = 5
msg_result = {'body': '0.479241924429', 'exchange': '', 'consumer_tag': 'amq.ctag-wjqFqhJU4...'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
promise    = 1
self       = <test_basic.TestBasic testMethod=test_close>

tests/test_basic.py:428:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>, promise_numbers = set([1]), timeout = None, raise_errors = True

    def wait(self, promise_numbers, timeout=None, raise_errors=True):
    '''
        Wait for selected promises. Exit after promise runs a callback.
        '''
    if timeout is not None:
        t1 = time.time() + timeout
    else:
        td = None

    if isinstance(promise_numbers, int):
        promise_numbers = [promise_numbers]
    promise_numbers = set(promise_numbers)

    # Try flushing the write buffer before entering the loop, we
    # may as well return soon, and the user has no way to figure
    # out if the write buffer was flushed or not - (ie: did the
    # wait run select() or not)
    #
    # This is problem is especially painful with regard to
    # async messages, like basic_ack. See #3.
    #
    # Additionally, during the first round trip on windows - when
    # the connection is being established, the socket may not yet
    # be in the connected state - swallow an error in that case.
    try:
        self.on_write()
    except socket.error, e:
        if e.errno != errno.ENOTCONN:
        raise

    while True:
        while True:
        ready = promise_numbers & self.promises.ready
        if not ready:
            break
        promise_number = ready.pop()
        return self.promises.run_callback(promise_number,
                          raise_errors=raise_errors)

        if timeout is not None:
        t0 = time.time()
        td = t1 - t0
        if td < 0:
            break

        r, w, e = select.select([self],
                    [self] if self.needs_write() else [],
                    [self],
                    td)
        if r or e:
>               self.on_read()

e          = []
promise_numbers = set([1])
r          = [<puka.client.Client object at 0x2eb0fd0>]
raise_errors = True
ready      = set([])
self       = <puka.client.Client object at 0x2eb0fd0>
td         = None
timeout    = None
w          = []

puka/connection.py:256:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>

    def on_read(self):
    try:
        r = self.sd.recv(131072)
    except socket.error, e:
        if e.errno == errno.EAGAIN:
        return
        else:
        raise

    if len(r) == 0:
        # a = self.sd.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        self._shutdown(exceptions.mark_frame(spec.Frame(),
                         exceptions.ConnectionBroken()))

    self.recv_buf.write(r)

    if len(self.recv_buf) >= self.recv_need:
        data = self.recv_buf.read()
        offset = 0
        while len(data) - offset >= self.recv_need:
        offset, self.recv_need = \
>                   self._handle_read(data, offset)

data       = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
offset     = 12
r          = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
self       = <puka.client.Client object at 0x2eb0fd0>

puka/connection.py:112:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.client.Client object at 0x2eb0fd0>
data = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
start_offset = 12

    def _handle_frame_read(self, data, start_offset):
    offset = start_offset
    if len(data)-start_offset < 8:
        return start_offset, 8
    frame_type, channel, payload_size = \
        struct.unpack_from('!BHI', data, offset)
    offset += 7
    if len(data)-start_offset < 8+payload_size:
        return start_offset, 8+payload_size
    assert data[offset+payload_size] == '\xCE'

    if frame_type == 0x01: # Method frame
        method_id, = struct.unpack_from('!I', data, offset)
        offset += 4
        frame, offset = spec.METHODS[method_id](data, offset)
>           self.channels.channels[channel].inbound_method(frame)

channel    = 2
data       = '\x01\x00\x00\x00\x00\x00\x04\x00\n\x003\xce\x01\x00\x02\x00\x00\x00$\x00<\x00\x1f\x1famq.ctag-wjqFqhJU4E8LXaN5LIFpOg\xce'
frame      = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
frame_type = 1
method_id  = 3932191
offset     = 55
payload_size = 36
self       = <puka.client.Client object at 0x2eb0fd0>
start_offset = 12

puka/connection.py:141:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.channel.Channel object at 0x2eb0490>, frame = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def inbound_method(self, frame):
    if frame.has_content:
        self.method_frame = frame
    else:
>           self._handle_inbound(frame)

frame      = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.channel.Channel object at 0x2eb0490>

puka/channel.py:76:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.channel.Channel object at 0x2eb0490>, result = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def _handle_inbound(self, result):
>       self.promise.recv_method(result)

result     = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.channel.Channel object at 0x2eb0490>

puka/channel.py:102:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <puka.promise.Promise object at 0x2eb0810>, result = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}

    def recv_method(self, result):
    # log.debug('#%i recv_method %r', self.number, result)
    # In this order, to allow callback to re-register to the same method.
>       callback = self.methods[result.method_id]
E       KeyError: 3932191

result     = {'consumer_tag': 'amq.ctag-wjqFqhJU4E8LXaN5LIFpOg'}
self       = <puka.promise.Promise object at 0x2eb0810>

puka/promise.py:87: KeyError
================================================== 1 failed, 60 passed in 6.06 seconds ==================================================

Basic publish pubacks

The logic appears to be inverted in that an ack packet is constructed even though pubacks are not defined.

In machine.py, basic_publish() should only construct an ack packet when conn.x_pubacks is set to True.

Note that, as it stands, it is not possible to wait on a promise returned by basic_publish().
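
A minimal sketch of the proposed change, using a stand-in connection object (FakeConn, send() and expect_ack() are illustration-only names; the real machine.py differs):

```python
class FakeConn(object):
    """Stand-in for puka's connection; only what the sketch needs."""
    def __init__(self, x_pubacks):
        self.x_pubacks = x_pubacks
        self.sent_frames = []
        self.acks_expected = 0

    def send(self, frames):
        self.sent_frames.append(frames)

    def expect_ack(self):
        self.acks_expected += 1


def basic_publish(conn, frames):
    conn.send(frames)
    # Proposed behaviour: only construct/expect an ack packet when
    # pubacks were negotiated, i.e. conn.x_pubacks is True.
    if getattr(conn, 'x_pubacks', False):
        conn.expect_ack()


no_pubacks = FakeConn(x_pubacks=False)
basic_publish(no_pubacks, 'frame')
print(no_pubacks.acks_expected)  # 0: no ack packet without pubacks
```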

Puka with green threads

In our project we use both puka and green threads via the eventlet library: puka is imported using eventlet.patcher.import_patched('puka') instead of "import puka". That gives great value, as we can write simple synchronous code with client.wait(...) and still have puka not block our single thread.

The only non-obvious issue I saw when using puka this way is that you cannot ack messages while another green thread is waiting for the next message to be received.

Suppose we have two green threads.

Green-thread 1:

basic_consume_promise = self.client.basic_consume(
    queue='test', prefetch_count=1)
while True:
    msg = self.client.wait(basic_consume_promise)
    eventlet.spawn(self.message_received, msg)

Green-thread 2:

def message_received(self, msg):
    eventlet.sleep(1)
    self.client.basic_ack(msg)

The problem with the code above is that the order of execution is

p = basic_consume()
msg = wait(p)
msg = wait(p)
basic_ack(msg)

and because basic_ack() gets called while wait() is already waiting for another message, the data written by basic_ack() to the send buffer remains unsent.

The fix is to always wait for writes in select(), even if the send buffer happens to be empty at the moment. I.e.:

def wait(self, promise_numbers, timeout=None, raise_errors=True):
    ...
    r, w, e = select.select((self,),
                            (self,),  # !!! instead of: (self,) if self.needs_write() else (),
                            (self,),
                            0)
    ...
    while True:
        ...
        r, w, e = select.select([self],
                                [self],  # !!! instead of: [self] if self.needs_write() else [],
                                [self],
                                td)

AFAIU this wouldn't harm normal single-thread use cases in any way, but it would make puka useful for scenarios involving green threads.
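
A runnable illustration of the idea, using a plain socket pair instead of a puka client (wait_step is an illustration-only name):

```python
import select
import socket

def wait_step(sock, timeout):
    # Suggested behaviour: always register write interest, even when the
    # send buffer is empty, so bytes queued by another green thread (e.g.
    # by basic_ack) get flushed by whichever thread is blocked in select().
    return select.select([sock], [sock], [sock], timeout)

a, b = socket.socketpair()
r, w, e = wait_step(a, 0)
print(bool(w))  # True: a healthy connected socket is immediately writable
a.close()
b.close()
```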

AttributeError on basic_cancel for the last message in queue

I'm using the examples on synchronous send/receive in Basic Examples on this page: http://majek.github.io/puka/puka.html.

Call this receive.py:

import puka

client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='test')
client.wait(promise)

consume_promise = client.basic_consume(queue='test')
for i in range(3):
    result = client.wait(consume_promise)
    print " [x] Received message %r" % (result,)

    client.basic_ack(result)

client.basic_cancel(consume_promise)
client.wait(consume_promise)

promise = client.close()
client.wait(promise)

and synchronous send (send.py):

import puka

client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='test')
client.wait(promise)

promise = client.basic_publish(exchange='', routing_key='test',
                               body="Hello world!")
client.wait(promise)

promise = client.close()
client.wait(promise)

The synchronous receive appears to always fail with an AttributeError on waiting for basic_cancel in receive.py, when it does basic_ack on the very last message in the queue and then does basic_cancel on the same consume_promise:

$ python receive.py 
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 2}
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 3}
Traceback (most recent call last):
  File "receive.py", line 27, in <module>
    client.wait(consume_promise)
  File "/usr/local/lib/python2.7/dist-packages/puka/connection.py", line 222, in wait
    raise_errors=raise_errors)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 35, in run_callback
    return self._promises[number].run_callback(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 134, in run_callback
    if raise_errors and result.is_error:
AttributeError: 'NoneType' object has no attribute 'is_error'

This only happens on the very last message in the queue.

Is this an intended behavior? If it is, how should I cancel the consume more gracefully at the very end?

Thanks.

binding_key vs. routing_key in queue_bind

There is a mismatch between the code and the documentation of queue_bind/queue_unbind: the code uses the parameter "binding_key" where the documentation uses "routing_key". If "routing_key" is the more correct name, the following patch will solve the issue:

--- machine.py.orig     2011-01-18 09:03:40.000000000 +0100
+++ machine.py  2011-01-18 09:04:29.000000000 +0100
@@ -407,17 +407,17 @@
     t.x_frames = spec.encode_queue_purge(queue)
     return t

-def queue_bind(conn, queue, exchange, binding_key='', arguments={}):
+def queue_bind(conn, queue, exchange, routing_key='', arguments={}):
     t = conn.promises.new(_generic_callback)
     t.x_method = spec.METHOD_QUEUE_BIND_OK
-    t.x_frames = spec.encode_queue_bind(queue, exchange, binding_key,
+    t.x_frames = spec.encode_queue_bind(queue, exchange, routing_key,
                                         arguments)
     return t

-def queue_unbind(conn, queue, exchange, binding_key='', arguments={}):
+def queue_unbind(conn, queue, exchange, routing_key='', arguments={}):
     t = conn.promises.new(_generic_callback)
     t.x_method = spec.METHOD_QUEUE_UNBIND_OK
-    t.x_frames = spec.encode_queue_unbind(queue, exchange, binding_key,
+    t.x_frames = spec.encode_queue_unbind(queue, exchange, routing_key,
                                           arguments)
     return

Publishing to a 2nd channel - does not work.

Related to 'serious memory leak - if using one channel to consume & publish'.

An application reading from one queue, filtering the messages, and publishing to a second queue works if a single channel is used, but the publish fails silently if a 2nd channel is used for publishing. My understanding is that AMQP channels should be uni-directional and a single channel should not be used for both consuming and publishing; thus the app opens two channels: channel 1 to consume from queue A, and channel 2 to publish to queue B.

[Is my understanding correct?] If so, why is consuming and publishing on the same channel not prevented?

Messages not written to buffer

I met a strange behaviour when using the puka library. It seems that messages are not written to the SimpleBuffer when the exchange has not been declared before publishing messages. The following pseudo-code shows this:

import puka
client = puka.Client(host, pubacks=True)
promise = client.connect()
client.wait(promise)

promises = []
for i in range(0, 42):
    promise = client.basic_publish("test_exchange", "test_key", "test_body")
    print client.send_buf  # prints <SimpleBuffer of 0 bytes, ...
    promises.append(promise)

client.wait(promises)

When client.exchange_declare('test_exchange', type='topic', durable=True) is added before publishing, everything works as expected!

Is this a bug, or is it somehow required to declare an exchange each time (even for durable exchanges)?

Serious memory leak - if same channel used to consume & publish

An application reading from one queue, filtering the messages, and publishing to a second queue is experiencing an approx. 1K memory leak per message published. Functionally the app works, but upon deploying to production (with a typical 800 msg/sec load) the memory leak was discovered when the Ubuntu (12.04) server quickly exhausted all virtual memory.

Subsequently I've learned that separate channels should not be used. [Is this correct?]
If so, why is it not prevented?

Changing the app to open two connections to the rabbitmq-server (channel 1 to consume from queue A; channel 2 to publish to queue B) reveals another issue: "Publishing to a 2nd channel does not work".

memory leak - RPC pattern

Hello,

An application I have written, which uses Puka in the RPC pattern, is experiencing a memory leak when many messages are passed through it.

I believe this is probably the same as Serious memory leak - if same channel used to consume & publish.

I was able to reproduce this problem with Puka 0.0.7 using a slightly modified version of the example code: rpc_client.py and rpc_server.py.

Here is the modified server:

#!/usr/bin/env python
import puka

client = puka.Client("amqp://admin:admin@localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='rpc_queue')
client.wait(promise)

print " [x] Awaiting RPC requests"
consume_promise = client.basic_consume(queue='rpc_queue', prefetch_count=1)
while True:
    msg_result = client.wait(consume_promise)
    n = int(msg_result['body'])

    print " [.] fib(%s)" % (n,)
    response = '99'  # Don't actually calculate anything, we want speed!

    # This publish doesn't need to be synchronous.
    client.basic_publish(exchange='',
                         routing_key=msg_result['headers']['reply_to'],
                         headers={'correlation_id':
                                  msg_result['headers']['correlation_id']},
                         body=str(response))
    client.basic_ack(msg_result)

Here is the modified client:

#!/usr/bin/env python
import puka
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.client = client = puka.Client("amqp://admin:admin@localhost/")
        promise = client.connect()
        client.wait(promise)

        promise = client.queue_declare(exclusive=True)
        self.callback_queue = client.wait(promise)['queue']

        self.consume_promise = client.basic_consume(queue=self.callback_queue,
                                                    no_ack=True)

    def call(self, n):
        correlation_id = str(uuid.uuid4())
        # We don't need to wait on promise from publish, let it happen async.
        self.client.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  headers={'reply_to': self.callback_queue,
                                           'correlation_id': correlation_id},
                                  body=str(n))
        while True:
            msg_result = self.client.wait(self.consume_promise)
            if msg_result['headers']['correlation_id'] == correlation_id:
                return int(msg_result['body'])


fibonacci_rpc = FibonacciRpcClient()

while True:
    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)

Not sure if it matters, but my RabbitMQ version is: RabbitMQ 3.3.5, Erlang R16B03.

The memory usage of both the client and the server continues to increase the longer the two programs run.

Thanks

Catch connection errors

This looks quite bad:

Traceback (most recent call last):
  File "./stress_amqp.py", line 104, in <module>
    main()
  File "./stress_amqp.py", line 96, in main
    client.loop(timeout=1.0)
  File "../puka/connection.py", line 247, in loop
    self.on_read()
  File "../puka/connection.py", line 67, in on_read
    r = self.sd.recv(131072)
socket.error: [Errno 104] Connection reset by peer
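
A hedged sketch of how a caller could contain this today, using a stand-in client (FakeClient and safe_loop are illustration-only names; arguably puka itself should surface this as a ConnectionBroken-style error instead):

```python
import errno
import socket

class FakeClient(object):
    """Stand-in that fails the way the traceback above shows."""
    def loop(self, timeout=None):
        raise socket.error(errno.ECONNRESET, 'Connection reset by peer')

def safe_loop(client, timeout=1.0):
    try:
        client.loop(timeout=timeout)
        return True
    except socket.error as e:
        if e.errno == errno.ECONNRESET:
            # Peer reset: report it instead of letting the raw
            # socket.error unwind through client.loop().
            print('connection reset by peer; reconnect or shut down')
            return False
        raise

print(safe_loop(FakeClient()))  # False
```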

client.connect fails with "getsockaddrarg: bad family"

The following code

import puka

client = puka.Client("amqp://localhost/")
client.wait(client.connect())

fails on my system with

Traceback (most recent call last):
  File "s.py", line 4, in 
    client.wait(client.connect())
  File "/home/ralf/puka/puka/client.py", line 19, in wrapper
    p = method(*args, **kwargs)
  File "/home/ralf/puka/puka/client.py", line 48, in connect
    return self._connect()
  File "/home/ralf/puka/puka/connection.py", line 81, in _connect
    self.sd.connect(sockaddr)
  File "/usr/lib64/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: getsockaddrarg: bad family

The problem is that I have IPv6 disabled in my Python installation, but puka decides to use IPv6 addresses if it can resolve them.

puka shouldn't ask for IPv6 addresses if it can't use them later. You can use socket.has_ipv6 to check whether Python supports IPv6.
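
A sketch of the suggested resolution logic (resolve is a hypothetical helper; puka's actual connect path differs):

```python
import socket

def resolve(host, port):
    # Only ask the resolver for address families this interpreter can
    # actually use: fall back to AF_INET when socket.has_ipv6 is False,
    # as suggested above.
    family = socket.AF_UNSPEC if socket.has_ipv6 else socket.AF_INET
    return socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)

for af, socktype, proto, canonname, sockaddr in resolve('127.0.0.1', 5672):
    print(af, sockaddr)
```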

Puka with qpid.

I have started to test puka with the qpid amqp client, and found a very basic problem:

Puka extracts the RabbitMQ-specific version string from the AMQP connect reply message, and this of course fails with qpid.

The code that does this is in the file machine.py in the function _connection_start()

t.conn.x_server_props = result['server_properties']
try:
    t.conn.x_server_version = \
        map(int, t.conn.x_server_props['version'].split('.'))
except ValueError:
    t.conn.x_server_version = (Ellipsis,)

I think that the version should be the AMQP protocol version, not the RabbitMQ version, in which case the version_major and version_minor values should be used instead of the server_properties 'version' value.
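
A sketch of that fallback (server_version is a hypothetical helper; the real code lives in machine.py's _connection_start(), and the exception handling here is broadened to cover a missing key as well):

```python
def server_version(server_props, version_major, version_minor):
    # Prefer the broker's own version string when it parses (RabbitMQ
    # provides one); otherwise fall back to the AMQP protocol version
    # fields from Connection.Start, which every broker must send.
    try:
        return tuple(int(x) for x in server_props['version'].split('.'))
    except (KeyError, ValueError):
        return (version_major, version_minor)

print(server_version({'version': '3.3.5'}, 0, 9))  # (3, 3, 5)
print(server_version({'product': 'qpid'}, 0, 9))   # (0, 9)
```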

I have fixed this problem locally, and the next problem I run into is that the socket is closed after each transfer...

Is anyone else working to make puka cross-broker?

PRECONDITION_FAILED - unknown delivery tag

I'm trying to write a threaded rpc communication with puka and rabbitmq.
But I keep getting:

puka.spec_exceptions.PreconditionFailed: {'class_id': 60, 'method_id': 80, 'reply_code': 406, 'reply_text': 'PRECONDITION_FAILED - unknown delivery tag X'}, where X can be any msg number.

The error always occurs on the RPCServer. Here is my code:

#!/usr/bin/env python
from threading import Thread
import puka
import uuid
import logging

class RPCServer(object):
    def __init__(self):
        self.client = None
        self.consume = None
        try:
            self.client = puka.Client("amqp://localhost/")
            promise = self.client.connect(callback=self.on_connected)
            self.client.wait(promise, timeout=2)  # Waits 2s
        except Exception as e:
            print "Error connecting to server - ", e

    def close(self):
        self.client.wait(self.client.close())

    def on_connected(self, promise, result):
        try:
            self.client.wait(self.client.queue_declare(queue='rpc_queue_test'))
            self.consume = self.client.basic_consume(queue='rpc_queue_test',
                                                     callback=self.on_request)
        except Exception as e:
            print "Error declaring queue - ", e

    def on_request(self, promise, request):
        th = Thread(target=self.test, args=[request])
        th.start()

    def test(self, request):
        pr = self.client.basic_publish(exchange='',
                                       routing_key=request['headers']['reply_to'],
                                       headers={'correlation_id':
                                                request['headers']['correlation_id']},
                                       body=str(2))
        self.client.wait(pr)
        self.client.basic_ack(request)

    def run(self):
        while True:
            request = self.client.wait(self.consume)
            print "Request received (%s)" % (request,)

print " [x] Awaiting RPC requests"
rpc_server = RPCServer()

th1 = Thread(target=rpc_server.run)
th1.start()
Is there any way to have this server handle every request in its own thread?

Thanks in advance.

Crash with an unhelpful error

When I try to run the receive.py example I get an unhelpful error:

Traceback (most recent call last):
  File "ex_puka.py", line 6, in <module>
    client.wait(promise)
  File ".../puka/puka/connection.py", line 233, in wait
    self.on_write()
  File ".../puka/puka/connection.py", line 193, in on_write
    r = self.sd.send(self.send_buf.read(128*1024))
socket.error: [Errno 32] Broken pipe

This is python 2.7, installed with brew on an OSX Mountain Lion machine. RabbitMQ runs on the standard port.

sync-mode connections fail

client = puka.Client("amqp://host/")
promise = client.connect()
client.wait(promise)

This code fails in 0.0.4 because wait() calls self.on_write() before the connection is established:

Traceback (most recent call last):
File "...", line 8, in
client.wait(promise)
File "untitled\lib\site-packages\puka\connection.py", line 183, in wait
self.on_write()
File "untitled\lib\site-packages\puka\connection.py", line 151, in on_write
r = self.sd.send(self.send_buf.read(128*1024))
socket.error: [Errno 10057] A request to send or receive data was disallowed because the socket is not connected and (when sending on a datagram socket using a sendto call) no address was supplied
