
asynch's Issues

Not recovering from interrupted database connection while query is executing

If the connection to the ClickHouse database is interrupted while a query is executing, asynch is unable to recover from it. Any subsequent queries fail with the following exception:

ProgrammingError: some records have not been fetched. fetch the remaining records before executing the next query

It looks like this is because the is_query_executing attribute on the Connection remains True if the end-of-stream packet was missed.

I'm not even sure how to work around this. I've had to restart my application to recover from this, which is far from ideal.
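
A minimal application-level workaround sketch, assuming only the public connect/cursor API (dsn and query are placeholders): since the poisoned connection keeps failing, discard it and retry once on a fresh one.

from asynch import connect


async def execute_with_recovery(dsn, query):
    # If the stream is interrupted mid-query, is_query_executing stays True
    # and every further query on that connection fails, so discard the
    # poisoned connection and retry once on a fresh one.
    conn = await connect(dsn)
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(query)
            return await cursor.fetchall()
    except Exception:
        await conn.close()
        conn = await connect(dsn)
        async with conn.cursor() as cursor:
            await cursor.execute(query)
            return await cursor.fetchall()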

Connection Pool thread safety, secure connection, SSL, gather?

Hello @long2ice and contributors,
I am implementing a connection pool in my Flask application. I insert a lot of files into ClickHouse at once and then wait a long time before the next batch, so I create a connection pool in each request and close it when the request is done.

My Flask app is running in Gunicorn with multiple threads.

However, I am getting an error like:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2023-11-17 08:27:46,675 - ERROR - Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x4036c5c8b0>
transport: <_SelectorSocketTransport closing fd=26>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

I only get this error from the second request onwards (not on the first request). I am wondering if the problem could be related to thread safety, SSL connections, or perhaps the use of asyncio.gather with the connection pool.

My route is like this:

async def execute_with_pool(pool, file):
    cmd = ...  # I create some CH command with the filename
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                response = await cursor.execute(cmd)
            return Response(response)
    except Exception:
        ...  # some exception handling


@some_route("/route_name", methods=["POST"])
async def insert():
    # some inits here
    response_list = []
    list_of_files = [...]  # some list of files
    async with create_pool(minsize=50, maxsize=100, host=host, port=port, user=username, password=pw, secure=True) as pool:
        tasks = [execute_with_pool(pool, file) for file in list_of_files]
        responses = await asyncio.gather(*tasks)
        response_list.append(responses)
    # the async with block already closes the pool, so no explicit close()/wait_closed()
    return response_list

Is this an incorrect way of using the connection pool?

Using Python 3.8, Linux, asynch==0.2.2

An issue with the Date type

Python: 3.12.0
asynch: 0.2.3
ClickHouse: 23.8.5.16

Code

import asyncio

from asynch import connect


async def connect_database():
    conn = await connect("")
    async with conn.cursor() as cursor:
        await cursor.execute("""select makeDate(2149, 06, 06);""")
        ret = await cursor.fetchall()

asyncio.run(connect_database())

Traceback

Traceback (most recent call last):
  File "C:\Users\Cortel\Projects\Useless\test_asynch_date_issue\main.py", line 12, in <module>
    asyncio.run(connect_database())
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 664, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\Projects\Useless\test_asynch_date_issue\main.py", line 9, in connect_database
    await cursor.execute("""select makeDate(2149, 06, 06);""")
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\cursors.py", line 60, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 633, in execute
    async with ExecuteContext(self, query, settings):
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\context.py", line 56, in __aexit__
    raise exc_val
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 648, in execute
    rv = await self.process_ordinary_query(
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 775, in process_ordinary_query
    return await self.receive_result(with_column_types=with_column_types, columnar=columnar)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 498, in receive_result
    return await result.get_result()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\result.py", line 56, in get_result
    async for packet in self.packet_generator:
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 477, in packet_generator
    packet = await self.receive_packet()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 403, in receive_packet
    packet = await self._receive_packet()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 437, in _receive_packet
    packet.block = await self.receive_data()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 340, in receive_data
    return await (self.block_reader_raw if raw else self.block_reader).read()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\streams\block.py", line 82, in read
    column = await read_column(
             ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\__init__.py", line 157, in read_column
    return await column.read_data(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\base.py", line 112, in read_data
    return await self._read_data(n_items, nulls_map=nulls_map)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\base.py", line 120, in _read_data
    return self.after_read_items(items, nulls_map)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\datecolumn.py", line 44, in after_read_items
    return tuple(date_lut[item] for item in items)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\datecolumn.py", line 44, in <genexpr>
    return tuple(date_lut[item] for item in items)
                 ~~~~~~~~^^^^^^
KeyError: 65535
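
For context on the 65535: ClickHouse's Date type is a UInt16 count of days since 1970-01-01, so 65535 is its maximum value and corresponds exactly to the 2149-06-06 produced by makeDate above; the driver's date lookup table evidently stops one entry short of that maximum. A quick check (standard-library Python only, no asynch involved):

from datetime import date, timedelta

# The largest UInt16 day number maps to ClickHouse's documented Date maximum.
print(date(1970, 1, 1) + timedelta(days=65535))  # 2149-06-06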

Connection hangs up

I have a Python program that uses the asynch library. I insert data that I cannot validate beforehand, so a UInt64 value can end up being sent to a UInt32 field, which returns

Code: 53. Type mismatch in VALUES section. Repeat query with types_check=True for detailed info. Column ie: argument out of range

I handle that error and try to insert the data once again, but ClickHouse hangs up the connection with a new error: Code: 33. DB::Exception: Cannot read all data in NativeBlockInputStream.

Release: v21.9.4.35-stable
Environment: Docker

How to reproduce

  • ClickHouse version 21.9.4, revision 54449
  • Python 3.8 with asynch library
  • CREATE TABLE IF NOT EXISTS test(data UInt32) ENGINE = MergeTree() ORDER BY(data) PRIMARY KEY (data) PARTITION BY (data)
  • [(429496729500,)] + [(x,) for x in range(100)]
  • INSERT INTO test(data) VALUES

Expected behavior

Successful insertion after invalid data removal

Error message and/or stacktrace

<Error> TCPHandler: Code: 33. DB::Exception: Cannot read all data in NativeBlockInputStream. Rows read: 0. Rows expected: 100. (CANNOT_READ_ALL_DATA), Stack trace (when copying this message, always include the lines below):

0. DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0x936a17a in /usr/bin/clickhouse
1. DB::NativeBlockInputStream::readData(DB::IDataType const&, COW<DB::IColumn>::immutable_ptr<DB::IColumn>&, DB::ReadBuffer&, unsigned long, double) @ 0x10d49083 in /usr/bin/clickhouse
2. DB::NativeBlockInputStream::readImpl() @ 0x10d49ca2 in /usr/bin/clickhouse
3. DB::IBlockInputStream::read() @ 0x104c3a86 in /usr/bin/clickhouse
4. DB::TCPHandler::receiveData(bool) @ 0x117b6ee2 in /usr/bin/clickhouse
5. DB::TCPHandler::receivePacket() @ 0x117ac3ab in /usr/bin/clickhouse
6. DB::TCPHandler::readDataNext() @ 0x117ae56f in /usr/bin/clickhouse
7. DB::TCPHandler::processInsertQuery() @ 0x117ac908 in /usr/bin/clickhouse
8. DB::TCPHandler::runImpl() @ 0x117a5dff in /usr/bin/clickhouse
9. DB::TCPHandler::run() @ 0x117b8519 in /usr/bin/clickhouse
10. Poco::Net::TCPServerConnection::start() @ 0x1437788f in /usr/bin/clickhouse
11. Poco::Net::TCPServerDispatcher::run() @ 0x1437931a in /usr/bin/clickhouse
12. Poco::PooledThread::run() @ 0x144ac199 in /usr/bin/clickhouse
13. Poco::ThreadImpl::runnableEntry(void*) @ 0x144a842a in /usr/bin/clickhouse
14. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
15. clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so

Additional context

Here is the code that you can use to reproduce that error:

import asyncio

from asynch.errors import TypeMismatchError
from asynch import create_pool


async def execute_query(connection_pool, query, values=None):
    async with connection_pool.acquire() as connection:
        async with connection.cursor() as cursor:
            if values is not None:
                await cursor.execute(query, values)
            else:
                await cursor.execute(query)


async def main():
    connection_pool = await create_pool(
        host='127.0.0.1',
        port=9000,
        database='default',
        user='default',
        password='',
        maxsize=30,
    )

    await execute_query(
        connection_pool=connection_pool,
        query="""
        CREATE TABLE IF NOT EXISTS test(data UInt32) ENGINE = MergeTree()
        ORDER BY(data) PRIMARY KEY (data) PARTITION BY (data)
        """,
    )

    invalid_values = [(429496729500,)] + [(x,) for x in range(100)]

    try:
        await execute_query(
            connection_pool=connection_pool,
            query="INSERT INTO test(data) VALUES",
            values=invalid_values,
        )
    except TypeMismatchError as exc:
        print(exc)
        del invalid_values[0]
        await execute_query(
            connection_pool=connection_pool,
            query="INSERT INTO test(data) VALUES",
            values=invalid_values,
        )


if __name__ == '__main__':
    asyncio.run(main())

LowCardinality column

Error reading LowCardinality columns:

│ os │ LowCardinality(String) │

~/.pyenv/versions/3.8.6/lib/python3.8/site-packages/asynch/proto/columns/lowcardinalitycolumn.py in __init__(self, nested_column, **kwargs)
     31     def __init__(self, nested_column, **kwargs):
     32         self.nested_column = nested_column
---> 33         super(LowCardinalityColumn, self).__init__(**kwargs)
     34 
     35     async def read_state_prefix(self,):

TypeError: __init__() missing 2 required positional arguments: 'reader' and 'writer'

Query execution statistics are protected attributes

I'm using the last_query attribute on the clickhouse_driver.Client, which exposes some useful statistics about the performed queries (see docs).

I have noticed that this also appears to be available via asynch.Connection._connection.last_query.

Is it possible / planned to make it public?
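
For reference, a sketch of the workaround implied above; _connection is private, and the fields on last_query are an assumption here, mirroring clickhouse-driver's interface:

# Reaching into private state; this may break between asynch versions.
stats = conn._connection.last_query
print(stats.elapsed)        # assumed attribute, as in clickhouse-driver
print(stats.progress.rows)  # assumed attribute, as in clickhouse-driver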

Exception

Clickhouse version: 21.3
Code:

async with self._pool.acquire() as conn:
    async with conn.cursor() as cursor:
        rc = await cursor.execute('INSERT INTO test VALUES', items)

Traceback

File "/usr/local/lib/python3.8/site-packages/ptms/event_combiner/writer.py", line 30, in write
    rc = await cursor.execute(self._query, self._items)
  File "/usr/local/lib/python3.8/site-packages/asynch/cursors.py", line 61, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 526, in execute
    await self.force_connect()
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 575, in force_connect
    elif not await self.ping():
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 246, in ping
    packet_type = await self.reader.read_varint()
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/io.py", line 117, in read_varint
    packet = self._read_one()
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/io.py", line 107, in _read_one
    packet = self.buffer[self.position]
IndexError: bytearray index out of range

pypi wheels

Thanks for asynch!

Do you plan to publish wheels to PyPI for asynch?

Thanks

Compression still not working for 2.1.0 - 2.2.0

Hi! Compression does not work in 2.1.0.

The reason is that the buffer in BaseCompressor.writer.buffer is not cleared in get_compressed_data(), so data can be compressed only once per connection; a second call produces corrupted data.

I created a local fix, but not sure if it is the best way:

from asynch.proto.compression import BaseCompressor

origin_get_compressed_data = BaseCompressor.get_compressed_data

async def get_compressed_data(self: BaseCompressor, extra_header_size: int) -> bytearray:
    result = await origin_get_compressed_data(self, extra_header_size)
    self.writer.buffer = bytearray()
    return result

BaseCompressor.get_compressed_data = get_compressed_data

Maybe it would be better to call clear() on the buffer in CompressedBlockWriter.get_compressed(), or to refactor this code another way.

Why aren't the connections to Clickhouse closed?

Hi. I have this code.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import sentry_sdk
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
import uuid
from asynch.cursors import DictCursor
import motor.motor_asyncio
from settings import MONGODB_USER, MONGODB_PASS, MONGODB_PORT, MONGODB_HOST, MONGODB_NAME, CLICKHOUSE_HOST, \
    CLICKHOUSE_USER, CLICKHOUSE_PASS, CLICKHOUSE_PORT, CLICKHOUSE_NAME
from asynch import connect

MONGO_DETAILS = f"mongodb://{MONGODB_USER}:{MONGODB_PASS}@{MONGODB_HOST}:{MONGODB_PORT}"
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)
db = client[MONGODB_NAME]


async def connect_database_click():
    return await connect(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        database=CLICKHOUSE_NAME,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASS,
    )


async def insert_1_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            'INSERT INTO events1 (id, user_id, data) VALUES',
            params
        )


async def insert_2_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            'INSERT INTO events2 (id, user_id, data) VALUES',
            params
        )


async def insert_3_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            'INSERT INTO events3 (id, user_id, data) VALUES',
            params
        )


async def insert_4_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            'INSERT INTO events4 (id, user_id, data) VALUES',
            params
        )


class EventScheme(BaseModel):
    user_id: Optional[str]
    data: Optional[dict]


app = FastAPI()
sentry_sdk.init(dsn="https://*******@sny.*****.com/14")


@app.get('/sentry-debug')
async def trigger_error():
    return 1 / 0


@app.get('/ping')
async def ping():
    return {"result": True}


@app.post("/save-event")
async def save_event(event: EventScheme):
    db_clickhouse = await connect_database_click()

    data = event.dict()
    data.pop('token', None)  # EventScheme has no token field, so pop defensively
    params = [{
        'id': str(uuid.uuid4()),
        "user_id": data.get('user_id'),

    }]

    if data.get('name') == 'ping':
        params[0]['data'] = data.get('data_ping')
        await insert_1_events(params, db_clickhouse)
    elif data.get('name') == 'first_run':
        params[0]['data'] = data.get('data_first_run')
        await insert_2_events(params, db_clickhouse)
    elif data.get('name') == 'first':
        params[0]['data'] = data.get('data_first')
        await insert_3_events(params, db_clickhouse)
    elif data.get('name') == 'first':  # note: duplicates the branch above, so this is unreachable
        params[0]['data'] = data.get('data_first')
        await insert_4_events(params, db_clickhouse)
    else:
        print('NONE EVENTS')

    await db_clickhouse.close()
    # push it into Mongo
    try:
        await db.events.insert_one(data)
    except Exception as e:
        print(e)
    return {'success': True}


app = SentryAsgiMiddleware(app)

It works fine, but for some reason it doesn't close the ClickHouse connections. Why is this happening? Every request returns 200 until the number of connections exceeds the limit, then this error occurs:

DB::Exception: Too many simultaneous queries. Maximum: 100
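
One commonly suggested pattern, sketched here with the same create_pool API used elsewhere in this thread (the pool size is illustrative): open a single shared pool at application startup instead of a fresh connection per request, so connections are reused and capped.

from asynch import create_pool


@app.on_event("startup")
async def open_clickhouse_pool():
    app.state.ch_pool = await create_pool(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        database=CLICKHOUSE_NAME,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASS,
        maxsize=20,  # illustrative cap, kept below the server's query limit
    )


@app.on_event("shutdown")
async def close_clickhouse_pool():
    app.state.ch_pool.close()
    await app.state.ch_pool.wait_closed()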

Streaming queries fail when user is readonly type 1

I'm using a database where I don't have access to the user settings. The provided user has a readonly profile with the <readonly>1</readonly> setting.
I'm using a Cursor with _stream_results set to True and not modifying the default _max_row_buffer value.
When executing the query, the server returns this error:

Code: 164.
DB::Exception: Cannot modify 'max_block_size' setting in readonly mode

One solution would be to change the user's readonly setting to <readonly>2</readonly>, which allows changing settings, but I can't do that. The other is simply not sending settings with the query.
Looking at the source of Cursor._prepare, it always sets the "max_block_size" key in the settings when _stream_results is True. When I commented out this line and tried again, the query executed without issues.

I'm wondering if there's a way to avoid using settings by passing a parameter or maybe reading the connection string, since it's something directly related to the user used in it. Clickhouse used to have the ?readonly=2 connection parameter, but I think it was removed some versions ago.
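
Until there is a parameter for this, a heavily hedged monkey-patch sketch in the spirit of the workaround described above; where exactly the settings dict lives inside _prepare's return value depends on the asynch version:

from asynch.cursors import Cursor

_orig_prepare = Cursor._prepare


def _prepare(self):
    # Hypothetical workaround: let _prepare run, then strip the setting
    # that a readonly=1 user is not allowed to modify.
    execute, execute_kwargs = _orig_prepare(self)
    (self.settings or {}).pop("max_block_size", None)
    (execute_kwargs.get("settings") or {}).pop("max_block_size", None)
    return execute, execute_kwargs


Cursor._prepare = _prepare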

facing problem with installing asynch

Hi, I'm installing asynch on my system but am facing the error below.
I tried googling and checked Stack Overflow.
Can someone help with this?

Using cached pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl (15 kB)
Collecting tzdata
Using cached tzdata-2022.1-py2.py3-none-any.whl (339 kB)
Building wheels for collected packages: ciso8601
Building wheel for ciso8601 (pyproject.toml) ... error
error: subprocess-exited-with-error

× Building wheel for ciso8601 (pyproject.toml) did not run successfully.
│ exit code: 1
╰─> [17 lines of output]
running bdist_wheel
running build
running build_py
package init file 'ciso8601\__init__.py' not found (or not a regular file)
creating build
creating build\lib.win-amd64-cpython-39
creating build\lib.win-amd64-cpython-39\ciso8601
copying ciso8601\__init__.pyi -> build\lib.win-amd64-cpython-39\ciso8601
copying ciso8601\py.typed -> build\lib.win-amd64-cpython-39\ciso8601
running build_ext
building 'ciso8601' extension
creating build\temp.win-amd64-cpython-39
creating build\temp.win-amd64-cpython-39\Release
"C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\BIN\x86_amd64\cl.exe" /c /nologo /O2 /W3 /GL /DNDEBUG /MD -DCISO8601_VERSION=2.2.0 -DCISO8601_CACHING_ENABLED=1 -IC:\Users\jbalodhi172197.AMAT\Anaconda3\include -IC:\Users\jbalodhi172197.AMAT\Anaconda3\Include "-IC:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\INCLUDE" "-IC:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\ATLMFC\INCLUDE" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.10240.0\ucrt" /Tcmodule.c /Fobuild\temp.win-amd64-cpython-39\Release\module.obj
module.c
c:\users\jbalodhi172197.amat\anaconda3\include\pyconfig.h(200): fatal error C1083: Cannot open include file: 'basetsd.h': No such file or directory
error: command 'C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\BIN\x86_amd64\cl.exe' failed with exit code 2
[end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for ciso8601
Failed to build ciso8601
ERROR: Could not build wheels for ciso8601, which is required to install pyproject.toml-based projects

Error when toggling stream_results=True

How can I get data in chunks?

Code:

    conn = await connect(
        host=settings.CLICKHOUSE_BACKEND_HOST,
        port=settings.CLICKHOUSE_BACKEND_TCP_PORT,
        user=settings.CLICKHOUSE_BACKEND_USER,
        password=settings.CLICKHOUSE_BACKEND_PASSWORD,
        database="datasets",
        send_receive_timeout=1000,
    )

    async with conn.cursor(cursor=DictCursor) as cursor:
        cursor.set_stream_results(stream_results=True, max_row_buffer=1000)
        await cursor.execute(
            """
                SELECT *
                FROM ontime
        """
        )
        for row in await cursor.fetchall():
            print(row)

Traceback:


  File "C:\Users\vasya\PycharmProjects\mielpops\app\hooks.py", line 21, in clickhouse_exec
    await cursor.execute(
          │      └ <function Cursor.execute at 0x000002A45AFB2040>
          └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

  File "C:\Users\vasya\PycharmProjects\mielpops\venv\lib\site-packages\asynch\cursors.py", line 59, in execute
    execute, execute_kwargs = self._prepare()
                              │    └ <function Cursor._prepare at 0x000002A45AFB2430>
                              └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

  File "C:\Users\vasya\PycharmProjects\mielpops\venv\lib\site-packages\asynch\cursors.py", line 181, in _prepare
    self.settings = self.settings or {}
    │               └ <asynch.cursors.Cursor object at 0x000002A45BC59790>
    └ <asynch.cursors.Cursor object at 0x000002A45BC59790>

AttributeError: 'Cursor' object has no attribute 'settings'

`None` input to a `String` field raises a `TypeError`

The error is:

   await self.write_items(prepared)
  File "/usr/local/lib/python3.11/site-packages/asynch/proto/columns/stringcolumn.py", line 13, in write_items
    await self.writer.write_strings(items)
  File "/usr/local/lib/python3.11/site-packages/asynch/proto/streams/buffered.py", line 49, in write_strings
    await self.write_varint(len(packet))
                            ^^^^^^^^^^^
TypeError: object of type 'NoneType' has no len()
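
A hedged note: a plain String column cannot store NULL (that needs Nullable(String)), so until the driver raises a clearer error, one client-side workaround is to coalesce before inserting (field names below are illustrative):

# Replace None with an empty string before handing rows to the driver.
rows = [{"name": value if value is not None else ""} for value in raw_values]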

Connection pool with predefined connections

Let's say that I have a cluster of 6 nodes.
I would like to create a connection pool with 6 connections, one per node.

Can I implement such behavior with asynch.create_pool()?
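
asynch's connect/create_pool take a single host, so as far as I can tell the closest fit is one pool per node; a minimal sketch (host names are placeholders):

import asyncio

import asynch


async def make_pools(hosts):
    # One single-connection pool per cluster node, e.g.
    # hosts = ["node1", "node2", "node3", "node4", "node5", "node6"].
    return await asyncio.gather(
        *(asynch.create_pool(host=h, minsize=1, maxsize=1) for h in hosts)
    )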

`Unknown type Date32` when inserting data into a `Date32` field

Getting asynch.errors.UnknownTypeError when inserting data into a Date32 field:

  File "lib/python3.11/site-packages/asynch/proto/columns/__init__.py", line 144, in get_column_by_spec
    raise UnknownTypeError("Unknown type {}".format(e.args[0]))
asynch.errors.UnknownTypeError: Code: 50. Unknown type Date32

Here is a minimal script to reproduce the bug:

import asyncio
from datetime import date

import asynch

async def main():
    pool = await asynch.create_pool(minsize=5)
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute('CREATE DATABASE IF NOT EXISTS test')
            await cursor.execute('CREATE TABLE if not exists test.test (date Date32) ENGINE = MergeTree order by date')

            await cursor.execute('INSERT INTO test.test (date) VALUES', [{'date': date.today()}])

asyncio.run(main())

[BUG] Stale connection in connection pool cannot properly recover itself

Steps to reproduce the bug:

  • Start an aiohttp server, initiate an asynch connection pool (without configuring max/min connection)
  • Leave it running for an hour or two
  • Run any query by acquiring a connection in the pool
  • The server may raise an exception: bytearray index out of range
    • Here is an actual log from my server:
      Error handling request
      Traceback (most recent call last):
        <stack trace from aiohttp, mainly processes the request and assembles the query>
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/cursors.py", line 61, in execute
          response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/proto/connection.py", line 526, in execute
          await self.force_connect()
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/proto/connection.py", line 575, in force_connect
          elif not await self.ping():
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/proto/connection.py", line 246, in ping
          packet_type = await self.reader.read_varint()
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/proto/io.py", line 127, in read_varint
          packet = self._read_one()
        File "/opt/pyenv/versions/3.9.6/lib/python3.9/site-packages/asynch/proto/io.py", line 117, in _read_one
          packet = self.buffer[self.position]
      IndexError: bytearray index out of range
      

Environment:

  • Ubuntu 20.10 on amd64
  • python 3.9.6 (pyenv install)
  • ClickHouse (docker install, tag 21.7.3.14-alpine)

Preliminary Analysis:

It seems that proto/connection::ping reads a varint from ClickHouse after sending the ping packet. However, in my case ClickHouse did not reply to the ping at all, so the buffer reader had nothing to extract and raised the error saying the buffer is empty. I am unsure whether the connection is actually closed or in some other weird state, since ClickHouse does not have a proper specification of its native TCP protocol.
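
Until the pool can recover stale connections itself, a minimal application-level sketch using only the pool/cursor API from the report: retry once when this IndexError surfaces, closing the stale connection first (whether the pool copes gracefully with a closed connection being released is an assumption here).

async def run_query(pool, query):
    for attempt in (1, 2):
        async with pool.acquire() as conn:
            try:
                async with conn.cursor() as cursor:
                    await cursor.execute(query)
                    return await cursor.fetchall()
            except IndexError:
                # Stale socket: close it and retry once on another connection.
                await conn.close()
                if attempt == 2:
                    raise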

Not able to connect using secure=True

Getting the below exception when passing secure=True as follows:

    conn = await connect(
        host="<HOST>",
        port=9440,
        database="<DB>",
        user="<USER>",
        password="<PASSWORD>", secure=True
    )

  File "/home/deebug/PycharmProjects/pythonProject/asyncch_test.py", line 27, in <module>
    asyncio.run(connect_database())
  File "/home/deebug/miniconda3/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/deebug/miniconda3/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "/home/deebug/PycharmProjects/pythonProject/asyncch_test.py", line 11, in connect_database
    password="<PASSWORD>", secure=True
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/connection.py", line 199, in connect
    await conn.connect()
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/connection.py", line 97, in connect
    await self._connection.connect()
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/proto/connection.py", line 470, in connect
    return await self._init_connection(host, port)
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/proto/connection.py", line 457, in _init_connection
    await self.receive_hello()
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/proto/connection.py", line 199, in receive_hello
    packet_type = await self.reader.read_varint()
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/proto/io.py", line 127, in read_varint
    packet = self._read_one()
  File "/home/deebug/PycharmProjects/pythonProject/venv3.7/lib/python3.7/site-packages/asynch/proto/io.py", line 117, in _read_one
    packet = self.buffer[self.position]
IndexError: bytearray index out of range

Can you please provide documentation or an example of connecting using SSL?
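
For what it's worth, another report in this thread mentions connecting with ca_certs together with secure=True; a hedged sketch along those lines (whether further TLS kwargs exist depends on the asynch version):

conn = await connect(
    host="<HOST>",
    port=9440,
    database="<DB>",
    user="<USER>",
    password="<PASSWORD>",
    secure=True,
    ca_certs="/path/to/ca.pem",  # CA bundle used to verify the server
)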

AttributeError from the fetchone function of the DictCursor class

Hello! In version 0.2.2, the fetchone function raised an AttributeError when row was None. In version 0.2.3, when row is None, {} is returned.
I would like to know the reason for this change; after the update, the logic in my application broke.
Thank you in advance.

# asynch 0.2.2
class DictCursor(Cursor):
    async def fetchone(self):
        row = await super(DictCursor, self).fetchone()
        if self._columns and row:
            return dict(zip(self._columns, row))
        else:
            raise AttributeError("Not fould valid columns")


# asynch 0.2.3
class DictCursor(Cursor):
    async def fetchone(self):
        row = await super(DictCursor, self).fetchone()
        if self._columns:
            return dict(zip(self._columns, row)) if row else {}
        else:
            raise AttributeError("Invalid columns.")

Request release v0.2.4

0.2.3 is 30 commits behind master, which contains many fixes, e.g. "bytearray index out of range" (#90).
It would be great to have a release with these fixes.

Can not connect to Clickhouse version 22.3.8.39

I want to use the asynch library with one of the latest ClickHouse versions, 22.3.8.39, but got this exception:

 File "/home/zexxonn/DEV/Quantor/momote/backend/app/context.py", line 81, in create_asynch_pool
    asynch_pool: "AsynchPool" = await asynch_pool_exit_stack.enter_async_context(
  File "/home/zexxonn/.pyenv/versions/3.9.12/lib/python3.9/contextlib.py", line 575, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/pool.py", line 53, in __aenter__
    self._obj = await self._coro
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/pool.py", line 242, in _create_pool
    await pool.initialize()
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/pool.py", line 176, in initialize
    conn = await connect(**self._connection_kwargs)
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/connection.py", line 199, in connect
    await conn.connect()
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/connection.py", line 97, in connect
    await self._connection.connect()
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/proto/connection.py", line 470, in connect
    return await self._init_connection(host, port)
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/proto/connection.py", line 457, in _init_connection
    await self.receive_hello()
  File "/home/zexxonn/.cache/pypoetry/virtualenvs/backend-Ief6SS80-py3.9/lib/python3.9/site-packages/asynch/proto/connection.py", line 241, in receive_hello
    raise UnexpectedPacketFromServerError(message)
asynch.errors.UnexpectedPacketFromServerError: Code: 102. Unexpected packet from server rc1b-ceve8zbnw6zcqb0d.mdb.yandexcloud.net:9440 (expected Hello or Exception, got Unknown packet)

With ClickHouse 21.12.2.17 everything works well.
I used a debugger to compare the versions during connection and found that here https://github.com/long2ice/asynch/blob/dev/asynch/proto/connection.py#L226 the asynch client receives packet_type == 21 for CH 22.3.8.39.
I am connecting with ca_certs and secure=True.

How can this connection problem be fixed?

with_column_types=True, but there are no column types

Why does this code:

import asyncio
from asynch import connect


async def main():
    conn = await connect(
        host="127.0.0.1",
        port=2714,
        database="default",
    )

    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
        ret = await cursor.fetchone()
        print(ret)


asyncio.run(main())

returns:
(1,)

with no column types?

In the line

response = await execute(query, args=args, with_column_types=True, **execute_kwargs)

I can see with_column_types=True, but you are not returning them.

Connection lifetime

Hi, what is the best way to use a connection in a long-lived asynchronous application? Is it better to create a new connection and open a cursor for each request, or is it possible to create the connection once as a singleton and only open a cursor on each request? Thank you in advance.

New release

The last release on PyPI was more than a year ago. There have been some crucial fixes in the repository since then, so I suggest making a new release.

LowCardinality column #2

This has been an issue before but I have still encountered it. (#2)

Int-type columns are initialized within LowCardinality without the reader and writer that they themselves require.

I could make this work on my machine by passing them to the Int types in asynch/proto/columns/lowcardinalitycolumn.py, lines 87 and 106.

e.g.:

int_column = self.int_types[int_type](
    reader=self.nested_column.reader, writer=self.nested_column.writer
)

DictCursor fetch* methods raise exceptions when no rows are returned

The Cursor fetch* methods return an empty list when no rows are returned from the query.

But the DictCursor methods raise an AttributeError("Not fould valid columns") if no rows are returned, because of the if self._columns and row condition. They should return an empty list instead, perhaps with something like this:

if self._columns:
    return dict(zip(self._columns, row)) if row else []

I can create a PR if that works for you.

pandas.DataFrame support

@long2ice
Do you plan to support pandas DataFrames, which are heavily used by data scientists?
It would be very nice to be able to select directly into a DataFrame.
Is such a feature on your roadmap?
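
In the meantime, a minimal sketch of building a DataFrame by hand from DictCursor rows; only pandas usage is shown, no new asynch API is assumed:

import pandas as pd

from asynch.cursors import DictCursor


async def select_df(conn, query):
    # DictCursor yields each row as a dict, which DataFrame accepts directly.
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(query)
        return pd.DataFrame(await cursor.fetchall())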

Connection pool initialization

In the initialize function there is the condition

while self.size < self.minsize:

where self.size is self.freesize + len(self._used). This leads to there never being more than minsize connections in the deque, so the maxsize connection count can never be reached.
Maybe the condition should be something like:

while self.freesize < self.minsize:

?

Exception In select with LowCardinalityColumn

lib/python3.8/site-packages/asynch/proto/columns/base.py:102: in read_data
    return await self._read_data(n_items, nulls_map=nulls_map)
lib/python3.8/site-packages/asynch/proto/columns/lowcardinalitycolumn.py:106: in _read_data
    keys_column = self.int_types[key_type]()
lib/python3.8/site-packages/asynch/proto/columns/intcolumn.py:41: in __init__
    super(UIntColumn, self).__init__(types_check=types_check, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asynch.proto.columns.intcolumn.UInt8Column object at 0x7f3176a4c880>
types_check = False, kwargs = {}

    def __init__(self, types_check=False, **kwargs):
>       super(IntColumn, self).__init__(types_check=types_check, **kwargs)
E       TypeError: __init__() missing 2 required positional arguments: 'reader' and 'writer'

lib/python3.8/site-packages/asynch/proto/columns/intcolumn.py:12: TypeError

A quick fix is to add the required kwargs here:

keys_column = self.int_types[key_type]()

becomes

keys_column = self.int_types[key_type](reader=self.reader, writer=self.writer)

ProgrammingError on async race condition when using pool execute many

When using executemany to insert multiple entries into ClickHouse, with the following code rapidly inserting records:

for values_batch in value_batches:
    async with self._db_pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.executemany(ch_query, values_batch)

we get the following exception:
ProgrammingError("records have not fetched, fetch all before execute next")

This is probably also an issue with a single execute.

The cursor closes the connection incorrectly

The cursor closes the connection incorrectly; I'm not sure it should close it at all.

Code example:

logging.basicConfig(level=logging.DEBUG)
pool = await asynch.create_pool(maxsize=1)

async with pool.acquire() as conn:
    assert conn._is_closed is False  # <- ok
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")
    assert conn._is_closed is True  # <- not ok

async with pool.acquire() as conn:
    assert conn._is_closed is True  # <- not ok
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT 1")    # <- but it works

You can see the connection being established twice in the log:

DEBUG:asynch.proto.connection:Connecting. Database: default. User: default
DEBUG:asynch.proto.connection:Connecting to 127.0.0.1:9000
DEBUG:asynch.proto.connection:Connected to ClickHouse server version 22.1.3, revision: 54455
DEBUG:asynch.proto.connection:Query: SELECT 1
DEBUG:asynch.proto.connection:Connecting. Database: default. User: default
DEBUG:asynch.proto.connection:Connecting to 127.0.0.1:9000
DEBUG:asynch.proto.connection:Connected to ClickHouse server version 22.1.3, revision: 54455
DEBUG:asynch.proto.connection:Query: SELECT 1

Cannot connect to clickhouse server, got asynch.errors.UnexpectedPacketFromServerError: Code: 102. Unexpected packet from server hn.cpd.com:31123 (expected Hello or Exception, got Unknown packet)

Here is my script

import asyncio
from asynch import connect

async def connect_database():
    conn = await connect(
        host = "hn.cpd.com",
        port = 31123,
        database = "demo",
        user = "admin",
        password = "03c6203e73a",
    ) # Exception caught here

    cursor = conn.cursor() 
    await cursor.execute("SELECT * FROM table LIMIT 10")
    rows = await cursor.fetchall()
    print(rows)

try:
    asyncio.run(connect_database())
except Exception as e:
    print(e)

I get the error Code: 102. Unexpected packet from server hn.cpd.com:31123 (expected Hello or Exception, got Unknown packet) when trying to connect to the ClickHouse server.

I used Python 3.11.5 with

  • asyncio == "3.4.3"
  • asynch == "0.2.3"

The credentials work fine when I connect to ClickHouse with other tools like DataGrid, ...

asynch.errors.UnknownTypeError: Code: 50. Unknown type Date32

Python: 3.12.0
asynch: 0.2.3
ClickHouse: 23.8.5.16

Code

import asyncio

from asynch import connect


async def connect_database():
    conn = await connect("")
    async with conn.cursor() as cursor:
        await cursor.execute("""select makeDate32(1950, 06, 06);""")
        ret = await cursor.fetchall()
        print(ret)

asyncio.run(connect_database())

Traceback

Traceback (most recent call last):
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\__init__.py", line 140, in get_column_by_spec
    cls = column_by_type[spec]
          ~~~~~~~~~~~~~~^^^^^^
KeyError: 'Date32'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Cortel\Projects\Useless\test_asynch_date_issue\main.py", line 13, in <module>
    asyncio.run(connect_database())
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 664, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\Projects\Useless\test_asynch_date_issue\main.py", line 9, in connect_database
    await cursor.execute("""select makeDate32(1950, 06, 06);""")
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\cursors.py", line 60, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 633, in execute
    async with ExecuteContext(self, query, settings):
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\context.py", line 60, in __aexit__
    raise exc_val
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 648, in execute
    rv = await self.process_ordinary_query(
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 775, in process_ordinary_query
    return await self.receive_result(with_column_types=with_column_types, columnar=columnar)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 498, in receive_result
    return await result.get_result()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\result.py", line 56, in get_result
    async for packet in self.packet_generator:
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 477, in packet_generator
    packet = await self.receive_packet()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 403, in receive_packet
    packet = await self._receive_packet()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 437, in _receive_packet
    packet.block = await self.receive_data()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\connection.py", line 340, in receive_data
    return await (self.block_reader_raw if raw else self.block_reader).read()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\streams\block.py", line 82, in read
    column = await read_column(
             ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\__init__.py", line 155, in read_column
    column = get_column_by_spec(column_spec, column_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Cortel\AppData\Local\pypoetry\Cache\virtualenvs\test-asynch-date-issue-CodK8wwF-py3.12\Lib\site-packages\asynch\proto\columns\__init__.py", line 144, in get_column_by_spec
    raise UnknownTypeError("Unknown type {}".format(e.args[0]))
asynch.errors.UnknownTypeError: Code: 50. Unknown type Date32

Exception

File "/usr/local/lib/python3.8/site-packages/asynch/cursors.py", line 61, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 534, in execute
    rv = await self.process_insert_query(
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 637, in process_insert_query
    rv = await self.send_data(
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 679, in send_data
    await self.send_block(block)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/connection.py", line 614, in send_block
    await self.block_out_stream.write(block)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/streams/native.py", line 36, in write
    await write_column(
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/columns/__init__.py", line 137, in write_column
    column = get_column_by_spec(column_spec, column_options)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/columns/__init__.py", line 78, in get_column_by_spec
    return create_string_column(spec, column_options)
  File "/usr/local/lib/python3.8/site-packages/asynch/proto/columns/stringcolumn.py", line 57, in create_string_column
    return cls(**column_options)
TypeError: __init__() missing 1 required positional argument: 'length'

RuntimeError: read() called while another coroutine is already waiting for incoming data

Hi!

I get the following error:

Traceback (most recent call last):
  File "ch_test_asynch.py", line 55, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "ch_test_asynch.py", line 52, in main
    await asyncio.gather(*inserts)
  File "ch_test_asynch.py", line 36, in insert_data
    await cursor.execute(
  File "/usr/local/lib/python3.8/dist-packages/asynch/cursors.py", line 61, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 526, in execute
    await self.force_connect()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 573, in force_connect
    await self.connect()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 470, in connect
    return await self._init_connection(host, port)
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 457, in _init_connection
    await self.receive_hello()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/connection.py", line 201, in receive_hello
    server_name = await self.reader.read_str()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 138, in read_str
    length = await self.read_varint()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 124, in read_varint
    await self._read_into_buffer()
  File "/usr/local/lib/python3.8/dist-packages/asynch/proto/io.py", line 112, in _read_into_buffer
    packet = await self.reader.read(self.buffer_max_size)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

With this minimal code example:

import asyncio

import string

from asynch import connect
from asynch.cursors import DictCursor


async def connect_database():
    conn = await connect(
        host = "0.0.0.0",
        port = 9000,
        database = "waf",
    )
    return conn


async def create_table(client):
    async with client.cursor(cursor=DictCursor) as cursor:
        await cursor.execute('DROP TABLE IF EXISTS test')
        await cursor.execute("""
        CREATE TABLE if not exists waf.test
            (
                `id`       Int32,
                `decimal`  Decimal(10, 2),
                `string`   String

            )
            ENGINE = MergeTree
                ORDER BY id"""
        )


async def insert_data(client):
    async with client.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            """INSERT INTO waf.test(id,decimal,string) VALUES""",
            [
                (
                    1,
                    123.45,
                    'a str',
                ),
            ],
        )


async def main():
    client = await connect_database()
    await create_table(client)
    inserts = [insert_data(client) for f in range(0, 2)]
    await asyncio.gather(*inserts)

if __name__ == '__main__':
    asyncio.run(main())

ClickHouse client version 21.6.6.51 (official build).
asynch-0.1.9
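
Since both insert_data calls share a single connection, each gathered coroutine ends up waiting on the same socket. A hedged sketch of the alternative, using the same create_pool API seen elsewhere in this thread, gives every task its own connection:

from asynch import create_pool


async def main():
    client = await connect_database()
    await create_table(client)

    pool = await create_pool(host="0.0.0.0", port=9000, database="waf", maxsize=2)

    async def insert_via_pool():
        # Each task acquires its own connection, so no two coroutines
        # read from the same stream concurrently.
        async with pool.acquire() as conn:
            await insert_data(conn)

    await asyncio.gather(*(insert_via_pool() for _ in range(2)))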

Tuple column

First:

  File "/home/ikrivosheev/.pyenv/versions/ptms-3.8.2/lib/python3.8/site-packages/asynch/proto/columns/tuplecolumn.py", line 55, in create_tuple_column
    return TupleColumn([column_by_spec_getter(x) for x in nested_columns])
  File "/home/ikrivosheev/.pyenv/versions/ptms-3.8.2/lib/python3.8/site-packages/asynch/proto/columns/tuplecolumn.py", line 9, in __init__
    super(TupleColumn, self).__init__(**kwargs)
TypeError: __init__() missing 2 required positional arguments: 'reader' and 'writer'

Second, I create a simple table:

CREATE TABLE IF NOT EXISTS test.test (
    id UInt64,
    values Array(Tuple(String, String))
)
...

and then I execute:

    async with conn.cursor(cursor=DictCursor) as cursor:
        ret = await cursor.execute("""INSERT INTO test.test VALUES""", [{"id": 1, "values": []}])

I get this error:

  File "/home/ikrivosheev/.pyenv/versions/ptms-3.8.2/lib/python3.8/site-packages/asynch/proto/columns/tuplecolumn.py", line 22, in write_items
    return await self.write_data(items,)
  File "/home/ikrivosheev/.pyenv/versions/ptms-3.8.2/lib/python3.8/site-packages/asynch/proto/columns/tuplecolumn.py", line 17, in write_data
    await x.write_data(list(items[i]),)
IndexError: list index out of range

Bug IndexError

class AsynchContexManager:
    def __init__(self, url: str):
        self.url = url
        self.connection = None

    async def __aenter__(self) -> asynch.connection.Connection:
        self.connection = await asynch.connect(self.url)
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.connection.close()


def clickhouse_cursor(func: Callable[Concatenate[asynch.connection.Connection, P], T]) -> Callable[P, T]:
    url = f"{clickhouse.get_connection_url()}?os_thread_priority=19&max_threads=4"

    @functools.wraps(func)
    @backoff.on_exception(backoff.constant, (ClickHouseException, DatabaseError), max_tries=3, interval=90)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        async with AsynchContexManager(url) as connection:
            async with connection.cursor(cursor=cursors.DictCursor) as cursor:
                return await func(*args, **kwargs, cursor=cursor)

    return wrapper


class Puller:
    @clickhouse_cursor
    @backoff.on_exception(backoff.constant, IndexError, interval=10)
    async def pull(self, cursor: asynch.cursors.Cursor) -> DataFrame:
        # <- IndexError raised here
        await cursor.execute(f"""
            select * 
            from {self.table_name}
            limit {self.limit}
            offset {self.pointer}
        """)
File "/opt/project/jobs/sync_attribution/utils/puller.py", line 94, in pull
  await cursor.execute(f"""
File "/usr/local/lib/python3.10/site-packages/asynch/cursors.py", line 64, in execute
  response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 623, in execute
  async with ExecuteContext(self, query, settings):
File "/usr/local/lib/python3.10/site-packages/asynch/proto/context.py", line 56, in __aexit__
  raise exc_val
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 639, in execute
  rv = await self.process_ordinary_query(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 768, in process_ordinary_query
  return await self.receive_result(with_column_types=with_column_types, columnar=columnar)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 487, in receive_result
  return await result.get_result()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/result.py", line 56, in get_result
  async for packet in self.packet_generator:
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 465, in packet_generator
  packet = await self.receive_packet()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 391, in receive_packet
  packet = await self._receive_packet()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 425, in _receive_packet
  packet.block = await self.receive_data()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/connection.py", line 328, in receive_data
  return await (self.block_reader_raw if raw else self.block_reader).read()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/block.py", line 82, in read
  column = await read_column(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/__init__.py", line 155, in read_column
  return await column.read_data(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/base.py", line 112, in read_data
  return await self._read_data(n_items, nulls_map=nulls_map)
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/base.py", line 115, in _read_data
  items = await self.read_items(
File "/usr/local/lib/python3.10/site-packages/asynch/proto/columns/stringcolumn.py", line 18, in read_items
  ret.append(await self.reader.read_str(as_bytes=self.read_as_bytes))
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 141, in read_str
  length = await self.read_varint()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 130, in read_varint
  packet = self._read_one()
File "/usr/local/lib/python3.10/site-packages/asynch/proto/streams/buffered.py", line 120, in _read_one
  packet = self.buffer[self.position]
IndexError: bytearray index out of range

clickhouse: 23.3.1.2823
python: 3.10
asynch: 0.2.2

`Code: 50. Unknown type Object('json')` when using JSON type

It would be great to be able to insert data into a JSON column; at the moment it's causing an error.

Minimal reproduction:

import asyncio

from asynch import connect
from asynch.cursors import DictCursor


async def demo():
    conn = await connect()
    async with conn.cursor(cursor=DictCursor) as cursor:
        await cursor.execute('create database if not exists test')
        await cursor.execute('SET allow_experimental_object_type=1')
        await cursor.execute("""
        CREATE TABLE if not exists test.json_type (
                id       Int32,
                json_data JSON
            )
            ENGINE = MergeTree ORDER BY id"""
        )
        ret = await cursor.execute(
            """INSERT INTO test.json_type(id, json_data) VALUES""",
            [
                {
                    "id": 1,
                    "json_data": b'{"foo": "bar"}'
                }
            ],
        )
        print(ret)


if __name__ == '__main__':
    asyncio.run(demo())

Support for Query Cancellation

Does the library support query cancellation? We often run slow queries that need to be altered during execution, and ideally, we'd like to be able to cancel an ongoing query and start a new one.

For reference, the clickhouse-driver supports this feature via client.cancel().

If this is not currently supported, I'm interested in contributing to implement this feature and would appreciate some guidance.
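
Until (or unless) a client-side cancel lands, a hedged server-side fallback is ClickHouse's KILL QUERY statement issued from a second connection; the marker-matching below is illustrative, not an asynch API:

async def cancel_matching(conn, marker):
    # KILL QUERY filters over system.processes; ASYNC returns without
    # waiting for the target query to stop. `marker` is any distinctive
    # substring embedded in the slow query's text.
    async with conn.cursor() as cursor:
        await cursor.execute(
            f"KILL QUERY WHERE query LIKE '%{marker}%' ASYNC"
        )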
