
nats.py's Introduction

NATS - Python3 Client for Asyncio

An asyncio Python client for the NATS messaging system.


Supported platforms

Should be compatible with Python 3.7 and above.

Installing

pip install nats-py

Getting started

import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError

async def main():
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            await sub.unsubscribe()
    except Exception as e:
        pass

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())

JetStream

Starting with the v2.0.0 series, the client has JetStream support:

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from the consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create a deliver group that will have its messages load balanced among subscribers.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()
    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

TLS

TLS connections can be configured with an SSL context:

import ssl

ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")

Setting the scheme to tls in the connect URL will make the client create a default ssl context automatically:

import asyncio
import ssl
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("tls://demo.nats.io:4443")

Note: If you get SSL certificate errors on OS X, try installing the certifi certificate bundle first. If using Python 3.7, for example, run:

$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete

NKEYS and JWT User Credentials

Since the v0.9.0 release, you can also optionally install NKEYS in order to use the NATS v2.0 auth features:

pip install nats-py[nkeys]

Usage:

await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")

Development

  1. Install nats server.
  2. Make sure the server is available in your PATH: nats-server -v.
  3. Install dependencies: python3 -m pipenv install --dev.
  4. Run tests: python3 -m pytest.

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.


nats.py's Issues

Client.py error

Hi.
In examples/client.py, line 20, def start(self):
must be: async def start(self):
Otherwise this error will pop up:
File "client.py", line 22 await self.nc.connect(io_loop=self.loop) ^ SyntaxError: invalid syntax

Auth Code Authentication

Will auth code authentication be included? Authenticating with a token in the connection string (i.e. "nats://TOKEN@localhost:4222") passes the token as the username server side. Changing it to "nats://:TOKEN@localhost:4222" passes the token as the password, but it still does not work.
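
For reference, recent releases of the client accept the token as an explicit connect option instead of embedding it in the URL; a minimal sketch, assuming a local server started with token authorization (e.g. nats-server --auth TOKEN):

import asyncio
import nats

async def main():
    # Pass the token explicitly so it is not parsed as a username or password.
    nc = await nats.connect("nats://localhost:4222", token="TOKEN")
    await nc.publish("foo", b'authenticated hello')
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())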

Pending_size of subscription `_INBOX` is always growing

I am using NATS in a long-running task. After a long period, a "nats: slow consumer, messages dropped" error is raised. I found that there is a problem when dequeuing messages from the _INBOX subscription. The following demo shows the case: the pending_size of _INBOX keeps growing.

  • NATS Version: 2.0.0-linux (docker)
  • nats.py version: 0.9.2

Demo:

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    await nc.connect("127.0.0.1:4222", loop=loop)

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sid = await nc.subscribe("help", "workers", help_request)

    while True:
        # Send a request and expect a single response
        # and trigger timeout if not faster than 1 second.
        try:
            response = await nc.request("help", b'help me', timeout=1)
            print("Received response: {message}".format(
                message=response.data.decode()))
        except ErrTimeout:
            print("Request timed out")

        # Output all subscriptions' pending_size
        print("NC subs: {}".format(len(nc._subs)))
        for k, v in nc._subs.items():
            print("Key: {}, SubName: {}, pending_size: {}".format(k, v.subject, v.pending_size))

    # Remove interest in subscription.
    await nc.unsubscribe(sid)

    # Terminate connection to NATS.
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Output:

NC subs: 2
Key: 1, SubName: help, pending_size: 0
Key: 2, SubName: _INBOX.sXm35gMyTu5v7u0Zmpi9h2.*, pending_size: 28450
Received a message on 'help _INBOX.sXm35gMyTu5v7u0Zmpi9h2.sXm35gMyTu5vlpVcmpi9h2': help me
Received response: I can help
NC subs: 2
Key: 1, SubName: help, pending_size: 0
Key: 2, SubName: _INBOX.sXm35gMyTu5v7u0Zmpi9h2.*, pending_size: 28460
Received a message on 'help _INBOX.sXm35gMyTu5v7u0Zmpi9h2.sXm35gMyTu5vUuVcmpi9h2': help me
Received response: I can help

connect_timeout doesn't work as expected

When I try to connect to a server that is not responding, the connection just hangs forever in _select_next_server. It appears that connect_timeout really only applies to receiving some form of response after the connection was established, but a user doesn't care how long one part of the connect takes; they want to control the entirety of the operation. And if the one part of the connection that cannot be constrained by the timeout takes forever, that is rather... bad.

Something like this would do:

--- nats/aio/client.py
+++ nats/aio/client.py
@@ -287,7 +287,10 @@ class Client(object):

         while True:
             try:
-                yield from self._select_next_server()
+                select_server = self._select_next_server()
+                yield from asyncio.wait_for(
+                    select_server, self.options["connect_timeout"]
+                )
                 yield from self._process_connect_init()
                 self._current_server.reconnects = 0
                 break

but of course that causes all sorts of things to go south and also would use the same timeout twice.
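
A caller-side workaround (a sketch only, not a fix for the underlying issue) is to bound the entire connect call from the outside with asyncio.wait_for:

import asyncio
from nats.aio.client import Client as NATS

async def connect_with_deadline(servers, deadline=5.0):
    nc = NATS()
    # Bound the whole connect sequence, including server selection,
    # rather than only the post-connection handshake.
    await asyncio.wait_for(nc.connect(servers=servers), timeout=deadline)
    return nc

# nc = await connect_with_deadline(["nats://127.0.0.1:4222"])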

first install fails

I tried to install and the first try failed. It is probably my fault, but it is strange:

$ sudo pip install asyncio-nats-client
Password:
The directory '/Users/sebgoa/Library/Caches/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
The directory '/Users/sebgoa/Library/Caches/pip' or its parent directory is not owned by the current user and caching wheels has been disabled. check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
Collecting asyncio-nats-client
  Downloading asyncio-nats-client-0.5.0.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/private/tmp/pip-build-5STZqO/asyncio-nats-client/setup.py", line 2, in <module>
        from nats.aio.client import __version__
      File "nats/__init__.py", line 10
        yield from nc.connect(**options)
                 ^
    SyntaxError: invalid syntax
    
    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/tmp/pip-build-5STZqO/asyncio-nats-client/

Add Subscription type to the API

The subscription methods were modeled after the original Ruby client:

  # Simple publisher and async subscriber via coroutine.
  sid = await nc.subscribe("foo", cb=message_handler)

  # Stop receiving after 2 messages.
  await nc.auto_unsubscribe(sid, 2)

but similar as with other clients, we should change the API so that it is more consistent with the Go client:

  sub = await nc.subscribe("foo", cb=message_handler)
  await sub.auto_unsubscribe(2)
  await sub.unsubscribe()

TLS Reconnect tests flapping

=== RUN ClientTLSReconnectTest.test_tls_reconnect
Fatal write error on socket transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7f09c89760b8>
transport: <_SelectorSocketTransport fd=11 read=polling write=<idle, bufsize=0>>
Traceback (most recent call last):
  File "/opt/python/3.5.1/lib/python3.5/asyncio/selector_events.py", line 702, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor
ERROR: test_tls_reconnect (tests.client_test.ClientTLSReconnectTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/nats-io/asyncio-nats/nats/aio/client.py", line 424, in timed_request
    msg = yield from asyncio.wait_for(future, timeout, loop=self._loop)
  File "/opt/python/3.5.1/lib/python3.5/asyncio/tasks.py", line 390, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/travis/build/nats-io/asyncio-nats/tests/utils.py", line 295, in wrapper
    loop=test_case.loop))
  File "/opt/python/3.5.1/lib/python3.5/asyncio/base_events.py", line 337, in run_until_complete
    return future.result()
  File "/opt/python/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/opt/python/3.5.1/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/opt/python/3.5.1/lib/python3.5/asyncio/tasks.py", line 386, in wait_for
    return fut.result()
  File "/opt/python/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/opt/python/3.5.1/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/travis/build/nats-io/asyncio-nats/tests/client_test.py", line 1050, in test_tls_reconnect
    response = yield from nc.timed_request("example", b'Help!', timeout=1)
  File "/home/travis/build/nats-io/asyncio-nats/nats/aio/client.py", line 428, in timed_request
    raise ErrTimeout
nats.aio.errors.ErrTimeout: nats: Timeout

'Queue' object has no attribute 'task_done' after asyncio loop getters problem

Hi,

I'm currently facing an issue with the asyncio loop after more than a 48h run (so it is not easy to reproduce).

This asyncio problem (some loop getters issue I need to troubleshoot further) impacts the asyncio-nats code as shown below:

  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 584, in _process_op_err
    self._flush_queue.task_done()
AttributeError: 'Queue' object has no attribute 'task_done'

Maybe there is some protection code to add here? If you have any tips to help me troubleshoot further, let me know anyway...

Here's the code where I'm using asyncio-nats : https://github.com/echinopsii/net.echinopsii.ariane.community.cli.python3/blob/master/ariane_clip3/natsd/driver.py

NOTE: I'm using the actor pattern (pykka) with asyncio-nats, which may not be a great fit... Should I perhaps use lower-level functions (i.e. functions implementing only the NATS protocol) in asyncio-nats to avoid the asyncio logic?

Full stack trace :

Traceback (most recent call last):
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 491, in _flush_pending
    yield from self._flush_queue.put(None)
  File "/usr/lib64/python3.4/asyncio/queues.py", line 126, in put
    'queue non-empty, why are getters waiting?')
AssertionError: queue non-empty, why are getters waiting?

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/site-packages/ariane_docker/components.py", line 105, in sniff
    next_action=InjectorCachedComponent.action_update, data_blob=self.data_blob())
  File "/usr/lib/python3.4/site-packages/ariane_clip3/injector.py", line 829, in cache
    json_last_refresh=json_last_refresh, data_blob=data_blob).get()
  File "/usr/lib64/python3.4/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/usr/lib64/python3.4/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "/usr/lib/python3.4/site-packages/ariane_clip3/injector.py", line 696, in save
    result = InjectorCachedComponentService.requester.call(args).get()
  File "/usr/lib64/python3.4/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/usr/lib64/python3.4/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "/usr/lib/python3.4/site-packages/ariane_clip3/natsd/driver.py", line 507, in call
    next(self.nc.flush(1))
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 433, in flush
    yield from self._send_ping(future)
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 788, in _send_ping
    yield from self._flush_pending()
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 496, in _flush_pending
    NatsError("nats: error kicking the flusher"))
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 584, in _process_op_err
    self._flush_queue.task_done()
AttributeError: 'Queue' object has no attribute 'task_done'

Thank you for your help :)

TLS support

gnatsd supports TLS since version 0.7.0. It would be nice if this library did as well.

Socketio asyncio NATS manager can't get subscribed messages

I wrote a client manager for python-socketio on NATS, following the example of the Redis manager.
This manager publishes messages but does not receive subscribed messages. What am I doing wrong?

NATS (docker latest)
Python (docker python:alpine)

import asyncio
import pickle

try:
    import nats
except ImportError:
    nats = None

from socketio.asyncio_pubsub_manager import AsyncPubSubManager


class AsyncNatsManager(AsyncPubSubManager):  # pragma: no cover
    """NATS based client manager for asyncio servers.

    This class implements a NATS backend for event sharing across multiple
    processes. Only kept here as one more example of how to build a custom
    backend, since the kombu backend is perfectly adequate to support a NATS
    message queue.

    To use a NATS backend, initialize the :class:`Server` instance as
    follows::

        server = socketio.Server(client_manager=socketio.AsyncNatsManager(
            'nats://hostname:port'))

    :param url: The connection URL for the NATS server. For a default NATS
                store running on the same host, use ``nats://``.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    """
    name = 'asyncionats'

    def __init__(self, servers=None, channel='socketio',
                 write_only=False):
        if servers is None:
            servers = ["nats://nats:4222"]
        if nats is None:
            raise RuntimeError('NATS package is not installed '
                               '(Run "pip install asyncio-nats-client" in your '
                               'virtualenv).')
        self.servers = servers
        self.queue = asyncio.Queue()
        self.producer = None
        self.consumer = None
        self.sid = None
        super().__init__(channel=channel, write_only=write_only)

    async def _publish(self, data):
        if self.producer is None:
            self.producer = await nats.connect(servers=self.servers)
        return await self.producer.publish(self.channel, pickle.dumps(data))

    async def _listen(self):
        print(self)
        if self.consumer is None:
            self.consumer = await nats.connect(servers=self.servers)
            self.sid = await self.consumer.subscribe('socketio', cb=self.message_handler)
        return await self.queue.get()

    async def message_handler(self, msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print(f"Received a message on '{subject} {reply}': {data}")
        await self.queue.put(data)

Configurable concurrency level for messages processing

I have encountered a limitation of the client: I would like to set how many concurrent tasks can be created simultaneously for a subscription.
Currently it can be either one or unlimited (is_async=True).

I found a comment in the client:

# NOTE: Deprecate this usage in a next release,
# the handler implementation ought to decide
# the concurrency level at which the messages
# should be processed.

We can take it a bit further and add the ability to configure an exact number of running tasks per subscription.
It should be relatively easy to implement using asyncio.Semaphore.

What do you think about it? I would be happy to create a PR.
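
A caller-side sketch of the idea, assuming the client schedules each incoming message as its own task: wrap the handler so an asyncio.Semaphore bounds how many invocations run at once.

import asyncio

def with_concurrency_limit(cb, limit):
    # Hypothetical helper: at most `limit` handler invocations run concurrently.
    sem = asyncio.Semaphore(limit)

    async def wrapper(msg):
        async with sem:
            await cb(msg)

    return wrapper

# Usage, assuming `nc` is a connected client and `handle` is an async handler:
# await nc.subscribe("jobs", cb=with_concurrency_limit(handle, 8))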

functools.partial as callback on request

I think it would be a nice idea if you could pass an async function wrapped in functools.partial.
It would just require adding two lines in nats/aio/client.py around line 509:
elif hasattr(cb, "func") and asyncio.iscoroutinefunction(cb.func): sub.coro = cb
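
Until something like that lands, a workaround is to hide the partial behind a plain coroutine function so the client's iscoroutinefunction check passes; a minimal sketch:

import functools

async def handler(msg, *, prefix):
    print(prefix, msg.data.decode())

bound = functools.partial(handler, prefix="[worker-1]")

# The wrapper is a real coroutine function, so the client accepts it,
# while the partial still carries the extra argument.
async def bound_cb(msg):
    await bound(msg)

# await nc.subscribe("foo", cb=bound_cb)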

aiohttp loop issue

I'd like to have my messages posted to NATS as I run the function, but I cannot work out how to do this within aiohttp.

import lasio
import numpy
from aiohttp import web
import json
from nats.aio.client import Client as NATS
import asyncio
from datetime import datetime
import os

async def parse_las(filepath):
    try:
        # read log file
        l = lasio.read(filepath)

        # create log curves array
        log_curves = []

        # loop through curves in file
        for curve in l.curves:

            # create curve object values
            log_curve = {}
            log_curve["mnem"] = curve.mnemonic
            log_curve["units"] = curve.unit
            log_curve["desc"] = curve.descr

            # Load curves
            curves = l[curve.mnemonic]

            # Replace nulls with -999.25 by default (config later)
            curves[numpy.isnan(curves)] = -999.25

            # Convert curve values from ndarray to list
            log_curve["values"] = curves.tolist()

            # write curve to log curves array
            log_curves.append(log_curve)

        # get log params
        params = []
        for parameter in l.params:
            param = {}
            param['mnem'] = parameter.mnemonic
            param['desc'] = parameter.descr
            param['value'] = parameter.value
            params.append(param)

        # get log header
        log_well = []
        for well in l.well:
            log_well_mnem = {}
            log_well_mnem['mnem'] = well.mnemonic
            log_well_mnem['desc'] = well.descr
            log_well_mnem['value'] = well.value
            log_well_mnem['unit'] = well.unit
            log_well.append(log_well_mnem)

        # return data
        return {"status": "success", "filename": os.path.basename(filepath), "data": {"curves": log_curves, "well": log_well, "params": params}}

    # errors
    except KeyError:
        return {"status": "failed", "data": "Could not parse file. Error in file." }

    except WindowsError:
        return {"status": "failed", "data": "Could not open file specified. Check permissions and file location."}

async def send_message(loop, message):
    mq_url = "nats://10.0.20.205:30005"
    client = NATS()
    await client.connect(io_loop=loop, servers=[mq_url])
    await client.publish("logs", message.encode())
    #await client.flush()
    client.close()


async def parse_log(request):
    '''Handles log parsing'''
    print('origin: ' + datetime.now().strftime("%H:%M:%S.%f"))
    try:
        # parse input data
        try:
            data = await request.json()
            #print(data)
        except:
            raise ValueError

        if data is None:
            raise ValueError

        try:
            filepath = str(data['filepath'])
            #filepath = "C:\\las\\1AA040307806W400_5.las"
        except (TypeError, KeyError):
            raise ValueError
        #print(filepath)
        data = await parse_las(filepath)
        #data = parse_las("C:\\las\\1AA040307806W400_5.las")

    except ValueError:
        # if bad request data, return 400 Bad Request
        web.web_response.Response.status = 400
        return web.web_response.Response.status

    except KeyError:
        # if name already exists, return 409 Conflict
        web.web_response.Response.status = 409
        return web.web_response.Response.status

    # return 200 Success
    print('message: ' + datetime.now().strftime("%H:%M:%S.%f"))
    await send_message(loop, json.dumps(data))

    return web.Response(text="{status: success}")

app = web.Application()
app.router.add_post('/log/parse', parse_log)

async def main():
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', 8080)
    await site.start()

loop = asyncio.get_event_loop()
srv = loop.run_until_complete(main())
loop.run_forever()

Simpler way to set NATS endpoint in connect API

Currently when connecting, it is required to set an array with the list of servers as follows:

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.connect(["nats://demo.nats.io:4222"], max_reconnect_attempts=-1)

But as in the Go client, this could be simplified to have a bare string with a single endpoint instead without having to specify the scheme:

await nc.connect("demo.nats.io:4222", name='my-client')

or assuming the NATS server port is 4222 as well:

await nc.connect("demo.nats.io", name='my-client')

Configuring TLS connections in python3.7

Hi @wallyqs, a change to Python in version 3.7 breaks the recommended method of setting up TLS connections.

>>> import ssl
>>> ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
>>> ssl_ctx.protocol = ssl.PROTOCOL_TLSv1_2
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: can't set attribute

Is there another way to set up an SSL context without setting the protocol property?

Silently Hiding Exceptions in Callbacks

Exceptions that occur in the callback functions do not bubble up.

Here's an example based on the Basic Usage code from the README. The print statements within the callbacks have been sabotaged.

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    await nc.connect("demo.nats.io:4222", loop=loop)

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        # 'oops' is not provided so this print will raise an exception
        print("Received a message on '{subject} {reply}': {data} {oops}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sid = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await nc.auto_unsubscribe(sid, 2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        # 'oops' is not provided so this print will raise an exception
        print("Received a message on '{subject} {reply}': {data} {oops}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sid = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 200 ms.
    try:
        response = await nc.request("help", b'help me', 0.2)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except ErrTimeout:
        print("Request timed out")

    # Remove interest in subscription.
    await nc.unsubscribe(sid)

    # Terminate connection to NATS.
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Library reconnects don't work

Originally posted here: nats-io/stan.py#7
I cannot get reconnects working. I have tested it with one program publishing and one consuming; if I shut down the server for a few seconds, they both stop working (the publisher will time out on publishing, and the consumer will just not receive any messages anymore).

Connection code:

from functools import partial


async def test_cb(cb_type, *args, **kwargs):
    print(cb_type, args, kwargs)

nc = NATS()
await nc.connect(io_loop=loop, error_cb=partial(test_cb, 'error'), disconnected_cb=partial(test_cb, 'disconnect'), closed_cb=partial(test_cb, 'closed'), reconnected_cb=partial(test_cb, 'reconnect'), ping_interval=25)

Output:

error (NatsError('nats: empty response from server when expecting INFO message',),) {}
reconnect () {}
error (<class 'nats.aio.errors.ErrStaleConnection'>,) {}
disconnect () {}
error (NatsError('nats: empty response from server when expecting INFO message',),) {}
error (ConnectionRefusedError(111, "Connect call failed ('127.0.0.1', 4222)"),) {}
disconnect () {}
closed () {}

From what I understand of the defaults, it should try to reconnect 10 times with a delay of 2 seconds, which means that if the server is down for less than 20 seconds it should at least reconnect and resume operation. Unfortunately it won't resume (the consumer will just not receive anything anymore, and the producer will time out on a publish call, even when the server is back up again).
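
For reference, the reconnect behavior is configured at connect time; a sketch of a more forgiving configuration, reusing the test_cb helper above (the values are examples, not recommendations):

from functools import partial
from nats.aio.client import Client as NATS

nc = NATS()
await nc.connect(
    servers=["nats://127.0.0.1:4222"],
    allow_reconnect=True,          # default, made explicit here
    max_reconnect_attempts=-1,     # retry forever instead of 10 times
    reconnect_time_wait=2,         # seconds between attempts
    ping_interval=25,
    error_cb=partial(test_cb, 'error'),
    disconnected_cb=partial(test_cb, 'disconnect'),
    reconnected_cb=partial(test_cb, 'reconnect'),
    closed_cb=partial(test_cb, 'closed'),
)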

Is buffer / max payload size fixed?

Hello~
I'm somewhat experienced with RabbitMQ, but almost a newbie with NATS.
Looking at the project, it seems max_payload and the buffer size are set to defaults and cannot be changed from outside the module. Is that correct?

Thanks.

Closing the NATS connections blocks on something

After closing the connection with close() some tasks keep running for a few seconds. I expected that 'await nc.close()' would wait until all tasks are cancelled and stopped.

Sample code:

import asyncio
import signal

from nats.aio.client import Client as NATS

NATS_SERVERS = ['nats://localhost:4222']

async def main():
    nc = NATS()
    await nc.connect(servers=NATS_SERVERS)
    print('Connected to NATS at {}'.format(nc.connected_url.netloc))

    async def signal_handler():
        if nc.is_closed:
            return

        print('Disconnecting from NATS...')
        await nc.close()

        pending = asyncio.Task.all_tasks()
        for task in pending:
            print('Task is still running: {} cancelled? {}'.format(task, task.cancelled())) 

    loop = asyncio.get_event_loop()
    for sig in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, sig),
                                lambda: asyncio.ensure_future(signal_handler()))

    # subscribe on messages


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    try:
        loop.run_forever()
    finally:
        print('Shutting down')
        loop.close()

This is the output I get; you can see some tasks are still running:

Connected to NATS at localhost:4222
Disconnecting from NATS...
Task is still running: <Task pending coro=<main.<locals>.signal_handler() running at /usr/src/app/notifier/app.py:35>>, cancelled? False
Task is still running: <Task pending coro=<Client._ping_interval() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:963> wait_for=<Future cancelled>>, cancelled? False
Task is still running: <Task pending coro=<Client._read_loop() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:996> wait_for=<Future cancelled>>, cancelled? False
Task is still running: <Task cancelling coro=<Client._flusher() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:944> wait_for=<Future finished result=None>>, cancelled? False
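
A workaround sketch until the client tears down its own tasks (plain asyncio, matching the Python 3.6-era API used above), to be used in place of signal_handler: after close(), cancel whatever is still pending and wait for the cancellations to finish.

import asyncio

async def shutdown(nc):
    await nc.close()
    pending = [t for t in asyncio.Task.all_tasks()
               if t is not asyncio.Task.current_task()]
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to actually finish before the loop closes.
    await asyncio.gather(*pending, return_exceptions=True)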

This is unfortunately completely useless without even the most basic examples

I was hopeful when I saw CloudNative, but this doc is horrible and seems to be pushed off forever. The Go side is useful, but this...

Not a single example where the NATS server is anywhere other than localhost. I really doubt NATS is valuable to the community at this stage, and maybe we should select more community-friendly cloud-native tools.

And not a single example of, or ambition to support, a non-asyncio approach. Most Python scripts are small and become over-complex if they are forced to use asyncio for simple devops tasks or similar.

Too bad.
/T

Why is the default behavior of the subscribe method to ignore errors when there is no error_cb defined?

(This was probably discussed before, but I couldn't find where.)

The examples of the library do not force one to use the error cb from day-1. However, without it defined, errors are silently discarded and there are no signs of them ever happening.

I would prefer for the error to actually be raised. Sure, the task will stop, but that is a louder noise than just processing messages as if nothing had happened. It would help if error_cb were always defined in the examples.

We really need #76 for this not to matter, I think.

Client silently crashes if callbacks are not coroutines

Documentation does not make it clear that callback functions must be async coroutines. The fact that message handlers can be simple functions does not help with that either.

If, by mistake, someone writes...

def error_cb():
    print("an error occured")

client = Client(...., error_cb=error_cb)

... they will not get any error, and the client will silently crash the first time the error callback is awaited, without any indication.
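
The corrected form is just an async def passed at connect time; a minimal sketch:

from nats.aio.client import Client as NATS

async def error_cb(e):
    # Must be a coroutine function, because the client awaits it.
    print("an error occurred:", e)

nc = NATS()
await nc.connect(servers=["nats://127.0.0.1:4222"], error_cb=error_cb)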

Sometimes gnats server responds with a "PING" of its own instead of "OK" after initial connection, causing is_connected to never get set on the client.

This can lead to situations where an attempt to connect via a NATS instance using await finishes waiting for the asynchronous connection to complete, yet the NATS instance is not formally connected via the is_connected property being set to True.

When the gnats server responds to a connect command with "OK", if we proceed to immediately write "PING", the server will respond with the expected "PONG".

But when the gnats server responds to a connect command with its own "PING", when we immediately write "PING", the next op from the io_reader buffer will be the server's own "PING" instead of the expected "PONG", which is what client.py is using to consider the client connected.

My proposed fix is to simply add an extra next_op = yield from self._io_reader.readline() after the main connect command is written, but before we send the client's "PING" to the server.

The value of next_op after the initial connection will still be either "OK" or sometimes "PING". In either case, after we write our own "PING" from the client and then drain the writer, the next op read from the io_reader buffer will consistently be "OK" and then "PONG" (unless there was an actual problem with the client's PING being recognized by the gnats server).

With the fix, example of when server responds with expected "OK" after initial connection:

client.py: writing connect_cmd [b'CONNECT {"auth_token": "xxxxxxx", "echo": true, "lang": "python3", "pedantic": true, "protocol": 1, "verbose": true, "version": "0.8.2"}\r\n']
client.py: next_op [b'+OK\r\n']
client.py: writing ping_proto [b'PING\r\n']
client.py: next_op [b'PONG\r\n']
client.py: client is_connected set to True!
client.py: process_connect_init() complete

With the fix, example of when server responds with unexpected "PING" after initial connection:

client.py: writing connect_cmd [b'CONNECT {"auth_token": "xxxxxxx", "echo": true, "lang": "python3", "pedantic": true, "protocol": 1, "verbose": true, "version": "0.8.2"}\r\n']
client.py: next_op [b'PING\r\n']
client.py: writing ping_proto [b'PING\r\n']
client.py: next_op [b'+OK\r\n']
client.py: next_op [b'PONG\r\n']
client.py: client is_connected set to True!
client.py: process_connect_init() complete

Note that in the above case, the "OK" is read from the io_reader because verbose is currently set to True.

Without the fix mentioned above, we can sometimes see:

client.py: writing connect_cmd [b'CONNECT {"auth_token": "xxxxxxx", "echo": true, "lang": "python3", "pedantic": true, "protocol": 1, "verbose": true, "version": "0.8.2"}\r\n']
client.py: writing ping_proto [b'PING\r\n']
client.py: next_op [b'PING\r\n']
client.py: client is_connected is NOT set to True!
client.py: process_connect_init() complete

https://github.com/nats-io/asyncio-nats/blob/a370dcc6733a3f32544568afe5885ffdf6c96905/nats/aio/client.py#L1311

Wrong order of callbacks

Hello,

NATS documentation says that messages from one publisher are received in the same order as they were published. But if we make a few subscriptions and publish messages for each one a few times, the callbacks will be called in a different order. This is because each subscriber has its own message queue. Here is an example:

receive.py

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):

    await nc.connect("nats://nats:4222", loop=loop)

    async def message_handler_A(msg):
        print('message_handler_A')

    async def message_handler_B(msg):
        print('message_handler_B')

    async def message_handler_C(msg):
        print('message_handler_C')

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

publish.py:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://nats:4222", loop=loop)
    for i in range(10):
      await nc.publish("message_handler_B", b"")
      await nc.publish("message_handler_C", b"")
      await nc.publish("message_handler_A", b"")

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Just run publish.py after receive.py.
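
Ordering is only guaranteed within a single subscription, since each subscription has its own delivery queue. A workaround sketch, to be placed inside run() in receive.py instead of the three subscribe calls, assuming the subjects are reorganized under a common prefix (handlers.A, handlers.B, handlers.C are hypothetical names) so one wildcard subscription, and therefore one queue, covers them all:

    handlers = {
        "handlers.A": message_handler_A,
        "handlers.B": message_handler_B,
        "handlers.C": message_handler_C,
    }

    async def dispatch(msg):
        # A single subscription means a single queue, so messages are
        # handled in the order they were published across all three subjects.
        await handlers[msg.subject](msg)

    await nc.subscribe("handlers.*", cb=dispatch)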

Use f-strings instead of .format

Python 3.6+ supports a new style of string formatting which reads much easier.

uri = urlparse("{}://{}".format(scheme, connect_url))

becomes

uri = urlparse(f'{scheme}://{connect_url}')

This reads nicer in areas like the protocol handling, where we are concatenating strings with + as well.

Need help getting started

Hi, sorry about the newbie question. I've been messing with the basic examples and can't seem to get the effect I want.
I just need a basic example of setting up a subscription and then waiting forever; I'm not sure if there is already a built-in function for this. The Go version is marvelous, but here I'm lost.

Many thanks.
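
A minimal sketch of a subscriber that just sits and waits for messages, assuming a local server on the default port:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://127.0.0.1:4222")

    async def handler(msg):
        print("Received:", msg.data.decode())

    await nc.subscribe("updates", cb=handler)

    # Park here forever; the subscription keeps delivering in the background.
    await asyncio.Event().wait()

if __name__ == '__main__':
    asyncio.run(main())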

Publishing OpenCV VideoCapture to NATS server

Hi, I am using NATS to perform video streaming between microservices. It works great on the NATS video subscriber side.

However, I'm facing a problem with reading video frames from the webcam and publishing them to the nats-server using OpenCV's cv2.VideoCapture.

I guess the problem is the blocking I/O from cv2.VideoCapture.read (I've tried cv2.imread too, and it caused the same problem).

And when I move these 3 lines outside the loop, the streaming works fine.

        frame = cap.read()[1]
        frame_encoded = cv2.imencode('.jpg', frame)[1]
        frame_bytes = frame_encoded.tostring()

Can anyone suggest a solution for this NATS video streaming problem,
or what the best practices are for doing video streaming between microservices using NATS.io?

Here is my sample code:
publisher.py

import cv2
import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()
    await nc.connect(loop=loop)

    cap = cv2.VideoCapture(0)
    while True:
        frame = cap.read()[1]
        frame_encoded = cv2.imencode('.jpg', frame)[1]
        frame_bytes = frame_encoded.tostring()
        
        await nc.publish('foo', frame_bytes)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

My subscriber just simply listens and prints random output to check if it's receiving something.
subscriber.py

import uuid
import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()
    await nc.connect(loop=loop)

    async def cb(msg):
        print(uuid.uuid4())

    await nc.subscribe('foo', cb=cb)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
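
One likely culprit is that cap.read() blocks the event loop, which also starves the client's internal tasks. A workaround sketch: push the blocking OpenCV calls into a thread executor so the loop stays responsive (same publisher as above, reworked).

import asyncio
import cv2
from nats.aio.client import Client as NATS

def grab_frame(cap):
    # Blocking OpenCV work stays off the event loop.
    frame = cap.read()[1]
    return cv2.imencode('.jpg', frame)[1].tostring()

async def run(loop):
    nc = NATS()
    await nc.connect(loop=loop)

    cap = cv2.VideoCapture(0)
    while True:
        frame_bytes = await loop.run_in_executor(None, grab_frame, cap)
        await nc.publish('foo', frame_bytes)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))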

NATS consumer breaks the last callback on drain()

NATS consumer breaks the last message callback while drain()-ing with non-empty message buffer.

Testing code:

#!/usr/bin/env python3

import asyncio
from nats.aio.client import Client as NATS

async def main(loop, uri='nats://localhost:4222', subj='tests.drain_bug', nmsgs=10):
        sent, got, ok = 0, 0, 0

        async def on_msg(msg):
                nonlocal got, ok
                got += 1
                await asyncio.sleep(0.1, loop=loop)
                ok += 1

        sub = NATS()
        await sub.connect(uri, loop=loop)
        await sub.subscribe(subj, cb=on_msg)

        pub = NATS()
        await pub.connect(uri, loop=loop)
        for i in range(nmsgs):
                body = 'message #%d' % i
                await pub.publish(subj, body.encode())
                sent += 1
        await pub.flush() 
        await pub.close() 

        await sub.drain()
        print('sent=%d got=%d ok=%d' % (sent, got, ok))

if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))

Testing output: sent=10 got=10 ok=9
Expected output: sent=10 got=10 ok=10

Tested on Debian/buster with Python v3.7.3rc1 and asyncio-nats-client==0.8.2 (pip version).

How to run worker forever and process future requests via request-reply?

How would you write code that waits for all future requests using the request-reply feature of NATS? When I run the script:

    import asyncio
    from nats.aio.client import Client as NATS
    
    async def run(loop):
        nc = NATS()
    
        await nc.connect("nats:4222", loop=loop)
    
        async def help_request(msg):
            subject = msg.subject
            reply = msg.reply
            data = msg.data.decode()
            print("Received a message on '{subject} {reply}': {data}".format(
                subject=subject, reply=reply, data=data))
            await nc.publish(reply, b'I can help')
    
        # Use queue named 'workers' for distributing requests
        # among subscribers.
        await nc.subscribe("help", "workers", help_request)
    
        await nc.close()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(run(loop))
        loop.close()

It just runs through all the code and then exits quickly. Coming from a NodeJS background, I assumed nc.subscribe("help", "workers", help_request) would open a running job that keeps the process from exiting. How do you do this in Python?
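
The subscription alone does not keep the process alive, and the await nc.close() right after it tears the connection down. A sketch of the usual pattern: subscribe, skip the close, and keep the event loop running so the handler keeps serving requests.

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()
    await nc.connect("nats:4222", loop=loop)

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    await nc.subscribe("help", "workers", help_request)
    # No nc.close() here: the connection has to stay open to serve requests.

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()  # keep serving until interrupted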

Performance problems

Hi!

I want to find out the NATS + asyncio-nats-client RPS (request-response case). The test scenario is simple:
server.py

import asyncio

import uvloop

from nats.aio.client import Client


async def main(loop):
    client = Client()
    await client.connect(io_loop=loop)
    print('connected')

    async def handler(msg):
        await client.publish(msg.reply, b'pong')

    await client.subscribe('test', queue='workers', cb=handler)
    print('subscribed')


if __name__ == '__main__':
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.run_until_complete(main(loop))
    print('start listening')
    loop.run_forever()
    loop.close()

client.py

import asyncio
import sys

import uvloop
from nats.aio.client import Client


async def main(loop):
    futures = [
        asyncio.ensure_future(client.timed_request('test', b'ping', 1))
        for i in range(int(sys.argv[1]))
    ]

    await asyncio.wait_for(asyncio.gather(*futures), 1.0)


if __name__ == '__main__':
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    client = Client()
    loop.run_until_complete(client.connect(io_loop=loop))
    print('connected')

    loop.run_until_complete(main(loop))

How to run:

$ gnatsd
$ python3.6 server.py
$ python3.6 client.py 1000

I can't get more than 1000 requests/responses per second on my MacBook Pro (2.3 GHz Intel Core i5). Am I doing something wrong, or is this a library limitation?

Thanks

v0.3.3 release ?

Hi,

I was wondering if something is blocking the v0.3.3 release, or if we are just missing the tag?

Thanks

Python NATS Client not able to achieve the desired performance

I have written a short Python NATS client that subscribes to a certain subject and then publishes 'n' requests to gnatsd and eventually to a NATS client written in C. This C client then replies on the 'reply subject' mentioned in the publish_request call. The problem that I see is that all the publish_requests are sent very quickly from the Python client to gnatsd and then to the C client, but the responses take pretty long. I am sure that the bottleneck is somewhere in the subscribe and response-handling part of the code and not in the C client that sends the responses. Here is the code snippet:

def connect(self):
    self.loop.run_until_complete(self.nc.connect(servers=["nats://localhost:4222"]))

def disconnect(self):
    return self.loop.run_until_complete(self.nc.close())

def send(self, request_number):
    self.loop.run_until_complete(self.pub())

def recv(self):
    self.loop.run_until_complete(self.sub())

def wait_for_msg(self):
    self.loop.run_until_complete(self.wait())

async def pub(self):
    await self.nc.publish_request(self.subject, self.reply, json.dumps(self.data1).encode())

async def sub(self):
    async def response_handle(msg):
        message = json.loads(msg.data.decode())
        self.msgs.append(message)
        #self.future.set_result(message)

    await self.nc.subscribe(self.reply, cb = response_handle)

async def wait(self):
    try:
        msg = await asyncio.wait_for(self.future, 1)
    except asyncio.TimeoutError:
        print("NATS: No more messages to receive exiting")

Those methods are called in the following order:

def send_recv(self):
    self.nats.recv()
    for i in range(100):
        self.nats.send()
    self.nats.wait_for_msg()

def run(self):
    self.nats.connect()
    t = time.time()
    self.send_recv()
    self.nats.flush_nats()
    total_time = time.time() - t
    self.nats.print()
    print ("Time taken:", total_time)

Also, a general observation: on increasing the number of responses from 100 to, say, 10000, I see that the program exits after receiving only a few responses, raising the exception "NATS: No more messages to receive exiting". However, if I raise the timeout value here: await asyncio.wait_for(self.future, 1), I do get all the responses.

Could someone please suggest what can be done to reduce the time for processing all those requests?
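
One thing worth trying (a sketch, not a benchmark, and it assumes a client version where nc.request is available): let the client manage a reply inbox per call via nc.request and overlap the calls with asyncio.gather, instead of a hand-rolled publish_request plus a single shared future.

import asyncio
import json

async def send_all(nc, subject, data, n=100, timeout=5):
    async def one_call():
        msg = await nc.request(subject, json.dumps(data).encode(), timeout=timeout)
        return json.loads(msg.data.decode())

    # Issue all requests concurrently; each gets its own reply handling.
    return await asyncio.gather(*(one_call() for _ in range(n)))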

Yield subscription if present in error handler for slow consumers

Currently in order to handle slow consumers in the error handlers it has to be done as follows:

   async def error_cb(e):
     if type(e) is nats.aio.errors.ErrSlowConsumer:
       print("Slow consumer error, unsubscribing from handling further messages...")
       await nc.unsubscribe(e.sid)

But we should change it so that it is more similar to how the Go client works and also passes the subscription:

   async def error_cb(e, sub):
     if type(e) is nats.aio.errors.ErrSlowConsumer:
       print("Slow consumer error, unsubscribing from handling further messages...")
       await sub.unsubscribe()

Revise Python 3.7 support

At first glance it looks like the examples work, but there might be a few regressions... a list of a few TODOs:

Provide public max_payload

Hello,

The following issue is a minor one. I've implemented a split algorithm for when message payloads are greater than the configured NATS max payload. So I need to get this max payload from asyncio-nats, but it is currently a private field (_max_payload). It would be great if you provided it as a public one.

Thank you !
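
For reference, newer releases expose this as a read-only max_payload property on the client; a sketch of using it to size the chunks (the chunking scheme itself is left to the application):

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://127.0.0.1:4222")

    limit = nc.max_payload            # advertised by the server on connect
    payload = b"x" * (3 * limit + 100)

    # Naive split: one publish per max_payload-sized chunk.
    for i in range(0, len(payload), limit):
        await nc.publish("chunks", payload[i:i + limit])

    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())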
