Comments (5)
Thanks for reporting @proddata the fix will be available in 5.6.5
and 5.7.1
releases.
from crate.
What is suspicious:
- the first 6 executions work fine
- the 7th and following are partially failling
Looking at a wireshark recording there seems to be a behaviour change how asyncpg (3) is communicating with CrateDB all of a sudden (from second 349 - all logs before look the same):
Wireshark
![image](https://github.com/crate/crate/assets/23557193/e6dafdc0-6a06-4879-82d3-eab55509a683)
The inserts follow simple queries for the first executions, i.e.:
<Z (ready for query)
>Q (simple query)
<C (command completion INSERT 0 100)
<Z (ready for query)
from crate.
I now switched it to have auto commit enabled with
conn = await psycopg.AsyncConnection.connect(
dbname=database, user=user, password=password, host=host, autocommit=True)
Which works ... for the first 5 minutes ... then again ...
from crate.
Forcing prepared statement with execute(stmt1, prepare=True)
makes it fail quicker:
await cur.execute(stmt1, prepare=True)
result = cur.rowcount
print(f"Time: {datetime.now()}, Affected Rows: {result}")
# Second SQL statement
await cur.execute(stmt2 , prepare=True)
result = cur.rowcount
print(f"Time: {datetime.now()}, Affected Rows: {result}")
# Third SQL statement
await cur.execute(stmt3, prepare=True)
result = cur.rowcount
print(f"Time: {datetime.now()}, Affected Rows: {result}")
Executing at 2024-04-22 21:04:22.818460
Time: 2024-04-22 21:04:22.905894, Affected Rows: 12001
Time: 2024-04-22 21:04:30.486069, Affected Rows: 100
Time: 2024-04-22 21:04:30.500197, Affected Rows: -1
Executing at 2024-04-22 21:05:30.502028
Time: 2024-04-22 21:05:30.601565, Affected Rows: 12001
Time: 2024-04-22 21:05:30.611416, Affected Rows: 0 <-- fails already
Time: 2024-04-22 21:05:30.612167, Affected Rows: -1
Executing at 2024-04-22 21:06:30.613892
Time: 2024-04-22 21:06:30.737932, Affected Rows: 12001
Time: 2024-04-22 21:06:30.740307, Affected Rows: 0
Time: 2024-04-22 21:06:30.741570, Affected Rows: -1
Executing at 2024-04-22 21:07:30.743420
Time: 2024-04-22 21:07:30.854913, Affected Rows: 12001
Time: 2024-04-22 21:07:30.856737, Affected Rows: 0
Time: 2024-04-22 21:07:30.857663, Affected Rows: -1
from crate.
So this seems to relate to prepared statements and how CrateDB treats them.
Psycopg3 with default settings automatically will prepare statements if they are run more than 5 times on a connection, if not explictily disabled. This prepared statements seem to behave strange when used with CrateDB. It almost seems like CrateDB is keeping a state about what partitions are affected from a prepared statement, but never updates it. This can be replicated with the following code example which does roughly the following:
- Create a partitioned table
- Prepare and run DELETE statement -> nothing gets deleted as the table is empty
- Fill table with records
- Refresh table just to be sure
- Run same DELETE statement again -> nothing gets deleted, but the table is not empty 🤯
- lets wait 15 seconds and try again ...
- Run same DELETE statement again -> nothing gets deleted, but the table is not empty 🤯
Code Example
import asyncio
from datetime import datetime
import psycopg
# SQL statements for database operations
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS scheduler_issue.base (
ts TIMESTAMP,
ts_g GENERATED ALWAYS AS date_trunc('minute', ts)
) PARTITIONED BY (ts_g);
"""
INSERT_DATA = """
INSERT INTO scheduler_issue.base (ts)
SELECT ts
FROM generate_series(
date_trunc('minute', now()),
date_trunc('minute', now() + '1 minute'::INTERVAL),
'5 seconds'::INTERVAL
) g (ts);
"""
DELETE_OLD_DATA = """
DELETE FROM scheduler_issue.base
"""
REFRESH_TABLE = "REFRESH TABLE scheduler_issue.base;"
async def run():
# Database connection details
user = 'crate'
password = ''
database = 'crate'
host = 'localhost'
# Connect to the database asynchronously
conn = await psycopg.AsyncConnection.connect(
dbname=database, user=user, password=password, host=host)
print("Connected to the database.")
try:
async with conn: # Using the connection within an async context manager
async with conn.cursor() as cur:
# Executing SQL commands
await cur.execute(CREATE_TABLE)
print(f"Table created")
#await asyncio.sleep(30)
await cur.execute(DELETE_OLD_DATA, prepare=True)
print(f"Run prepared delete first time on empty table, Affected Rows: {cur.rowcount}")
await cur.execute(INSERT_DATA)
print(f"Data inserted at, Affected Rows: {cur.rowcount}")
await cur.execute(REFRESH_TABLE)
print(f"Table refreshed")
await cur.execute(DELETE_OLD_DATA, prepare=True)
print(f"Run prepared delete again on filled table, Affected Rows: {cur.rowcount}")
print("Sleeping for 15 seconds")
await asyncio.sleep(15)
await cur.execute(DELETE_OLD_DATA, prepare=True)
print(f"Run prepared delete again on filled table, Affected Rows: {cur.rowcount}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the connection
if not conn.closed:
await conn.close()
print("Connection closed.")
# Running the asynchronous function
asyncio.run(run())
Output ❌
Connected to the database.
Table created
Run prepared delete first time on empty table, Affected Rows: 0
Data inserted at, Affected Rows: 13
Table refreshed
Run prepared delete again on filled table, Affected Rows: 0
Sleeping for 15 seconds
Run prepared delete again on filled table, Affected Rows: 0
Connection closed.
With a non-partitioned table it works ✅
Connected to the database.
Table created
Run prepared delete first time on empty table, Affected Rows: 0
Data inserted at, Affected Rows: 13
Table refreshed
Run prepared delete again on filled table, Affected Rows: 13
Sleeping for 15 seconds
Run prepared delete again on filled table, Affected Rows: 0
Connection closed.
from crate.
Related Issues (20)
- Allow users with `AL` privileges to manage session settings defaults for other users HOT 1
- `COUNT()` from a nested view should use `COUNT` operator instead of `COLLECT`
- StaticInformationSchemaQueryTest.testIsNotNull test failure HOT 2
- Success `statusCode` for failed bulk inserts with HTTP endpoint HOT 1
- Extra flags to provide further details on failed records on HTTP endpoint bulk insertion HOT 1
- Improve `ARRAY_LENGTH` performance for arrays within object arrays HOT 1
- Improve performance in HyperLogLogDistinctAggregation
- Invalid results when filtering with `ARRAY_LENGTH` on an array in array of objects HOT 2
- IndexOutOfBoundsException on query with ORDER BY in 3 nodes cluster HOT 3
- ShardInfo in NodeInfo MXBean is missing the schema name leading to duplicates HOT 4
- Table with dynamic column policy is silently failing first insert if it includes an empty array HOT 5
- Unexpected result when using empty `OBJECT` literal HOT 2
- Behaviour of numeric scale and casting inconsistent / unspecified HOT 2
- PostgreSQL wire protocol: type OID for text responses in prepared statements is invalid HOT 4
- PostgreSQL compatibility: SQL statement containing only comment not accepted HOT 3
- io.crate.integrationtests.LuceneQueryBuilderIntegrationTest#testNullOperators HOT 5
- Cannot add new column to a partitioned table after changing the number of shards HOT 2
- optimize lucene query generation for distinct from
- Avoid utf8->utf16 conversations (for grouping operations)
- UDFs with parameters incorrectly looked up with names all lowercase when used in GENERATED column HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from crate.