Comments (13)
I'm having another issue where I would expect the client to reconnect. Sometimes the connection is in a weird state where I cannot get any answer from the node, so I get KafkaTimeoutError. This results in an endless heartbeat loop, because the coordinator calls GroupCoordinator.ensure_coordinator_known and it fails to send its request. This repeats every 40.1 s because of the timeout AIOKafkaConnection._request_timeout_ms == 40000 plus GroupCoordinator._retry_backoff_ms == 100. It seems that AIOKafkaConnection.connected() still returns true, because the connection is not deleted in Client.send() (from ready, from _get_conn).
I think the client should reconnect on KafkaTimeoutError, because as it is, it just stays disconnected and we have to restart the service. After a restart everything runs smoothly until KafkaTimeoutError appears again.
Here is an excerpt of our logs:
March 28th 2017, 19:17:04.985 Error sending GroupCoordinatorRequest_v0 to node 0 [KafkaTimeoutError] -- marking coordinator dead
March 28th 2017, 19:17:04.986 Group Coordinator Request failed: KafkaTimeoutError
March 28th 2017, 19:17:45.088 Error sending GroupCoordinatorRequest_v0 to node 0 [KafkaTimeoutError] -- marking coordinator dead
March 28th 2017, 19:17:45.088 Group Coordinator Request failed: KafkaTimeoutError
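As a quick check of the numbers above, the gap between the two logged failures can be compared with request timeout plus retry backoff. This is a small worked sketch; it only uses the two timestamps and the two values quoted in the comment.

```python
from datetime import datetime

# Timestamps of the two "Error sending GroupCoordinatorRequest_v0" lines in the log
fmt = "%H:%M:%S.%f"
first = datetime.strptime("19:17:04.985", fmt)
second = datetime.strptime("19:17:45.088", fmt)
observed_s = (second - first).total_seconds()

# Expected cycle: wait for the request timeout, then sleep for the retry backoff
request_timeout_ms = 40000  # AIOKafkaConnection._request_timeout_ms
retry_backoff_ms = 100      # GroupCoordinator._retry_backoff_ms
expected_s = (request_timeout_ms + retry_backoff_ms) / 1000

print(f"observed {observed_s:.3f} s, expected {expected_s:.3f} s")
```

The few extra milliseconds in the observed gap are just scheduling and logging overhead.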
Hey there, that's new; it should just reconnect. How do you actually reproduce this?
To answer your question of where it reconnects:
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/client.py#L309 - here we check whether our connection is still .connected(); if not, we reconnect.
- https://github.com/aio-libs/aiokafka/blob/master/aiokafka/conn.py#L180 - here's where the connection actually notices that it's disconnected.
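The pattern described in the two links (reuse a cached connection only while it reports connected(), otherwise open a new one) can be sketched roughly as below. This is not aiokafka's code: FakeConnection, ConnectionPool, _create_conn and get_conn are hypothetical names chosen for illustration.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a broker connection."""

    def __init__(self, node_id):
        self.node_id = node_id
        self._open = True

    def connected(self):
        return self._open

    def close(self):
        # In a real client this happens when the socket is found to be closed
        self._open = False


class ConnectionPool:
    """Minimal sketch: reuse a connection only while it reports connected()."""

    def __init__(self):
        self._conns = {}
        self.created = 0

    async def _create_conn(self, node_id):
        # Hypothetical: a real client would open a TCP connection here
        await asyncio.sleep(0)
        self.created += 1
        return FakeConnection(node_id)

    async def get_conn(self, node_id):
        conn = self._conns.get(node_id)
        if conn is None or not conn.connected():
            # Missing or dead connection: (re)connect and cache the new one
            conn = await self._create_conn(node_id)
            self._conns[node_id] = conn
        return conn


async def main():
    pool = ConnectionPool()
    c1 = await pool.get_conn(0)
    c2 = await pool.get_conn(0)  # still connected, so it is reused
    c1.close()                   # connection notices it was disconnected
    c3 = await pool.get_conn(0)  # not connected any more, so a new one is made
    return c1 is c2, c3 is not c1, pool.created


result = asyncio.run(main())
print(result)  # (True, True, 2)
```

The weak spot of this check is exactly the case in the first comment: a socket that still looks open but never answers keeps passing connected(), so it keeps being reused.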
Ah, I'm sorry for the wrong conclusion. It wasn't working for me because I didn't catch exceptions from get() and commit(). It seems that aiokafka is working flawlessly. The only thing that confused me is the missing reconnect.backoff.ms. I was searching for it in the codebase and didn't find it, so I concluded that aiokafka is not reconnecting. Sorry for my mistake, and thanks for your response.
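What "catching exceptions from get() and commit()" means for the consume loop can be sketched like this. It is a self-contained illustration, not aiokafka usage: FlakyConsumer and TransientKafkaError are hypothetical stand-ins (for a real consumer and for a retriable error such as KafkaTimeoutError), and the backoff value is arbitrary.

```python
import asyncio


class TransientKafkaError(Exception):
    """Hypothetical stand-in for a retriable error such as KafkaTimeoutError."""


class FlakyConsumer:
    """Hypothetical consumer whose first getone() fails, as during a reconnect."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._failed_once = False
        self.committed = 0

    async def getone(self):
        if not self._failed_once:
            self._failed_once = True
            raise TransientKafkaError("request timed out")
        return self._messages.pop(0)

    async def commit(self):
        self.committed += 1


async def consume(consumer, n, retry_backoff_s=0.1):
    handled = []
    while len(handled) < n:
        try:
            msg = await consumer.getone()
            handled.append(msg)
            await consumer.commit()
        except TransientKafkaError:
            # Without this handler the error would end the loop (and the task);
            # with it we back off and let the client reconnect on the next call.
            await asyncio.sleep(retry_backoff_s)
    return handled


consumer = FlakyConsumer(["a", "b"])
handled = asyncio.run(consume(consumer, 2, retry_backoff_s=0.01))
print(handled, consumer.committed)  # ['a', 'b'] 2
```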
@Artimi NP :) Glad it was resolved.
Somehow we end up using retry_backoff_ms for both reconnect and error backoff. Don't know if that's a big issue yet.
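If the two delays were split, a client would pick the backoff based on why the last attempt failed. The sketch below only illustrates that design choice; reconnect_backoff_ms is a hypothetical option name (modelled on Kafka's reconnect.backoff.ms) and its value of 50 is arbitrary, not a claimed default.

```python
from dataclasses import dataclass


@dataclass
class BackoffConfig:
    # Hypothetical option names; per the thread, one retry_backoff_ms
    # currently covers both cases.
    retry_backoff_ms: int = 100
    reconnect_backoff_ms: int = 50  # illustrative value only


def backoff_seconds(cfg: BackoffConfig, reason: str) -> float:
    """Pick the delay before the next attempt based on why the last one failed."""
    if reason == "reconnect":
        return cfg.reconnect_backoff_ms / 1000
    if reason == "error":
        return cfg.retry_backoff_ms / 1000
    raise ValueError(f"unknown reason: {reason!r}")


shared = BackoffConfig(retry_backoff_ms=100, reconnect_backoff_ms=100)
split = BackoffConfig(retry_backoff_ms=100, reconnect_backoff_ms=50)
print(backoff_seconds(shared, "reconnect"), backoff_seconds(split, "reconnect"))  # 0.1 0.05
```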
Could you provide full configuration for the consumer?
Sure:
self._consumer = aiokafka.AIOKafkaConsumer(
    loop=loop,
    bootstrap_servers=hosts,
    group_id=consumer_group,
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    heartbeat_interval_ms=3 * 1000,
    metadata_max_age_ms=30 * 1000,
)
Then where did you get GroupCoordinator._retry_backoff_ms == 10000?
You are right, sorry; it should be 100, which is the default value. I edited the comment.
So, to sum it up: there is a case where Kafka can still have the socket in an open state, but requests time out. That's weird, but probably possible. So the fix should be to discard connections that have timeouts.
Yes, exactly. Just delete the connection if it times out and reconnect. This should do no harm, right?
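The fix agreed on here (a request timeout closes and forgets the connection, so the next send reconnects instead of reusing a socket that only looks open) could look roughly like this. It is a sketch under stated assumptions, not the merged patch: Client, FakeConnection and the connect callable are hypothetical, and asyncio.wait_for stands in for the client's request timeout.

```python
import asyncio


class FakeConnection:
    """Hypothetical connection; with hang=True the socket looks open but never answers."""

    def __init__(self, hang):
        self.hang = hang
        self.closed = False

    def connected(self):
        return not self.closed

    async def send(self, request):
        if self.hang:
            await asyncio.sleep(3600)  # broker never replies
        return f"response to {request}"

    def close(self):
        self.closed = True


class Client:
    def __init__(self, connect, request_timeout_s):
        self._connect = connect  # hypothetical callable returning a new connection
        self._conns = {}
        self._timeout = request_timeout_s

    def _get_conn(self, node_id):
        conn = self._conns.get(node_id)
        if conn is None or not conn.connected():
            conn = self._connect()
            self._conns[node_id] = conn
        return conn

    async def send(self, node_id, request):
        conn = self._get_conn(node_id)
        try:
            return await asyncio.wait_for(conn.send(request), self._timeout)
        except asyncio.TimeoutError:
            # A timed-out connection is no longer trusted: close it and drop it
            # from the cache so the next send() opens a fresh one.
            conn.close()
            self._conns.pop(node_id, None)
            raise


conns = [FakeConnection(hang=True), FakeConnection(hang=False)]


async def main():
    client = Client(iter(conns).__next__, request_timeout_s=0.05)
    first = "no timeout"
    try:
        await client.send(0, "GroupCoordinatorRequest")
    except asyncio.TimeoutError:
        first = "timed out"
    second = await client.send(0, "GroupCoordinatorRequest")
    return first, second


first, second = asyncio.run(main())
print(first, "|", second)  # timed out | response to GroupCoordinatorRequest
```

Without the close-and-pop in the except branch, the stuck connection would still pass connected() and every retry would wait out the full timeout again, which is the loop seen in the logs.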
Hey @Drizzt1991, thanks for the merge. I just wanted to ask: when do you plan the next release?
@Artimi Great, thanks for submitting issues; it really helps. The release will be this week, probably on the weekend.
@Artimi The release is in progress. It should be OK as of 0.2.2.
Related Issues (20)
- Inconsistent producer start/stop
- AIOKafkaAdminClient.create_topics fails randomly because it selects a node at random
- performance degradation of producer when having many topics
- Can I use Azure Event Hubs with `aiokafka`?
- 0.9.0 zstd codec depends on cramjam but missing in documentation
- Let's put `aiokafka` under the `aio-libs` org on PyPI
- Add create_acls function for kafka admin client
- AIOKafkaProducer failed to produce message with headers
- I keep getting MessageSizeTooLargeError, error message gives size much bigger than actual message were given to producer.
- Add delete_records to the admin client
- Can't connect to kafka docker
- asyncio.exceptions.CancelledError
- Regarding Kafka Connection
- invalid Type AioKafkaAdminClient create_partitions
- Proposal to Add Type Hints
- Consumer stopped consuming, task Fetcher._fetch_task has finished
- High Incoming request sum on Azure Event Hub
- [QUESTION] Unable connect to node with id: X: [Errno 111]: Connection refused
- admin client - failure to create topics (error code 41)
- [QUESTION] How to get old messages from topics?