
aioredis-py's Introduction

aioredis


📢🚨 Aioredis is now in redis-py 4.2.0rc1+ 🚨🚨


To install, just do pip install "redis>=4.2.0rc1" (quoted so the shell doesn't treat >= as a redirect). The code is almost exactly the same. You will just need to import like so:

from redis import asyncio as aioredis

This way you don't have to change all your code, just the imports.
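A quick sanity check with the new package (a minimal sketch, assuming a local Redis server on the default port):

    import asyncio
    from redis import asyncio as aioredis

    async def main():
        # The familiar aioredis names keep working through the alias import.
        redis = aioredis.from_url("redis://localhost")
        await redis.set("my-key", "value")
        print(await redis.get("my-key"))  # b'value'

    asyncio.run(main())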

https://github.com/redis/redis-py/releases/tag/v4.2.0rc1

Now that aioredis is officially under Redis, I hope there will never be an unmaintained asyncio Redis lib in the Python ecosystem again. I will be helping out with maintenance at redis-py for the foreseeable future, just to get some of the asyncio stuff out of the way. There are also some bug fixes that didn't make it into the PR that I'll be slowly migrating over throughout the next few weeks -- so long as my exams don't kill me beforehand :)

Thank you all so much for your commitment to this repository! Thank you so much to @abrookins @seandstewart @bmerry for all the commits and maintenance. And thank you to everyone here who has been adopting the new code base and squashing bugs. It's been an honor!

Cheers, Andrew


asyncio (PEP 3156) Redis client library.

The library is intended to provide a simple and clear interface to Redis based on asyncio.

Features

Feature Supported
hiredis parser ✅
Pure-Python parser ✅
Low-level & High-level APIs ✅
Pipelining support ✅
Multi/Exec support ✅
Connections Pool ✅
Pub/Sub support ✅
Sentinel support ✅
ACL support ✅
Streams support ✅
Redis Cluster support 🚫
Tested Python versions: 3.6, 3.7, 3.8, 3.9, 3.10
Tested Redis server versions: 5.0, 6.0
Support for dev Redis server: through low-level API

Installation

The easiest way to install aioredis is by using the package on PyPI:

pip install aioredis

Installing hiredis is recommended for performance and stability reasons:

pip install hiredis

Requirements

  • Python 3.6+
  • hiredis (Optional but recommended)
  • async-timeout
  • typing-extensions

Benchmarks

Benchmarks can be found here: https://github.com/popravich/python-redis-benchmark

Contribute

Feel free to file an issue or make a pull request if you find any bugs or have suggestions for improving the library.

License

aioredis is offered under the MIT License.


aioredis-py's Issues

Transactions (multi_exec) do not honor encoding parameter

>>> __import__('sys').version
'3.4.3 (default, Oct 30 2015, 20:31:57) \n[GCC 5.2.0]'
>>> loop = asyncio.get_event_loop()
>>> redis = aioredis.create_redis(('localhost', 6379), db=1, encoding='utf8', loop=loop)
>>> redis = loop.run_until_complete(redis)
>>> loop.run_until_complete(redis.lrange('test', 0, -1))
['ab', 'cd']  # strings
>>> tr = redis.multi_exec()
>>> tr.lrange('test', 0, -1)
>>> loop.run_until_complete(tr.execute())[0]
[b'ab', b'cd']  # bytes!

>>> tr = redis.multi_exec()
>>> tr.lrange('test', 0, -1, encoding='utf8')  # note the encoding
>>> loop.run_until_complete(tr.execute())[0]
[b'ab', b'cd']  # bytes!

aioredis scan command example

Hello,

Can you please show me how to use one of the SCAN commands with aioredis? I understand how to use them with plain redis and without asyncio, but here I can't figure out how to iterate over the elements.
It would be very helpful.

Thank you!
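A minimal sketch of cursor iteration, assuming scan() returns a (cursor, keys) pair as in the classic aioredis API (the scan_all helper is illustrative):

    import asyncio
    import aioredis

    @asyncio.coroutine
    def scan_all(redis, match='*'):
        # Iterate the keyspace; SCAN is finished when the cursor wraps to 0.
        cursor = b'0'
        keys = []
        while True:
            cursor, batch = yield from redis.scan(cursor, match=match)
            keys.extend(batch)
            if cursor in (0, b'0', '0'):
                break
        return keys

Usage would be something like keys = yield from scan_all(redis, match='user:*').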

make db and password args keyword-only

In the functions create_connection, create_pool and create_redis,
the db and password arguments should be made keyword-only.
There should be only one positional argument -- address.
This will protect against specifying the host/port address as separate arguments (rather than a tuple); a sketch of the resulting signature follows the snippet:

# protect from this
yield from create_connection(host, port)
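A sketch of the proposed signature (the bare * marker makes everything after address keyword-only):

    import asyncio

    @asyncio.coroutine
    def create_connection(address, *, db=0, password=None, loop=None):
        # address must be a (host, port) tuple or a unix socket path;
        # db and password can no longer be passed positionally.
        ...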

db index control

Exposing the RedisConnection.execute method to the end user (as a low-level interface, of course) makes it possible to change the db index manually, thus bypassing the select method which stores the db index in the connection.

The problem becomes even more dramatic in the case of RedisPool, e.g.:

with (yield from pool) as conn:
    conn.select(other_db)
# connection released and points to `other_db`; not reusable.

In the case of a pool, the connection can be closed and dropped;
in the case of .execute -- TBD (the only solution so far is to hard-code a check for the 'select' command, as sketched below).
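A hypothetical sketch of that hard-coded check (the _execute internal is illustrative):

    def execute(self, command, *args, **kwargs):
        name = command.decode() if isinstance(command, bytes) else command
        # Route SELECT through select() so the db index stored on the
        # connection stays in sync with the server-side state.
        if name.lower() == 'select':
            return self.select(args[0])
        return self._execute(command, *args, **kwargs)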

Blocking operations (such as blpop) do not work properly with the timeout parameter

If a timeout is specified, it is ignored and, as a result, the coroutine may never complete.

Example:

await redis.blpop('non existent list', 10)

What should happen:
The coroutine should be unblocked after 10 seconds if the list is empty or does not exist.

What really happens:
The coroutine continues to wait for the list to be created or for a first element.
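Until this is fixed, a client-side workaround sketch is to enforce the timeout with asyncio.wait_for (the extra two seconds of slack are arbitrary); inside a coroutine:

    try:
        result = yield from asyncio.wait_for(
            redis.blpop('non existent list', 10), timeout=12)
    except asyncio.TimeoutError:
        result = None  # treat it like an empty BLPOP reply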

Better report about connection problems

Trying to connect to a non-existent host, I get the following traceback:

Traceback (most recent call last):
  File "/usr/local/bin/controller", line 9, in <module>
    load_entry_point('job-router==2.7.0', 'console_scripts', 'controller')()
  File "/usr/local/lib/python3.4/site-packages/job_router/run_controller.py", line 44, in main
    loop=loop))
  File "/usr/local/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/local/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "/usr/local/lib/python3.4/site-packages/aioredis/pool.py", line 26, in create_pool
    yield from pool._fill_free(override_min=False)
  File "/usr/local/lib/python3.4/site-packages/aioredis/pool.py", line 154, in _fill_free
    conn = yield from self._create_new_connection()
  File "/usr/local/lib/python3.4/site-packages/aioredis/commands/__init__.py", line 139, in create_redis
    loop=loop)
  File "/usr/local/lib/python3.4/site-packages/aioredis/connection.py", line 53, in create_connection
    host, port, loop=loop)
  File "/usr/local/lib/python3.4/asyncio/streams.py", line 63, in open_connection
    lambda: protocol, host, port, **kwds)
  File "/usr/local/lib/python3.4/asyncio/base_events.py", line 581, in create_connection
    infos = f1.result()
  File "/usr/local/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/local/lib/python3.4/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.4/socket.py", line 533, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

socket.gaierror: [Errno -2] Name or service not known is not very handy; adding the host/port pair to the error message (like https://github.com/KeepSafe/aiohttp/blob/master/aiohttp/connector.py#L295-L299) would be useful.
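A sketch of what that could look like inside create_connection (variable names are illustrative):

    try:
        reader, writer = yield from asyncio.open_connection(
            host, port, loop=loop)
    except OSError as exc:
        # socket.gaierror subclasses OSError; re-raise with the address
        # included so the failure is actionable.
        raise OSError(exc.errno,
                      'Cannot connect to {}:{} [{}]'.format(
                          host, port, exc.strerror))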

Type converters

I'm not sure that you need to use converters in the low-level API -- that may clash with the high-level one.
I'm +0 for removing conversions, but the final decision is up to you, sure.

Add wait_closed coroutine

For connection, pool and commands.Redis

.wait_closed() should wait for RedisConnection._reader_task to finish.
conn._reader_task.cancel() schedules task cancellation, but asyncio needs an event loop iteration to actually cancel it.
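A minimal sketch of such a coroutine on the connection (attribute names follow the issue text):

    @asyncio.coroutine
    def wait_closed(self):
        # asyncio.wait absorbs the CancelledError that awaiting the
        # cancelled reader task directly would raise.
        if self._reader_task is not None:
            yield from asyncio.wait([self._reader_task], loop=self._loop)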

Pub/Sub mode

This must be implemented in the connection.

TODO:

  • add separate methods for subscribing to channels/patterns;
  • mark the connection as being in "subscribe" mode;
  • add a check to the execute method that the connection is not in "subscribe" mode;
  • check whether the connection is in "subscribe" mode in the _read_data method and start the other reader task;
  • create the "other reader task" (needs a better name) that will wait for messages (in a similar way to asyncio.as_completed());

The result must be something like:

ok = yield from conn.subscribe('channel:1', 'channel:2')
for waiter in conn.wait_messages():  # this is the "other reader task" that needs a good name
    data = yield from waiter
    # do something

As for the high-level part, the Redis docs have a good hint: http://redis.io/topics/pubsub (Client library implementation hints)

How to help?

I want to help this project, but I don't know what features you have in mind! Is there some way for us to talk, or a list that describes what must be done?

Redis connection hangs in unusable state

The following traceback occurs on each request to redis:

    if (yield from self.redis.exists(self._result_key())):
  File "/lib/python3.4/site-packages/aioredis/util.py", line 50, in wait_convert
    result = yield from fut
  File "/lib/python3.4/site-packages/aioredis/commands/__init__.py", line 35, in execute
    return (yield from conn.execute(*args, **kwargs))
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/futures.py", line 388, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/tasks.py", line 285, in _wakeup
    value = future.result()
  File "/nix/store/ymsrnz9fkcj4lcmyln5by9mab5r0h3zl-python3-3.4.2/lib/python3.4/asyncio/futures.py", line 269, in result
    raise CancelledError
concurrent.futures._base.CancelledError

Version: v0.1.4-18-ga5b75d4
Redis instance is created using create_reconnecting_redis

strace shows that requests are sent and replied to correctly, and no reconnections happen in between.

Get rid of `Connection.connect` method

A Connection should be created with a create_connection call.
Make the Connection.__init__ method accept reader & writer streams
(or set them in some other method).

The idea behind Connection.connect was to be able to reconnect if the connection is lost,
but the code looks messy to me and the reconnection process as a whole is still unclear.

The reconnection mechanism will be implemented in Pool.
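A sketch of the proposed construction flow, assuming __init__ takes the streams directly:

    @asyncio.coroutine
    def create_connection(address, *, loop=None):
        host, port = address
        reader, writer = yield from asyncio.open_connection(
            host, port, loop=loop)
        # The connection object never dials itself; it just wraps streams.
        return RedisConnection(reader, writer, loop=loop)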

Transactions and non-transaction commands on one connection

On a highly-loaded process with a single connection I got an AssertionError from `MultiExec._do_execute`:

assert len(results) == len(waiters), (results, waiters)
# results, waiters looked like: 
([('OK', 0)], [])

It seems that when I use multi like this:

with (yield from self.tr_lock):  # asyncio.Lock                            
     tr = self.redis.multi_exec()                             
     tr.hmset(self.data_key, *chunk_data)        
     tr.zadd(self.chunks_key, *items)            
     yield from tr.execute()

other green threads can somehow add other commands to this connection.
Is this possible with a reconnecting redis?

self.redis = yield from aioredis.create_reconnecting_redis(...)

Should I always use a separate redis connection for transaction-only interactions?
By the way, the whole application freezes until Ctrl+C is pressed.

response decoding

  1. Implement a connection-level optional encoding parameter (in RedisConnection), passed through to the hiredis parser.
  2. Implement a per-command optional encoding parameter (mainly needed when working without connection-level encoding).

Must check the pros & cons of both points; a usage sketch follows.
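A sketch of how the two options would look to a caller (the exact API shape is an assumption):

    # 1) connection-level encoding: every reply is decoded to str
    redis = yield from aioredis.create_redis(
        ('localhost', 6379), encoding='utf-8')
    val = yield from redis.get('key')                     # -> 'value'

    # 2) per-command encoding: decode only this reply
    redis = yield from aioredis.create_redis(('localhost', 6379))
    val = yield from redis.get('key', encoding='utf-8')   # -> 'value'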

wait_ok always fails when the connection's encoding is used

_testutil.py

class RedisEncodingTest(BaseTest):
    def setUp(self):
        super().setUp()
        self.redis = self.loop.run_until_complete(self.create_redis(
            ('localhost', self.redis_port), loop=self.loop, encoding='utf-8'))

    def tearDown(self):
        del self.redis
        super().tearDown()

string_commands_test.py

class StringCommandsEncodingTest(RedisEncodingTest):
    @run_until_complete
    def test_set(self):
        ok = yield from self.redis.set('my-key', 'value')
        self.assertTrue(ok)

        with self.assertRaises(TypeError):
            yield from self.redis.set(None, 'value')

util.py

@asyncio.coroutine
def wait_ok(fut):
    res = yield from fut
    if res == b'QUEUED':
        return res
    return res == b'OK'

In wait_ok, when the connection has an encoding set, fut's result is 'OK', not b'OK',
so res == b'OK' is always False.
Likewise, res == b'QUEUED' is always False.
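A sketch of an encoding-tolerant fix: accept both the bytes and str forms of the replies:

    @asyncio.coroutine
    def wait_ok(fut):
        res = yield from fut
        if res in (b'QUEUED', 'QUEUED'):
            return res
        return res in (b'OK', 'OK')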

Multi/Exec bug

On one connection, the following causes ERR MULTI calls can not be nested:

redis = yield from create_redis()

@asyncio.coroutine
def task():
    tr = redis.multi_exec()
    tr.set('key', 'abc')
    yield from tr.execute()

yield from asyncio.gather(task(), task())
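Until transactions are properly isolated per connection, a workaround sketch is to serialize MULTI/EXEC blocks on the shared connection with an asyncio.Lock:

    lock = asyncio.Lock()

    @asyncio.coroutine
    def task():
        with (yield from lock):
            tr = redis.multi_exec()
            tr.set('key', 'abc')
            yield from tr.execute()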

RedisConnection.execute should raise a custom exception when the server is unreachable.

Currently, RedisConnection.execute asserts a live connection: https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L211

As a result, if the connection to the server is lost (server restarted, or network down), an AssertionError is raised.
This forces a client that wants to reconnect to catch AssertionError, which is not the correct thing to catch; a more appropriate way would be to raise a custom exception here and catch that specific exception in the client.
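A sketch of what that could look like (the exception name is illustrative; RedisError is the existing base class in aioredis.errors):

    class ConnectionClosedError(RedisError):
        """Raised when a command is issued on a closed connection."""

    # in RedisConnection.execute, instead of the assert:
    if self._reader is None or self._reader.at_eof():
        raise ConnectionClosedError('Connection closed or corrupted')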

Losing pub/sub messages while using channel.wait_message()

When I use channel.wait_message() I lose pub/sub messages.

It's easy to reproduce this bug. In one terminal window, run the subscribing script:

import aioredis
import asyncio

@asyncio.coroutine
def main():
    red = yield from aioredis.create_redis(('localhost', 6379))
    channel, = yield from red.subscribe('foo')
    num = 0
    while (yield from channel.wait_message()):
        msg = yield from channel.get()
        print(num, msg)
        num += 1

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

In a second terminal window, run the publishing script:

import aioredis
import asyncio

@asyncio.coroutine
def main():
    red = yield from aioredis.create_redis(('localhost', 6379))
    for i in range(100000):
        yield from red.publish('foo', i)
        print(i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

The publishing script finishes at 99999, but the subscribing script at a much smaller number (e.g. 80340).

If you change while (yield from channel.wait_message()): to while True: in the subscribing script, it finishes at 99999 without any problem.

Python 3.4.3
OS X 10.10.3

pubsub docs problems

There are a few problems with the pub/sub docs:

  • the example doesn't run, not even nearly. I guess it should be more like the example in the examples directory.
  • asyncio.async is deprecated in favour of asyncio.ensure_future.
  • async_reader2 is defined but never used; I guess the last line should be using it.

asyncio.async deprecation warning

Under Python 3.5, asyncio.async is deprecated. If any code uses asyncio.async, a DeprecationWarning is printed.

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:509: DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()

Python 3.5 suggests that asyncio.async should be replaced by asyncio.ensure_future.

One possible solution:

try:
    from asyncio import ensure_future as asyncio_ensure_future
except ImportError:
    from asyncio import async as asyncio_ensure_future

Then, use asyncio_ensure_future to replace all the asyncio.async calls in the code.

If several tasks call Pool.acquire simultaneously, a task hang occurs randomly.

Assume there are task1, task2 and task3 (call them T1, T2, T3; they use RedisPool) and the Pool has self.minsize = 1, self.maxsize = 2. (self.freesize = 1, self.size = 1)

While T1 holds a connection, T2 requests a connection from the pool. (self.freesize = 0, self.size = 1)

    # Pool.acquire() -> Pool._fill_free()
    @asyncio.coroutine
    def _fill_free(self):
        while self.freesize < self.minsize and self.size < self.maxsize:
            conn = yield from self._create_new_connection()     <------------ (1) T2
            yield from self._pool.put(conn)

While T2 is pending at (1), T3 requests a connection from the pool, suspends at (1), and T1 releases its connection. (self.freesize = 1, self.size = 1)
Then T2 resumes, acquires and releases a connection. (self.freesize = 2, self.size = 2)

    # Pool.acquire() -> Pool._fill_free()
    @asyncio.coroutine
    def _fill_free(self):
        while self.freesize < self.minsize and self.size < self.maxsize:
            conn = yield from self._create_new_connection()     <------------ (1) T3
            yield from self._pool.put(conn)                     

When T3 resumes, it tries to put a new connection into the pool. However, T3 hangs until some task acquires a connection, because it exceeds the maxsize of the Pool.
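A hypothetical fix sketch: count in-flight connection creations so concurrent acquirers re-check the limits before overshooting maxsize (the _acquiring counter is illustrative):

    @asyncio.coroutine
    def _fill_free(self):
        while (self.freesize < self.minsize
               and self.size + self._acquiring < self.maxsize):
            self._acquiring += 1
            try:
                conn = yield from self._create_new_connection()
                # put_nowait cannot block here: the size check above
                # guarantees there is room in the queue.
                self._pool.put_nowait(conn)
            finally:
                self._acquiring -= 1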

Cancellation of connector's reader_task

You don't need to check for self._reader_task.done();
.cancel() does that itself and returns True if the task was scheduled for cancellation, False if the task is already done or cancelled.
So just

self._reader_task.cancel()
self._reader_task = None

is enough.

Moreover, IIRC you can end up in a situation where the task has been cancelled and scheduled for cancellation but has not yet had a chance to run -- it is, say, scheduled for execution on the next event loop iteration.
In this case the task is still not done -- only scheduled to be done.

Pipelining

get rid of _writer.drain and make execute return a Future, as sketched below
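A sketch of the idea (encode_command and the _waiters queue are illustrative): register a reply waiter, write the command bytes, and return the Future immediately so callers can queue many commands before the transport flushes:

    def execute(self, command, *args):
        fut = asyncio.Future(loop=self._loop)
        self._waiters.append(fut)  # the reply parser resolves these in order
        self._writer.write(encode_command(command, *args))
        # No `yield from self._writer.drain()` here -- the event loop
        # flushes the transport, and callers can gather many futures at once.
        return fut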

Handling of None in commands

I get the following exception:

  File "/work/grinder/web/query.py", line 45, in set_task
    yield from self.redis.set(key, value)
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/futures.py", line 348, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/tasks.py", line 370, in _wakeup
    value = future.result()
  File "/nix/store/a46yqr9lzi4ggk3iijm819c0a9hv8hb7-python3-3.4.1/lib/python3.4/asyncio/futures.py", line 243, in result
    raise self._exception
aioredis.errors.ReplyError: ERR Protocol error: invalid bulk length

This happens when the value is None. Here is what is going through the network:

"*3\r\n$3\r\nSET\r\n$53\r\nsqltask-d54bd60ee28f207c22d032452a51ad48094aa510-task\r\n$-1\r\n"

And it clobbers the connection.
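A sketch of a guard that rejects None before it reaches the wire (the encoder function is illustrative); $-1 is only valid in server replies, never in client requests, so it is better to refuse None outright than to corrupt the stream:

    def encode_arg(value):
        if value is None:
            raise TypeError('None is not allowed as a command argument')
        if isinstance(value, str):
            return value.encode('utf-8')
        return value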

Commands Mixins Implementation status

Implementation status:

In Python 3.4.1 all tests cause a <Task pending _read_data()> warning.

Recently I updated my Python to 3.4.1 (Debian testing switched to this version); as a result, every single test in aioredis causes the following warning (there was no warning with Python 3.4.0):

Task was destroyed but it is pending!
task: <Task pending _read_data() at /home/nick/sources/python/aioredis/aioredis/connection.py:78 wait_for=<Future pending cb=[Task._wakeup()]>>

I tried to run the tests with PYTHONASYNCIODEBUG=1 and got the following output:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "runtests.py", line 290, in <module>
    runtests()
  File "runtests.py", line 273, in runtests
    failfast=failfast).run(tests)
  File "/usr/lib/python3.4/unittest/runner.py", line 168, in run
    test(result)
  File "/usr/lib/python3.4/unittest/suite.py", line 87, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python3.4/unittest/suite.py", line 125, in run
    test(result)
  File "/usr/lib/python3.4/unittest/case.py", line 625, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python3.4/unittest/case.py", line 577, in run
    testMethod()
  File "tests/connection_test.py", line 35, in test_global_loop
    ('localhost', self.redis_port), db=0))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 239, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 212, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 912, in _run_once
    handle._run()
  File "/usr/lib/python3.4/asyncio/events.py", line 96, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 298, in _wakeup
    self._step(value, None)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 244, in _step
    result = next(coro)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 78, in __next__
    return next(self.gen)
  File "/home/nick/sources/python/aioredis/aioredis/connection.py", line 42, in create_connection
    conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
  File "/home/nick/sources/python/aioredis/aioredis/connection.py", line 63, in __init__
    self._reader_task = asyncio.Task(self._read_data(), loop=self._loop)
task: <Task pending _read_data() at /home/nick/sources/python/aioredis/aioredis/connection.py:78 wait_for=<Future pending cb=[Task._wakeup()]>>
make: *** [test] Error

Any idea how to mute this warning?
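This usually means a connection's reader task was still alive when its event loop was discarded. A teardown sketch, assuming the close/wait_closed pair proposed in the wait_closed issue above:

    def tearDown(self):
        self.redis.close()
        # Let the reader task actually finish before dropping the loop.
        self.loop.run_until_complete(self.redis.wait_closed())
        super().tearDown()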

*SCAN commands family implementation

Here are my thoughts regarding the cursor for those commands:
wrap the response of these commands in a subclass of namedtuple providing the raw cursor value
and the data itself, but also implementing the context manager interface to allow iterating over it. Something like:

with (yield from redis.scan(0)) as cursor_context:
    while cursor_context:
        raw_cursor, response = yield from cursor_context

and simple usage

cur, resp = yield from redis.scan(0)
while cur != 0:
    cur, resp = yield from redis.scan(cur)
