Comments (9)
How do you serialize with Avro? Avro is a format where a schema is required for the producer to serialize and for the consumer to deserialize; otherwise it is just gibberish bytes. The usual strategy with Kafka is to store schemas in some sort of central registry and attach a reference to the schema used to produce each message. This is what Confluent does with their Schema Registry.
As far as I know, aiokafka doesn't provide anything Avro-related, so the serializer/deserializer must come from your own implementation. Depending on what you are doing, your serialized message might contain both the schema and the data.
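For reference, Confluent's Schema Registry convention frames each message with a 5-byte header (magic byte 0 plus a big-endian 4-byte schema ID) in front of the Avro body. A hand-rolled serializer for aiokafka can reproduce that framing with the stdlib; the Avro encoding of the body itself would still need a library such as fastavro (this is a sketch of the wire framing only, not of the registry lookup):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format marker for "schema-registry framed"

def frame_message(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the Confluent wire-format header: magic byte 0 plus a
    4-byte big-endian schema ID, followed by the Avro-encoded body."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unframe_message(data: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, data[5:]
```

A producer would pass the framed bytes as the message value; the consumer reads the schema ID, fetches that schema from the registry, and only then decodes the Avro body.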
@vmaurin I'm using the kafkit library for serialization and for communication with the schema registry.
Maybe try to dump the message you serialized before passing it to aiokafka? Otherwise, as far as I can see, the size is checked per message: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/producer/producer.py#L411 (even though messages might then be batched).
The formula seems to be: overhead + len(key) + len(value)
Headers seem to be ignored.
@vmaurin Can I specify a max_request_size for the producer bigger than the broker's corresponding value if I have compression enabled?
You mean max.message.bytes on the broker/topic? It might be; it seems to be applied after compression, but it is also applied to a whole batch of messages, while the check in aiokafka is for a single message only.
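A quick stdlib sketch of why the pre-compression check matters here: a repetitive payload that exceeds the 1 MiB default shrinks far below it once compressed (zlib stands in for zstd, which is not in the stdlib; the payload and sizes are illustrative):

```python
import zlib

LIMIT = 1_048_576  # default broker max.message.bytes / producer max_request_size

# ~1.5 MB of highly repetitive data, well over the limit uncompressed
payload = b"some repetitive log line\n" * 60_000
compressed = zlib.compress(payload, level=6)

over_limit_raw = len(payload) > LIMIT          # True: client-side check fails
under_limit_compressed = len(compressed) < LIMIT  # True: broker would accept it
```

So a message can be rejected by the client's uncompressed-size check even though the bytes actually sent to the broker, post-compression, would have been well within max.message.bytes.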
@vmaurin Yes, but I use send_and_wait() to send immediately, so I hope the batch will not exceed max.message.bytes either.
@ant0nk @vmaurin did you figure this out? Having a similar issue:
- enabled zstd compression
- sending a 1.5 MB message
- aiokafka responds with "The message is ... bytes when serialized which is larger than the maximum request size ... 1048576"
- aiokafka seems to be checking message size before compression, as the manually compressed message is approx ~700 KB in size
Got around the problem by disabling aiokafka's message size validation. aiokafka validates message size before compression, rejecting otherwise valid messages. Setting max_request_size to a huge value effectively disables aiokafka's validation. Validation is still performed by Kafka itself, so if the compressed message is too big, kafka.errors.MessageSizeTooLargeError is raised.
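That workaround looks roughly like this (a configuration sketch, not verified against a broker; max_request_size and compression_type are real AIOKafkaProducer parameters, while the bootstrap address and the chosen size are illustrative):

```python
from aiokafka import AIOKafkaProducer

# Effectively disable the client-side, pre-compression size check by
# raising max_request_size far above the broker's max.message.bytes.
# The broker still validates the (compressed) size on its side, so
# oversized messages fail with MessageSizeTooLargeError at send time.
producer = AIOKafkaProducer(
    bootstrap_servers="localhost:9092",   # illustrative address
    compression_type="zstd",
    max_request_size=104_857_600,         # 100 MiB: an arbitrary "huge" value
)
```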
@Symas1 Your approach may not work if you send messages quickly enough: aiokafka combines multiple messages into batches, and raising this setting may lead to huge requests being rejected by the broker.
[…] This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.