eandersson / amqpstorm Goto Github PK
View Code? Open in Web Editor NEWThread-safe Python RabbitMQ Client & Management library
Home Page: https://www.amqpstorm.io/
License: MIT License
Thread-safe Python RabbitMQ Client & Management library
Home Page: https://www.amqpstorm.io/
License: MIT License
Hi,
break_on_empty=True
in channel.build_inbound_messages
is set, it randomly gets between 1 to 15 messages before exiting whereas there are still thousands of messages in the queue. The queue contains 10k messages, no publishers, the queue is still. Each run of the below code would get a random amount of messages and then exit. Without break_on_empty
, it gets the 10k messages in less than 10sec but then it remains connected which I don't want.An edited version of the code I used.
from amqpstorm import Connection
queue='simple_queue'
server='127.0.0.1'
username = password = 'guest'
count=0
with Connection(server, username, password) as connection:
with connection.channel() as channel:
#channel.queue.declare(queue)
channel.basic.qos(100)
channel.basic.consume(queue=queue, no_ack=False)
for message in channel.build_inbound_messages(break_on_empty=True):
if message:
count += 1
message.ack()
ref: https://www.amqpstorm.io/api/channel.html#amqpstorm.Channel.build_inbound_messages
Thanks for this library.
I'm seeing this in particular with Docker... I run $ docker run -p 5672:5672 rabbitmq
, and then before RabbitMQ has had a chance to start up, I run $ python3 -c "import amqpstorm; amqpstorm.Connection('localhost', 'guest', 'guest')"
. I get the following error thirty seconds later (well after RabbitMQ has finished starting): amqpstorm.exception.AMQPConnectionError: Connection timed out
Replicated in a single command:
$ > docker run -p 5672:5672 -d rabbitmq; sleep 1s; python -c "import amqpstorm; amqpstorm.Connection('localhost', 'guest', 'guest')"
31a4da1f7aa3015550be080af4bc4bb76f0540b323ca0935bbe88f4d1082b1b0
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 65, in __init__
self.open()
File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 123, in open
self._wait_for_connection_to_open()
File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 227, in _wait_for_connection_to_open
raise AMQPConnectionError('Connection timed out')
amqpstorm.exception.AMQPConnectionError: Connection timed out
$ >
If I sleep
for, say, five seconds after starting the Docker container, then the connection gets made just fine, and immediately.
I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.
Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html
Hello,
I'am trying to set up a fanout
exchange to deliver a message to multiple consumers. I didn't find any example for that. I tried to write something similar to the pika
example here https://www.rabbitmq.com/tutorials/tutorial-three-python.html. The code doesn't raise any error, but only the first consumer receives message. Here the relevant code:
Publisher:
self.connection = amqpstorm.Connection(self.host, self.username, self.password)
self.channel = self.connection.channel()
self.channel.exchange.declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue.declare(self.queue, durable=True)
self.channel.queue.bind(self.queue, exchange=self.exchange)
self.channel.confirm_deliveries()
self.channel.basic.publish(body=body, routing_key=self.queue, mandatory=True, exchange=self.exchange)
Consumer:
self.connection = amqpstorm.Connection(self.host, self.username, self.password)
self.channel = self.connection.channel()
self.channel.exchange.declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue.declare(self.queue, durable=True)
self.channel.queue.bind(self.queue, exchange=self.exchange)
self.channel.confirm_deliveries()
self.channel.basic.consume(self.queue, no_ack=False)
Is the correct code to set up a fanout
exchange ? I tried with no_ack=False
and no_ack=True
, and different variants of the code above, but I always get the same thing: no error, but only the first consumer receives the message.
Thank you in advance for your help.
Hi,
in io. _find_address_and_connect the except: (IOError and OSError) are handled silently and the actual error message is missed.
when neither of address is reachable the AMQPConnectionError("Could not connect host : port") is thrown without the original cause.
The issue with it is: When for example certificate validation error appears, it is not printed anywhere (it enters the except clause above)
Could You consider either throwing the original message in AMQPConnectionError or at least print it to the logger?
Best regards,
Kamil
It's currently not possible to set the Heartbeat interval to 0.
Hi,
First, let me thank you for your library. It works really well. I am using it inside celery tasks to send log message in an AMQP queue that gets further process by logstash (so i need thread safety). I was just wondering why does the library spawns a new thread to handle the amqp connection, can't it be done in the main process ? I would greatly appreciate if you explain to me the workflow of the library in a threading point of view, I don't really understand how rabbitmq and threading should be linked..
Cheers,
I'm currently working on a RabbitMQ plugins written in python to improve Openstack backend monitoring and I found that management API and saw that api/aliveness-test is here already
Expected returned HTTP body on a stopped RabbitMQ server
$ curl --silent -k -u monitor:$MonitorPass https://localhost:15671/api/healthchecks/node/rabbit@eulab-os-01-rmq02 | python -m json.tool
{
"reason": "nodedown",
"status": "failed"
}
Expected returned HTTP body on a running RabbitMQ server
$ curl --silent -k -u monitor:$MonitorPass https://localhost:15671/api/healthchecks/node/rabbit@eulab-os-01-rmq01 | python -m json.tool
{
"status": "ok"
}
Official API doc : https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_10/priv/www/api/index.html
Hi, i had this exception:
"Traceback (most recent call last):
File "rpc_server.py", line 10, in
CONNECTION = amqpstorm.Connection('localhost', 'user', 'pass',port=5672)
File "/tmp/test/lib/python3.3/site-packages/amqpstorm/connection.py", line 70, in init
self.open()
File "/tmp/test/lib/python3.3/site-packages/amqpstorm/connection.py", line 190, in open
self._io.open()
File "/tmp/test/lib/python3.3/site-packages/amqpstorm/io.py", line 99, in open
sock_addresses = self._get_socket_addresses()
File "/tmp/test/lib/python3.3/site-packages/amqpstorm/io.py", line 150, in _get_socket_addresses
raise AMQPConnectionError(why)
amqpstorm.exception.AMQPConnectionError: [Errno 9] service name not available for the specified socket type"
trying to run your rpc_server.py exemple on a Sun Solaris 10 VM.
I've roughly solved the problem adding this last parameter to line 145 of "io.py":
"addresses = socket.getaddrinfo(self._parameters['hostname'],self._parameters['port'],family,socket.SOCK_STREAM)"
I hope it can be useful.
Could you please distribute the MIT license there so distributors can provide it when installing the version.
Another issue is that 2.4.1 tag is missing from the git repo so I can't just quickly swap the tarball from pypi to github one.
We use a version of scalable_consumer where we attempt to re-open the channel on AmqpChannelError rather than destroying the entire connection.
However, this leads to a channel leak. The channels are never removed from the channel list on the server.
I have noticed that on a channel error that CloseOk is never sent... the spec (to me) is unclear as to whether it is necessary but maybe it is as the channels are never fully closed on the server. (v3.5.8)
I attempt to bind to an non-existent queue to cause the error in testing.
Hello again. I'm using AMQPStorm==2.4.0 on production and it a great little lib - thanks again! However, I'm occasionally (twice a month?) seeing a connection timeout:
amqpstorm.exception.AMQPConnectionError: Connection dead, no heartbeat or data received in >= 20s
what's the best practice here? should I catch the exception and re-establish the connection on the app level with backoff? is there some mechanism in amqpstorm that can do this?
thx!
The value of some properties (i.e: message_id) are being returned as byte array, same thing for headers keys, they could be returned as string for easy use.
Thanks!
Hello,
Just a reminder, to renew the certificate.
Thank you.
I use RabbitMQ, all the excahnges and consumers were preconfigured with differnet options. Unfortunately when I try to consume from a durable queue I get
mqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue
If I try to declare a queue without specifying any params I get
Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'
The only way is to specify all the params including including ttl, autodelete, etc. I cannot use this library because I need rabbitmq structure to be configured not on the client side ;-(
dear all,
I have a very simple publish example and running into this error Could you advise?
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
File "/rtmtb/rtmtb/strategy/strategy_consumer.py", line 382, in work
self.send_to_output(signal=signal, dest_idx=0)
File "/rtmtb/rtmtb/consumer.py", line 67, in send_to_output
self.message.publish(self.destination[dest_idx], signal._asdict())
File "/rtmtb/rtmtb/advisors/message/rabbit_storm.py", line 59, in publish
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
File "/rtmtb/rtmtb/strategy/strategy_consumer.py", line 382, in work
self.send_to_output(signal=signal, dest_idx=0)
File "/rtmtb/rtmtb/consumer.py", line 67, in send_to_output
self.message.publish(self.destination[dest_idx], signal._asdict())
File "/rtmtb/rtmtb/advisors/message/rabbit_storm.py", line 59, in publish
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
File "/rtmtb/rtmtb/strategy/strategy_consumer.py", line 382, in work
self.send_to_output(signal=signal, dest_idx=0)
File "/rtmtb/rtmtb/consumer.py", line 67, in send_to_output
self.message.publish(self.destination[dest_idx], signal._asdict())
File "/rtmtb/rtmtb/advisors/message/rabbit_storm.py", line 59, in publish
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
File "/rtmtb/rtmtb/strategy/strategy_consumer.py", line 382, in work
self.send_to_output(signal=signal, dest_idx=0)
File "/rtmtb/rtmtb/consumer.py", line 67, in send_to_output
self.message.publish(self.destination[dest_idx], signal._asdict())
File "/rtmtb/rtmtb/advisors/message/rabbit_storm.py", line 59, in publish
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
amqpstorm.exception.AMQPConnectionError: Connection dead, no heartbeat or data received in >= 120s
Hi
I'm using amqpstorm 2.2.0 to consume messages from RabbitMQ.
I need a high rate of messages. With no_ack=false I increased the QOS value higher and higher to get better message rates. With ack I can reach up to 1.5k msg/s
Then I tried with no_ack=True and got the rates (4k/s) I needed. But the QUS value is ignored and it seems that amqpstorm reads all messages from RabbitMQ to its local queue.
The memory usage of my script raises until the system crasches ( this only happens if the rabbit queue is very big, after pausing the consumer for a few days).
Is there a way to limit the amount of used memory in amqpstorm basic.consume?
Chris
Hi,
It seems like the publish returns true even if I explicitly return a nack for a message. for example I have queue with the following settings in a RabbitMQ server:
args = {
'x-max-length': 10, # maximum length of queue, will receive nack afterwards
'x-overflow': 'reject-publish', # Reject the new publish with a nack
# 'x-message-ttl': 5000,
# 'x-dead-letter-exchange': 'amq.direct',
# 'x-dead-letter-routing-key': self.rpc_queue + '/dlx'
}
But even if the max-length is reached, the publish function returns true. I need this info to reject the process in my flask app.
I also tried to add a consumer and explicitly nack the message with reque=False, still the sender has no idea. BTW, I started from the scalable rpc example and when the maximum queue length has reached, I want to reject the request.
BTW, I also have called the channel.confirm_deliveries() method.
Is there anything (like settings) that I have missed?
DEBUG:amqpstorm.connection:Connection Opening
Traceback (most recent call last):
File "C:/Users/eandersson/Documents/GitHub/amqp-storm/examples/simple_publisher.py", line 29, in <module>
publisher()
File "C:/Users/eandersson/Documents/GitHub/amqp-storm/examples/simple_publisher.py", line 0, in publisher
File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\connection.py", line 69, in __init__
self.open()
File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\connection.py", line 174, in open
self._io.open()
File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 98, in open
self.socket = self._find_address_and_connect(sock_addresses)
File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 160, in _find_address_and_connect
sock = self._create_socket(socket_family=address[0])
File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 177, in _create_socket
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
AttributeError: 'module' object has no attribute 'SOL_TCP'
Hi there,
thanks for a great package and implementing heartbeat timer.
I was developing something locally (pointing to local rabbitmq) and it was working perfectly, but after deploying my app to the cloud and pointing to cloudamqp.com, your package cannot connect anymore...
I've tried to compare your package with other clients:
Python 3.6.3 (default, Oct 3 2017, 21:45:48)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> RABBIT_DSN = os.getenv('RABBIT_DSN')
>>> import pika
>>> connection = pika.BlockingConnection(pika.URLParameters(RABBIT_DSN))
>>> channel = connection.channel() # worked
>>> import rabbitpy
>>> connection = rabbitpy.Connection(RABBIT_DSN)
>>> channel = connection.channel() # worked
>>> import amqpstorm
>>> connection = amqpstorm.UriConnection(RABBIT_DSN) # timeout :(
command terminated with exit code 137
the last command terminates the terminal.
Traceback (most recent call last):
File "importer.py", line 240, in <module>
connection = UriConnection(RABBIT_DSN)
File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/uri_connection.py", line 46, in __init__
**kwargs)
File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 70, in __init__
self.open()
File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 193, in open
self._wait_for_connection_state(state=Stateful.OPEN)
File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 315, in _wait_for_connection_state
raise AMQPConnectionError('Connection timed out')
amqpstorm.exception.AMQPConnectionError: Connection timed out
For you information, our deployment is made to kubernetes on GCP. The connection URL string has a following format:
amqp://user:[email protected]/user
I would appreciate any help :)
Good morning,
I see that closing connection takes 10 seconds executing this code:
import ssl
import os
import sys
import logging
import time
LOG = logging.getLogger('SM.AmqpRpcClient')
class AmqpRpcClient(object):
def __init__(self,props):
self.__props = props
def exec_rpc(self,exchange = "", routing_key="",message_body=None,timeout=15):
hostname = self.__props["host"]
ca_path = self.__props["ca_path"]
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) # pylint: disable=no-member
ssl_context.load_verify_locations(cafile=ca_path)
import amqpstorm # pylint: disable=import-error
with amqpstorm.Connection(
hostname = hostname,
port = int(self.__props["port"]),
username = self.__props["user"],
password = self.__props["password"],
virtual_host=self.__props["vhost"],
ssl=True,
ssl_options={
"server_hostname":hostname,
"suppress_ragged_eofs":False,
"context":ssl_context},
heartbeat=60) as connection:
with connection.channel() as channel:
result = channel.queue.declare(exclusive=True,auto_delete=True,durable=False)
response_queue = result['queue']
message = amqpstorm.Message.create(channel, body=message_body)
message.reply_to = response_queue
correlation_id = message.correlation_id
LOG.info("RPC correlation id: " + correlation_id)
message.publish(exchange=exchange,routing_key=routing_key)
i = 0
while i< timeout:
message = channel.basic.get(response_queue)
if not message:
LOG.debug("waiting for response...")
i+=1
time.sleep(1)
else:
if message.correlation_id == correlation_id:
message.ack()
return message.body
else:
LOG.info("not my message " + str(message.correlation_id))
LOG.info("Closing channel...")
LOG.info("Closing connection...")
raise TimeoutError("RPCCall: No response returned")
In this case the there is no response in a queue so the TimeoutError error is raised 10 seconds after closing connection.
I am using SSL TLS 1.2 connection.
best regards,
Kamil Gรณrka
Hey,
So while AMQP-Storm has existed, as well... AMQP-Storm for two years now, and I have been thinking about re-branding the project for a long time. This is primarily because the current name, is easily confused with StormMQ. Instead, I would like something more unique.
The implications of changing the name of a project are huge, but as the library is extremely stable at the moment, and is unlikely to be changed a lot over the next year or so. I think it would give people sufficient time to migrate to the new name.
Anyway, I was hoping to get some feedback on the idea, and hopefully some neat name suggestions!
Is acknowledging multiple messages supported? Calling channel.basic.ack(multiple=True) gives me
File "/Users/kramer/Documents/amqpstorm/amqpstorm/basic.py", line 217, in ack
self._channel.write_frame(ack_frame)
File "/Users/kramer/Documents/amqpstorm/amqpstorm/channel.py", line 328, in write_frame
self._connection.write_frame(self.channel_id, frame_out)
File "/Users/kramer/Documents/amqpstorm/amqpstorm/connection.py", line 206, in write_frame
frame_data = pamqp_frame.marshal(frame_out, channel_id)
File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/frame.py", line 103, in marshal
return _marshal_method_frame(frame_value, channel_id)
File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/frame.py", line 255, in _marshal_method_frame
frame_value.marshal())
File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/specification.py", line 191, in marshal
output.append(encode.by_type(data_value, data_type))
File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/encode.py", line 358, in by_type
return long_long_int(value)
File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/encode.py", line 127, in long_long_int
raise TypeError("int type required")
TypeError: int type required
I know this sounds crazy :)
There seems to be an issue with prefetch/ack/messagebody
If you set QOS to 1
and publish messages with headers but no body (via management plugin)
1 message is fetched but never hits the handler and hence never acked.
Using simple_consumer.py against 3.5.7 and 3.6.2
The use case is I'm trying to consume rabbit event messages which have no body
QOS 1 is to clearly illustrate the issue, any QOS seems to result in one unprocessed message
$ python test-aliveness.py
Connection Error: HTTPSConnectionPool(host='127.0.0.1', port=15671): Max retries exceeded with url: /api/aliveness-test/%2F (Caused by SSLError(SSLError(1, u'[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:579)'),))
From the example
from amqpstorm.management import ApiConnectionError
from amqpstorm.management import ApiError
from amqpstorm.management import ManagementApi
if __name__ == '__main__':
API = ManagementApi('https://127.0.0.1:15671', 'Username', 'Password')
try:
result = API.aliveness_test('/')
if result['status'] == 'ok':
print("RabbitMQ is alive!")
else:
print("RabbitMQ is not alive! :(")
except ApiConnectionError as why:
print('Connection Error: %s' % why)
except ApiError as why:
print('ApiError: %s' % why)
Possible solution would be to allow "verify=False" for requests
thank you for your great work. Would you be able to provide an example how to send a message to a queue with a defined TTL?
I would like to set the ttl on the message it self, since different messages could have different TTL's
kind regards
Very minor issue.
if you use UriConnection and add a heartbeat querystring option i.e. ?heartbeat=30 a warning is output "invalid ssl option: heartbeat"
The heartbeat is still set and nothing seems affected.
Hello,
We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:
self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
self._channel.write_frames(frames_out)
File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
self.check_for_errors()
File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
self._connection.check_for_errors()
File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
raise self.exceptions[0]
AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
On the rabbitmq server, we have the following logs:
client unexpectedly closed TCP connection
Finally here is the code i use to create an amqp socket (this socket is then used by logging to send log in the queue) :
class AMQPStormSocket(object):
def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
queue_is_durable, exchange_type, fallback_call):
# create connection & channel
self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
self.channel = self.connection.channel()
# create an exchange, if needed
self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
# create a queue, if needed
self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
# bind it
self.channel.queue.bind(queue=queue, exchange=exchange)
# needed when publishing
self.exchange = exchange
self.fallback_call = fallback_call
def sendall(self, data):
try:
self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
except Exception as e:
self.fallback_call(e)
def close(self):
try:
self.channel.close()
self.connection.close()
except Exception:
pass
Do you have an idea on how to fix these errors ?
hi,
we run into another problem, basically for some reason the amqp stack is consuming ungodly amounts of memory and we feel we are doing something wrong.
example using tracemalloc:
Top 30 lines
#1: pamqp/frame.py:62: 951.8 MiB
frame_data = data_in[FRAME_HEADER_SIZE:byte_count - 1]
#2: pamqp/decode.py:417: 361.4 MiB
return value.decode('utf-8')
#3: pamqp/decode.py:296: 165.6 MiB
data = {}
#4: pamqp/decode.py:258: 154.9 MiB
return 8, time.gmtime(value[0])
#5: pamqp/header.py:84: 109.0 MiB
self.properties = properties or specification.Basic.Properties()
#6: pamqp/body.py:21: 78.8 MiB
self.value = value
#7: pamqp/header.py:81: 78.8 MiB
self.class_id = None
#8: pamqp/frame.py:135: 57.4 MiB
method = specification.INDEX_MAPPINGmethod_index
#9: pamqp/frame.py:157: 40.2 MiB
content_header = header.ContentHeader()
#10: pamqp/frame.py:172: 40.2 MiB
content_body = body.ContentBody()
#11: pamqp/header.py:104: 20.1 MiB
self.body_size) = struct.unpack('>HHQ', data[0:12])
#12: pamqp/decode.py:157: 20.1 MiB
return 8, struct.unpack('>q', value[0:8])[0]
#13: amqpstorm/channel.py:229: 18.9 MiB
self._inbound.append(frame_in)
#14: :525: 9.8 MiB
#15: json/decoder.py:353: 4.6 MiB
obj, end = self.scan_once(s, idx)
This is after running for about 15-20 minutes and receiving maybe a million messages. Any idea why there is such a buildup of memory? It quickly exceeds a couple GB and we feel there is some odd issue happening.
kind regards
Hello,
I've run into an issue with creating new channels and receiving the reached the maximum number of channels 65535
when attempting to create a new channel.
After some digging, I noticed Connection._get_next_available_channel_id()
accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.
I tested with a quick fix
def _get_next_available_channel_id(self):
channel_id = len(self._channels) + 1
active_channels = [
ch for ch in list(self._channels.values()) if ch and ch.is_open
]
if len(active_channels) >= self.max_allowed_channels:
raise AMQPConnectionError(
'reached the maximum number of channels %d' %
self.max_allowed_channels)
return channel_id
However it may be better to just keep an active count
Hello there. First please allow me to extend my gratitude for amqp-storm: I love it and have been using it for a few months.
here's my issue:
I wrote a small connection-pool implementation on top of amqp-storm and was hoping to use that in my gevent-patched Flask webservice. However, the handshake between my client and rabbit fails with timeout.
Below is the output from docker-compose (the server and rabbit are each docker'd) - its interleaved.
As you can see the connection is opened but it appears that the handshake fails with timeout (nothing comes through?)
server-dev_1 | Connection Opening
rabbit_1 |
rabbit_1 | =INFO REPORT==== 23-Jun-2016::14:26:48 ===
rabbit_1 | accepting AMQP connection <0.19907.0> (172.18.0.5:52906 -> 172.18.0.2:5672)
rabbit_1 |
rabbit_1 | =ERROR REPORT==== 23-Jun-2016::14:26:58 ===
rabbit_1 | closing AMQP connection <0.19907.0> (172.18.0.5:52906 -> 172.18.0.2:5672):
rabbit_1 | {handshake_timeout,frame_header}
server-dev_1 | Frame Received: Connection.Start
server-dev_1 | [Errno 104] Connection reset by peer
server-dev_1 | Connection Closing
Many thanks!
hello,
libary is using deprecated way of sll connection (ssl.wrap_socket) which does not handle hostname and server certificate validation.
This enables MITM attack so this is not good.
in io.py in _ssl_wrap_socket instead of
ssl.wrap_socket (...
it should be:
def _ssl_wrap_socket(self, sock):
"""Wrap SSLSocket around the Socket.
:param socket.socket sock:
:rtype: SSLSocket
"""
import ssl
context = ssl.create_default_context()
return context.wrap_socket(
sock, do_handshake_on_connect=True,
server_hostname='hostname.bla.bla',
**self._parameters['ssl_options']
)
This is dirty solution but I just wanted to show where is the problem.
Please see: https://docs.python.org/3/library/ssl.html
Best regards,
Kamil Gรณrka
Is there any guidance for how to reconnect on lost connection when you have multiple consumers?
At the moment I have a channel per thread.
On the main thread I reconnect on error
if self.connection.is_closed:
self.connection.open()
I restart each thread which opens a new channel and consumes.
I notice that after a reconnect it adds another heartbeat checker - each period i see multiple
"Checking for a heartbeat"
Thanks
Whats the best way to reconnect when a channel errors?
Is it OK to call connection.open() to re-open a closed/errored connection? same for channel.open ?
Is it thread safe for multiple threads to try and re-open the connection on error?
Or do we need to pass around a connection wrapper that locks?
hello and thank you again for this fine library (which I'm once again using).
It appears that no heartbeat messages are being sent from amqpstorm. I've configured it to 10 seconds and the rabbit server clearly states that no heartbeat was received in 10 secs.
-I'm working with version 2.4.0
-I'm working with python 3.5.1-3 on ubuntu16.04 LTS
-I've yet to really dig in, but it looks like the timer that should send the message is primed but does not 'pop'.
is this a known issue? I've also tried downgrading to 2.1.2 but to no avail.
at the moment I've disabled heartbeat by setting it to '0'.
thanks!
More of a request than an actual issue.
I have been using the library successfully and thank you for that, however I reached a point of doing the unit tests for my app, and I noticed there are some utility classes such as FakeChannel, FakeConnection, TestFramework and I have been trying to figure out how or if I could use them.
I am looking to test something like the robust consumer from the examples: https://github.com/eandersson/amqpstorm/blob/master/examples/robust_consumer.py - there might be a simpler way to test this and I am missing it.
Some pointers would be great. ๐
Hi,
In this last version the url localhost does not work anymore on OS X (Ubuntu works fine), I got Connection Refused.
url: amqp://guest:guest@localhost/
Below the line which raise the exception:
# file io.py
sock.connect(sock_address_tuple[4])
sock_address_tuple[4] = ('::1', 5672, 0, 0)
If I use 127.0.0.1 rather than localhost it works well.
Cheers!
Part of Producer code service.py
:
class Producer(object):
def __init__(self, **kwargs):
self.max_retries = kwargs.get('max_retries', 3)
self.connection = None
self.host = kwargs.get('host', '127.0.0.1')
self.port = kwargs.get('port', 5672)
self.vhost = kwargs.get('vhost', '/')
self.user = kwargs.get('user', 'guest')
self.password = kwargs.get('password', 'guest')
self.exchange = kwargs.get('exchange', '')
self.exchange_type = kwargs.get('exchange_type', 'direct')
self.tms_queue = kwargs.get('tms_queue', '')
self.account_queue = kwargs.get('account_queue', '')
self.routing_key = kwargs.get('routing_key', 'no-routing-key')
def create_connection(self):
"""Create a connection.
:return:
"""
attempts = 0
while True:
attempts += 1
try:
self.connection = Connection(self.host, self.user,
self.password)
break
except amqpstorm.AMQPError as why:
LOGGER.exception(why)
if self.max_retries and attempts > self.max_retries:
break
time.sleep(min(attempts * 2, 30))
except KeyboardInterrupt:
break
def pub_msg(self, message):
"""Start the Consumers.
:return:
"""
if not self.connection:
self.create_connection()
try:
self._pub_msg(message)
except amqpstorm.AMQPError as why:
LOGGER.info('Start reconnect')
LOGGER.exception(why)
self.create_connection()
self._pub_msg(message)
except KeyboardInterrupt:
self.connection.close()
def _pub_msg(self, message):
channel = self.connection.channel()
# Message Properties.
properties = {
'content_type': 'text/plain',
'delivery_mode': 2
}
# Create the message.
print('Connection is: ', self.connection, message)
message = Message.create(channel, message, properties)
message.publish(self.routing_key, exchange=self.exchange)
channel.close()
def setup(self):
"""
Set up exchange and queue
"""
self.create_connection()
channel = self.connection.channel()
channel.exchange.declare(
exchange=self.exchange, exchange_type=self.exchange_type)
channel.queue.declare(self.tms_queue, durable=True)
channel.queue.declare(self.account_queue, durable=True)
channel.queue.bind(
queue=self.tms_queue,
exchange=self.exchange,
routing_key=self.routing_key)
channel.queue.bind(
queue=self.account_queue,
exchange=self.exchange,
routing_key=self.routing_key)
channel.close()
My producer init in Django project settings.py
from service.py import Producer
MQ_PRODUCER = Producer(**kwargs)
MQ_PRODUCER.setup()
And in one Django app's tasks.py, I use the MQ_PRODUCER
like this:
@app.task(**MQ_TASK_CONFIG)
def send_to_mq(msg):
"""
Send message to mq
"""
try:
if settings.MQ_PRODUCER:
try:
settings.MQ_PRODUCER.pub_msg(msg)
except Exception:
raise Exception
else:
return 'success'
When I start the Celery for the first time, the KeyError
shows, after Producer
recreate a connection, everything is fine. It takes about one minute to start create new connection.
Hi all,
Considering 'official' PyPy support would be great! 5-10x speedup on messaging applications is a good amount!
I am porting code from pika to amqpstorm for thread safety. In the existing code there is a channel that is consuming multiple queues. With amqpstorm all messages go to the last consumed callback:
producer.py
from amqpstorm import Message, UriConnection
msg_properties = {
'content_type': 'application/json',
'delivery_mode': 2
}
connection = UriConnection(
'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)
channel = connection.channel()
channel.confirm_deliveries()
channel.exchange.declare(
exchange='testing',
exchange_type='direct',
durable=True
)
Message.create(
channel,
'Test service event.',
msg_properties
).publish(
'service_event',
exchange='testing',
)
Message.create(
channel,
'Test message event.',
msg_properties
).publish(
'message_event',
exchange='testing',
)
channel.close()
connection.close()
consumer.py
from amqpstorm import UriConnection
connection = UriConnection(
'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)
channel = connection.channel()
channel.confirm_deliveries()
def service_event(message):
message.ack()
print('From service method.')
def message_event(message):
message.ack()
print('From message method.')
channel.exchange.declare(
exchange='testing',
exchange_type='direct',
durable=True
)
s_queue = 'service_event.queue'
channel.queue.declare(s_queue)
channel.basic.consume(service_event, s_queue)
channel.queue.bind(
exchange='testing',
queue=s_queue,
routing_key='service_event'
)
m_queue = 'message_event.queue'
channel.queue.declare(m_queue)
channel.basic.consume(message_event, m_queue)
channel.queue.bind(
exchange='testing',
queue=m_queue,
routing_key='message_event'
)
try:
channel.start_consuming()
except:
channel.stop_consuming()
channel.close()
connection.close()
Yields:
From message method.
From message method.
But should be:
From service method.
From message method.
Am I doing something wrong here? It seems like one channel should be able to consume multiple queues into the correct callbacks?
Whoooo, more wierdness!
.Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 5
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 6
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 7
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - self.interface.process_rx_events()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - self.storm_channel.process_data_events(to_tuple=False)
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - for message in self.build_inbound_messages(break_on_empty=True):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - self.check_for_errors()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - raise exception
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Disconnecting!
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
Running state: True, thread alive: True, thread id:140037440014080
Joining _inbound_thread. Runstate: %s False
Context:
I have a connection with a thread processing the receiving messages. I have instrumented Message.ack()
with a print statement that prints the delivery tag that it's acking.
It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED
error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.
I'm working on pulling out a testable example, but it's certainly odd.
Incidentally, the new docs pages are fancypants!
I have been experiencing a strange error, just for some messages I got this error:
'Deliver' object has no attribute 'body_size'
channel.py (line: 253)
I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody
The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.
I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.
Looking at the code I think its because it's not handling SSL Retry exceptions from the socket
i'll get a stack trace to see if its on write or on do_handshake
Hi,
The default values for the timeout
parameter are different in UriConnection
and Connection
cases.
When using UriConnection
, the default value is 30: https://github.com/eandersson/amqpstorm/blob/master/amqpstorm/uri_connection.py#L60
When using Connection
, the default value is 10: https://github.com/eandersson/amqpstorm/blob/master/amqpstorm/connection.py#L58
Is there a reason for this?
Hi. I use your library (ver 1.2.7) in several projects and it works well. But some of this projects rarely prints exception traceback in a log:
Exception in thread amqpstorm.io:
Traceback (most recent call last):
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 221, in _process_incoming_data
self.buffer = self.on_read(self.buffer)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 246, in _read_buffer
self._channel0.on_frame(frame_in)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/channel0.py", line 40, in on_frame
self._write_frame(Heartbeat())
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/channel0.py", line 104, in _write_frame
self._connection.write_frame(0, frame_out)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 177, in write_frame
self.io.write_to_socket(frame_data)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 133, in write_to_socket
self.on_error(why)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 289, in _handle_socket_error
self.close()
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 134, in close
self.io.close()
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 104, in close
self.inbound_thread.join(timeout=1)
File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 940, in join
raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread
I can't reproduce this error by hand but it still occurs. What are reasons of this behavior? Any suggestions? Thanks
Upgraded to RabbitMq 3.6.2 starting to see the following errors which I think lead to using all the memory on rabbitmq server.
[Channel%d] Unhandled Frame: %s -- %s
No code changes, just a server upgrade.
Not sure if this storm or rabbitmq.
Hello,
when closing connection the following is printed to console:
Exception
in thread amqpstorm.io:
traceback (most recent call last):
File "C:\python371\lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "C:\python371\lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 224, in _process_incoming_data
self.data_in += self._receive()
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 239, in _receive
data_in = self._read_from_socket()
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 254, in _read_from_socket
return self.socket.read(MAX_FRAME_SIZE)
File "C:\python371\lib\ssl.py", line 908, in read
raise ValueError("Read on closed or unwrapped SSL socket.")
ValueError: Read on closed or unwrapped SSL socket.`
Is this correct behaviour?
I attach the sample code
import amqpstorm
import sys
import time
import logging
import threading
rabbit_channel = None
rabbit_connection = None
logging.basicConfig(level = logging.DEBUG)
def on_message(message):
"""When server schedules a check for a client, the check payload is received in this callback"""
print(message.body)
def setupChannel():
"""Sets up a rabbitmq channel. Ensure the subscription exchanges exist. Creates client's own exchange and receive queue.
Creates necessary bindings. Sets a consumer class"""
global rabbit_channel
global rabbit_connection
rabbit_channel = rabbit_connection.channel()
client_name = "test"
client_subscription = "test"
#create susbcription exchange for this particular client
rabbit_channel.exchange.declare(exchange=client_subscription, exchange_type='fanout', passive=False, durable=False, auto_delete=False)
# Declare the queue to which the checks requests will be sent
rabbit_channel.queue.declare(queue=client_name, durable=False, exclusive=False, auto_delete=True)
rabbit_channel.queue.bind(client_name, client_subscription)
#setup a consumer
rabbit_channel.basic.consume(on_message, no_ack=True, exclusive = False,queue=client_name)
rabbit_channel.start_consuming()
def setupConnection():
"""
Creates a rabbitmq connection and connects
"""
global rabbit_connection
try:
#we load that lib not from relative path - this will change after package restructure
#if that is not urgently needed I would prefer this to stay here
import amqpstorm # pylint: disable=import-error
except:
print("Cannot import \"amqpstorm\" lib! Exit!")
sys.exit(2)
try:
rabbit_connection = amqpstorm.Connection(
hostname = "test",
port = 5671,
username = "test",
password = "test",
virtual_host="/test",
ssl=True)
except Exception as ex:
message = "Exception on connection open: " + str(ex)
print(message)
raise ConnectionError(str(ex))
def closeConnection():
"""Closes rabbitmq connection and set connection variables to None"""
try:
global rabbit_channel
global rabbit_connection
if rabbit_channel:
rabbit_channel.close()
if rabbit_connection is not None:
if not rabbit_connection.is_closed:
print("Attempt to close transport connection")
rabbit_connection.close()
except Exception as ex:
print("Exception on connection close. " + str(ex))
finally:
rabbit_connection = None
rabbit_channel = None
setupConnection()
t = threading.Thread(target=setupChannel)
t.setDaemon(True)
t.start()
time.sleep(5)
rabbit_channel.close()
closeConnection()
t.join()
input()
Hi, I think I've found one problem in delivery confirmation - in _publish_confirm method. There are three following important (and probably buggy) lines:
self._channel.write_frames(frames_out)
result = self._channel.rpc.get_request(confirm_uuid, raw=True)
self._channel.check_for_errors()
The first line sends data to RMQ, second one tries to get status of this action and the third line checks for errors. Where is the bug? That many things may happen between the write_frames
and delivery checks. I've checked briefly implementation of those two methods (rpc.get_request
and check_for_errors
) and it seems that there is some other network communication that may fail. Which results to delivery-failed response while the messages was actually published.
This may be really unpleasant for a system that e.g. retries in case of a failed publish and duplicate messages are undesirable.
I've verified my theory using docker RMQ, but you can of course use custom RMQ server and somehow interrupt internet connection, see below.
rabbitmq:3-management
, expose ports 5672 and 15672 and run tests to ensure that it works. I did run only some tests: amqpstorm/tests/functional/generic_tests.py
, using nose)_publish_confirm
method and put time.sleep(30)
(30, more or less, so you have enough time to disable connection) after write_frames
, before delivery checks.nosetests amqpstorm/tests/functional/generic_tests.py -m test_functional_publish_and_get_a_large_message
. I also wrapped publish
call in this test to try-except
block so I could print the thrown error in except
block. it was AMQPConnectionError("Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
sleep
, put to that publish
method, expires.sleep
finishes, it should print the error message.I hope the description is clear. Otherwise, let me know, I'll try to answer your questions.
Error is raised only when a message was not delivered to the RMQ server.
This is a weird one, I've upgraded to 2.2 and trying to do the following script
import os
import logging
logging.basicConfig(level=logging.DEBUG)
from amqpstorm import Message
from amqpstorm import UriConnection
keys = ["1","2","3"]
#signer = Signer()
bus = UriConnection("***")
with bus.channel(rpc_timeout=10) as channel:
channel.confirm_deliveries()
for key in keys:
print key
msg = "My Message"
#properties = {"headers": {"md5-signature": signer.sign(msg)}}
Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
I get this error on the second publish.
DEBUG:amqpstorm.connection:Connection Opening
DEBUG:amqpstorm.channel0:Frame Received: Connection.Start
DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk
DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune
DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk
DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open
DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk
DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started
DEBUG:amqpstorm.connection:Connection Opened
DEBUG:amqpstorm.connection:Opening a new Channel
DEBUG:amqpstorm.connection:Channel #1 Opened
1
2
DEBUG:amqpstorm.channel:Channel #1 Closing
DEBUG:amqpstorm.channel:Channel #1 Closed
Exception in thread amqpstorm.io:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data
if self.poller.is_ready:
File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready
except select.error as why:
AttributeError: 'NoneType' object has no attribute 'error'
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.