
kafka-protocol-rs's Introduction

Kafka-Protocol

Rust implementation of the Kafka wire protocol.

Unlike other Kafka protocol implementations, this project uses code generation to cover the entire Kafka API surface, including different protocol versions. See Kafka's repo for an example of protocol schema.

Versioning

Protocol messages are generated against a recent stable Kafka release, currently 3.7.0.

Although the Kafka protocol remains relatively stable and strives to be backwards compatible, new fields are occasionally added. In order to ensure forward compatibility with the protocol, this crate marks all exported items as #[non_exhaustive]. Protocol messages can be constructed using Default::default and updated with builder-style methods.

Working with messages

Using Default::default:

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_static_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_version = 12;

let mut request = MetadataRequest::default();
request.topics = None;
request.allow_auto_topic_creation = true;

Using builder-style methods:

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let header = RequestHeader::default()
    .with_client_id(Some(StrBytes::from_static_str("my-client")))
    .with_request_api_key(ApiKey::MetadataKey as i16)
    .with_request_api_version(12);

let request = MetadataRequest::default()
    .with_topics(None)
    .with_allow_auto_topic_creation(true);

Serialization

Once a message has been created, it can be serialized using Encodable, writing the struct to a provided bytes::BytesMut. The API version provided for the message must match the version specified in the request header.

use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;

let mut bytes = BytesMut::new();
let request = MetadataRequest::default();
request.encode(&mut bytes, 12).unwrap();

Deserialization

Messages can be decoded using Decodable by providing the API version matching their corresponding request.

use bytes::Bytes;
use kafka_protocol::messages::ApiVersionsRequest;
use kafka_protocol::protocol::Decodable;

let bytes: [u8; 25] = [
        0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d,
        0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61,
        0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30,
        0x00
];
 
let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap();

Development

Run cargo run -p protocol_codegen in the root of this repo to generate/update the Rust code from the latest Kafka protocol schema.

Originally implemented by @Diggsey in Franz, a minimal Kafka client implementation.

kafka-protocol-rs's People

Contributors

belltoy, bkirwi, davide-baldo, diggsey, etorreborre, gleyba, hackzzila, iamazy, pdeva, rukai, thedodd, tychedelia


kafka-protocol-rs's Issues

Potential issue: use of default struct as "nil" comparison value for versioned serialization

Updates to the protocol added structs in responses that previously had not been needed by the serialization logic. Because structs don't have an obvious zero value, it is difficult to raise an error when a struct has been erroneously modified while encoding a version that does not support it. To work around this, we use Default::default as a zero value to check whether the struct has been modified, but it's unclear whether this is a safe indicator, or whether the default can represent a valid state for some structs. In practice, this may not be an issue.
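
A minimal sketch of that workaround, assuming only that the generated struct derives Default and PartialEq:

fn was_modified<T: Default + PartialEq>(value: &T) -> bool {
    // Treat the default instance as the "nil" value; anything that differs
    // from it is considered modified for version-gating purposes.
    *value != T::default()
}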

StrBytes / TopicName ergonomics

Overall I've found the generated message types quite easy to work with so far, with the exception of StrBytes wrappers like TopicName.

From what I can tell, the most straightforward way to construct a TopicName from a String is:

let bytes = Bytes::from(string);
let string = <StrBytes as string::TryFrom<Bytes>>::try_from(bytes).expect("bytes from valid string");
string.into()
  • That's a lot of code for a presumably-very-common operation. (The inverse, getting a String from a TopicName, is (**topic).to_owned()... better but still pretty obscure.)
  • This requires me to re-validate the bytes as UTF8, even though I'm constructing it from valid String data.
  • string::TryFrom is not the standard library TryFrom; it's a custom trait in the string crate. The issue to fix this has been open for years. This also requires clients to take an explicit dependency on string to be able to call the try_from method.

A few extra methods on / impls for TopicName would probably help with this. The first point looks like it would require calling an unsafe method on string::String, though, at which point it's probably worth just reimplementing the relevant handful of lines in this codebase and cutting out an unmaintained dep.
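
As an interim measure, the snippet above can at least be consolidated into one client-side helper. This is just the same code wrapped in a function; it still re-validates the UTF-8 and still needs a dependency on the string crate, which is exactly the cost described above:

use bytes::Bytes;
use kafka_protocol::messages::TopicName;
use kafka_protocol::protocol::StrBytes;

fn topic_name_from_string(s: String) -> TopicName {
    // Bytes::from(String) is zero-copy, but try_from re-checks the UTF-8.
    let bytes = Bytes::from(s);
    let str_bytes = <StrBytes as string::TryFrom<Bytes>>::try_from(bytes)
        .expect("bytes came from a valid String");
    str_bytes.into()
}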

Thoughts on making Encoder/Decoder traits public?

I want to reimplement certain encoders/decoders to improve performance by skipping parts I don't need.
It would be nice to be able to reuse some Encoder/Decoder implementations on types like VarInt, but that is currently impossible: even though VarInt is public, the Encoder and Decoder traits are private.

Any chance of them being made public?
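
For illustration, if the traits were exported, reuse might look roughly like the sketch below. The Decoder import path and signature are assumptions based on the crate's internals, not a confirmed public API:

use bytes::Bytes;
use kafka_protocol::protocol::types::VarInt;
use kafka_protocol::protocol::Decoder; // hypothetical re-export

fn read_varint(buf: &mut Bytes) -> i32 {
    // Reuses the crate's varint wire-format logic instead of reimplementing it.
    VarInt.decode(buf).expect("valid varint")
}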

Decoding failures

I have a basic implementation of a server which is able to successfully parse request headers; however, when attempting to proceed with parsing ApiVersionsRequests, MetadataRequests, and ProduceRequests, decoding fails at various locations for each. This is true with:

I'm happy to proceed with testing other clients, maybe a few from other languages. However, just to provide some concrete examples:

  • When attempting to decode PartitionProduceData in the impl Decodable for PartitionProduceData block, decoding the records field as types::Bytes.decode(buf)? fails because it attempts to parse a length first, and in the cases I've been debugging, the length includes the 4 bytes describing the length itself. So when this crate attempts to decode those bytes, it always sees the payload as being 4 bytes too short (see the sketch after this list). I'll need to dig a bit deeper to see if there is an easy fix.
  • In another example, attempting to decode an ApiVersionsRequest, decoding fails on the client_software_version field as a types::CompactString.decode(buf)?. The following line: let strbuf = StrBytes::try_from(buf.try_get_bytes((n - 1) as usize)?)?; fails, also due to a length vs. buffer-length mismatch.
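
A standalone sketch of the mismatch described in the first bullet, using a hand-rolled length-prefixed framing rather than this crate's actual decoder: if the client writes len = payload + 4 (counting the prefix itself), a decoder expecting len = payload will always find the buffer 4 bytes short:

use bytes::{Buf, Bytes};

fn split_length_prefixed(buf: &mut Bytes) -> Option<Bytes> {
    if buf.remaining() < 4 {
        return None;
    }
    // The decoder assumes the prefix counts only the payload bytes that follow.
    let len = buf.get_i32() as usize;
    if buf.remaining() < len {
        return None; // the "4 bytes too short" failure mode
    }
    Some(buf.split_to(len))
}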

Run Clippy (and Rustfmt?) in CI

We missed a pretty useful suggestion, noticed in #49. We should be running Clippy in our CI, and potentially rustfmt, although I know we've had questions before about whether the codegen files should be formatted since it can make reading them a bit more confusing.

Compile failed when building kafka-protocol

I want to bump kafka-protocol to 0.10 in kafkas, but the build failed on macOS (M1).

The error message is as follows:

   Compiling flate2 v1.0.28
   Compiling snap v1.1.1
   Compiling url v2.5.0
error[E0658]: use of unstable library feature 'stdsimd'
  --> /Users/xxx/.cargo/registry/src/rsproxy.cn-8f6827c7555bfaf8/crc32c-0.6.5/src/hw_aarch64.rs:50:33
   |
50 |         .fold(crc, |crc, &next| simd::__crc32cb(crc, next))
   |                                 ^^^^^^^^^^^^^^^
   |
   = note: see issue #48556 <https://github.com/rust-lang/rust/issues/48556> for more information
   = help: add `#![feature(stdsimd)]` to the crate attributes to enable

error[E0658]: use of unstable library feature 'stdsimd'
  --> /Users/xxx/.cargo/registry/src/rsproxy.cn-8f6827c7555bfaf8/crc32c-0.6.5/src/hw_aarch64.rs:62:5
   |
62 |     simd::__crc32cd(crc, next)
   |     ^^^^^^^^^^^^^^^
   |
   = note: see issue #48556 <https://github.com/rust-lang/rust/issues/48556> for more information
   = help: add `#![feature(stdsimd)]` to the crate attributes to enable

error[E0635]: unknown feature `stdarch_arm_crc32`
  --> /Users/xxx/.cargo/registry/src/rsproxy.cn-8f6827c7555bfaf8/crc32c-0.6.5/src/lib.rs:23:60
   |
23 | #![cfg_attr(all(target_arch = "aarch64", nightly), feature(stdarch_arm_crc32))]
   |                                                            ^^^^^^^^^^^^^^^^^

Some errors have detailed explanations: E0635, E0658.
For more information about an error, try `rustc --explain E0635`.
error: could not compile `crc32c` (lib) due to 3 previous errors
warning: build failed, waiting for other jobs to finish...

How to fix this error?
Thanks!

Encodable::compute_size is very slow

In my usage of kafka-protocol I just use Encodable::encode directly, without first reserving any bytes in the BytesMut.
Given that there is an Encodable::compute_size method, I figured it must be faster to call that first and then reserve the needed space.
However, doing that made my benchmarks regress!

I guess the compute_size method is just more expensive than avoiding reallocation.
I'll investigate further myself, as I would really like compute_size to improve performance; for now this issue is mostly a heads-up to others that the problem exists.
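
For reference, the two strategies being compared look like this (a sketch; version 12 is arbitrary):

use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;

fn encode_direct(req: &MetadataRequest) -> BytesMut {
    let mut buf = BytesMut::new();
    req.encode(&mut buf, 12).unwrap(); // BytesMut grows as needed
    buf
}

fn encode_presized(req: &MetadataRequest) -> BytesMut {
    // compute_size walks the whole message once, then encode walks it again,
    // which is presumably why this variant benchmarked slower.
    let size = req.compute_size(12).unwrap();
    let mut buf = BytesMut::with_capacity(size);
    req.encode(&mut buf, 12).unwrap();
    buf
}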

Zero copy message decoding

This would add a next level of performance to parsing incoming Kafka requests. The main idea:

  • Request payloads would be parsed / validated in a way which is not too dissimilar to how it is currently done in this crate as of 0.8.x.
  • Instead of allocating new collections to produce owned copies of the decoded messages, we would produce messages which can borrow data from the backing Bytes buffer.
  • This pattern of always expecting a backing Bytes buffer will be quite nice, because then the type signatures for the zero-copy types will not need to be generic over lifetimes; instead they will simply embed the Bytes buffer.
  • There are a few more difficult patterns which we will have to tackle (indexmaps, vectors, things of that nature); however, a lot of the work could likely be amortized:
    • The zero-copy message types could embed state where needed: offsets into the buffer, version info, things of that nature.
    • Amortizing lookups and offsets will be much less expensive than copying data and allocating storage.
  • BONUS: support direct mutation of data without having to copy. This would be particularly helpful in cases where record offsets need to be updated, and things of that nature.

Other projects which have explored this space:

One thing that could help bypass a lot of the difficulty with alignment and the like: just use accessors to access data. Don't attempt to build structs which are backed by the buffer. Instead, access fields via methods on a struct which simply embeds the Bytes buffer. There are definitely still edge cases and things to work through; however, that alone will bypass a large portion of the alignment issues.
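
A hypothetical sketch of that accessor pattern; all names and fields are illustrative, not a proposed API:

use bytes::Bytes;

struct RequestView {
    buf: Bytes,                      // backing buffer; no lifetime parameter
    client_id_range: (usize, usize), // offsets recorded during validation
}

impl RequestView {
    fn client_id(&self) -> Bytes {
        // Slicing Bytes bumps a refcount instead of copying the data.
        self.buf.slice(self.client_id_range.0..self.client_id_range.1)
    }
}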

Thoughts?

Fallible message builder lacks ergonomics

As far as I can tell, there is no reason for the builders to be fallible. The framework being used has this pattern in place to support things like validation and other patterns; however, none of that appears to be applicable here.

I've found myself using let mut res = ResponseMsg::default(); and then manually building the response object this way, because it is not fallible, and can be done in fewer lines of code.

IMHO, it would be a win to replace the builder framework with something non-fallible which starts with a default instance.

Thoughts?

Decode error when decoding records sent from a native Kafka client

In kafkas, when I consume records produced by a native Kafka client, I always get a DecodeError, but everything is OK when I use the Producer in kafkas.

Currently I ignore the DecodeError with a small change like this:

    pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<Vec<Record>, DecodeError> {
        let mut records = Vec::new();
        while buf.has_remaining() {
-            Self::decode_batch(buf, &mut records)?;
+            if let Err(_) = Self::decode_batch(buf, &mut records) {
+               break;
+            }
        }
        Ok(records)
    }

There seem to be no records lost.

Version intersection logic generating incorrect decode

Using kafka-topics.sh --create --bootstrap-server 127.0.0.1:8844 --topic test first sends an ApiVersionsRequest. After decoding the header, there is a trailing byte that isn't decoded, which causes decoding the ApiVersionsRequest to error:

0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x65, 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, 0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61, 0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, 0x00

The second occurrence of 0x12 is the length of the first string in the ApiVersionsRequest and the beginning of the message body. See here, where this is covered by an existing test: https://github.com/0x1991babe/kafka-protocol-rs/blob/main/tests/api_versions.rs#L8

It seems like the extra byte should be decoded here: https://github.com/0x1991babe/kafka-protocol-rs/blob/main/src/messages/request_header.rs#L98

kafka-topics.sh is sending API version 3, so we don't hit this block.

Need to investigate, but it looks like https://github.com/0x1991babe/kafka-protocol-rs/blob/main/protocol_codegen/src/generate_messages/spec.rs#L107 should return Since(b), which would cover 2+ as designated by the spec here https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/RequestHeader.json#L25
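
If that is right, the fix would make the generated header decode consume the tagged-fields section for versions 2+, along the lines of this sketch (not the actual generated code):

if version >= 2 {
    // The lone trailing 0x00 is an empty tagged-fields count.
    let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
    for _ in 0..num_tagged_fields {
        // read each field's tag and size, then skip or store it
    }
}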

Client examples

I'm interested in utilizing this crate, but I'm curious if there are any other existing crates building out higher-level client functionality. It would be awesome if there were some examples showing how to make requests and decode the responses.

Error handling

Error handling is a bit of a mess right now. We emit two different serde errors with very little context, and they don't impl std::error::Error, which makes them hard to interact with.
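
A sketch of the direction this could take, not a concrete proposal: an error type that carries context and implements std::error::Error so it composes with ? and the wider error-handling ecosystem:

use std::fmt;

#[derive(Debug)]
pub struct DecodeError {
    /// e.g. which message, field, and version failed to decode
    pub context: String,
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "decode error: {}", self.context)
    }
}

impl std::error::Error for DecodeError {}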

Codegen for encoding unknown tagged fields is incorrect

As can be seen in the code below, if version >= 1 then we encode at least one byte even if no tagged fields are present. I believe this is incorrect according to the spec: unknown tagged fields should not take up any space on the wire at all if they are not being used.

impl Encodable for ResponseHeader {
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
        types::Int32.encode(buf, &self.correlation_id)?;
        if version >= 1 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > std::u32::MAX as usize {
                error!("Too many tagged fields to encode ({} fields)", num_tagged_fields);
                return Err(EncodeError);
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }
}

That code should likely be updated to:

    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
        types::Int32.encode(buf, &self.correlation_id)?;
        if version >= 1 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > std::u32::MAX as usize {
                error!("Too many tagged fields to encode ({} fields)", num_tagged_fields);
                return Err(EncodeError);
            }
            if !self.unknown_tagged_fields.is_empty() {
                types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
                write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
            }
        }
        Ok(())
    }

KIP-482:

Tagged fields are always optional. When they are not present in a message, they do not take up any space.

With the current implementation shown here, they will always take up at least 1 byte, even when no tagged fields are used.

Filter out internal messages from codegen

There are a lot of messages related to Raft that are unlikely to ever be useful to someone using this crate. We should have a mechanism to filter out messages that are purely used for internal purposes within Kafka. I'm not sure if there is a document which identifies which messages can be sent by a client.

Code gen Encodable impl on ResponseKind enum

Generating an Encodable impl on the ResponseKind enum would be a nice ergonomics win. All it would do is match over the response kind and call encode on the inner message. NBD, but that is a lot of code that users won't have to write if we generate it here.
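
A sketch of what the generated impl might look like; only two arms are shown, and the variant names are assumed from the existing ResponseKind enum:

impl Encodable for ResponseKind {
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
        match self {
            // Each arm just delegates to the inner message's encode.
            ResponseKind::Metadata(inner) => inner.encode(buf, version),
            ResponseKind::ApiVersions(inner) => inner.encode(buf, version),
            // ...one arm per API, emitted by codegen
            _ => todo!("remaining variants follow the same pattern"),
        }
    }
}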
