Giter Club home page Giter Club logo

kafkit's Introduction

Kafkit

Kafkit helps you write Kafka producers and consumers in Python with asyncio:

  • Kafkit provides a client for the Confluent Schema Registry's HTTP API. The RegistryApi client includes both high-level methods for managing subjects and schemas in a Registry, and direct low-level access to HTTP methods (GET, POST, PUT, PATCH, and DELETE). The high-level methods use caching so you can use the client as an integral part of your application's schema management. RegistryApi is implemented around aiohttp, but since the base class is designed with a sans IO architecture, a Registry client can be implemented with any asyncio HTTP library.

  • Kafkit provides Avro message serializers and deserializers that integrate with the Confluent Schema Registry: Deserializer, Serializer, and PolySerializer.

  • The RecordNameSchemaManager is a streamlined tool for serializing messages using the schemas maintained by your app, while also integrating with the Confluent Schema Registry.

  • The kafkit.ssl module helps you connect to SSL-secured Kafka brokers.

Learn more about Kafkit at https://kafkit.lsst.io.

kafkit's People

Contributors

hhromic avatar jonathansick avatar neophile-square[bot] avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

Forkers

ulrikjohansson

kafkit's Issues

How can I help?

I like this async kafka package.

I guess I think something wrong at manager.py line 96. I changed it to schema["name"] = f'{schema["name"]}{self._suffix}'

I am a new developer, where can I create a ticket and corresponding branch if I want to help?

Are you using this in production yourself yet?

Hi!

I just stumbled over this schema-registry/serializer project b.c. I was about to implement one myself =)

Where I work we have a working serializer/deserializer talking to the confluent schema registry, but it's a bit of a hack.
The schema fetching is done separately in a startup script before starting the consumer/producer, and the schemas are put into the aiohttp app container, so we have to lug that around everywhere.

So I was wondering if this library is ready for use yet? I noticed it's very new (january of this year).

kafkit fails to register schema under new subject if schema_id already exists in cache

Hi! Thanks again for a really useful library.

I'm working on moving centrally managed schemas into the application that actually owns them, and I've stumbled on an issue with the register_schema method.

We're using avro encoding for the keys as well as the values in the kafka messages, and in this instance the key schema is just a simple string schema. All schemas owned by the application share the same key schema.

I noticed that when testing schema creation from the application using kafkit, only the key schema for the first subject is actually created in the schema registry, the others get short circuited in the register_schema method here:

# look in cache first
try:
schema_id = self.schema_cache[schema]
return schema_id

This becomes a problem when other clients rely on the key schema_id being registered under the other subject as well in the schema registry, and they now crash when they can't find a key-schema for their subjects.

Update: A quick workaround for this problem is to wipe the schema cache in the registry manually when I loop through the schemas I need to make sure exist in the confluent schema registry, like so:

registry._schema_cache = SchemaCache()

I'm happy to provide a PR with a solution when I know a bit more about the rationale and use case for short circuiting the schema registering.

Maintainership?

This is a very promising and appealing library which I'd love to make use of in my projects at work, however I notice maintainership has dropped off in the last year or so. Is this library currently maintained? Are their plans to continue maintainership?

I'm happy to help move issues along and publish updates.

Fetch Schemas from schema registry url

I have a schema registry url and I want to build kafka consumers, Currently I don't have access to the schema files.
What should I do to fetch all relevant schemas for the topics I want to listen to?
I can only spot a register method to register schemas to the registry and serialize method to serialize messages.
What should I do so that I can deserialize messages?

I am using kafkit version 0.2.1

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.