Giter Club home page Giter Club logo

sidekiq-backend's Introduction

karafka logo

Build Status Gem Version Join the chat at https://slack.karafka.io

About Karafka

Karafka is a Ruby and Rails multi-threaded efficient Kafka processing framework that:

# Define what topics you want to consume with which consumers in karafka.rb
Karafka::App.routes.draw do
  topic 'system_events' do
    consumer EventsConsumer
  end
end

# And create your consumers, within which your messages will be processed
class EventsConsumer < ApplicationConsumer
  # Example that utilizes ActiveRecord#insert_all and Karafka batch processing
  def consume
    # Store all of the incoming Kafka events locally in an efficient way
    Event.insert_all messages.payloads
  end
end

Karafka uses threads to handle many messages simultaneously in the same process. It does not require Rails but will integrate tightly with any Ruby on Rails applications to make event processing dead simple.

Getting started

karafka web ui

If you're entirely new to the subject, you can start with our "Kafka on Rails" articles series, which will get you up and running with the terminology and basic ideas behind using Kafka:

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example apps repository.

We also maintain many integration specs illustrating various use-cases and features of the framework.

TL;DR (1 minute from setup to publishing and consuming messages)

Prerequisites: Kafka running. You can start it by following instructions from here.

  1. Add and install Karafka:
# Make sure to install Karafka 2.4
bundle add karafka --version ">= 2.4.0"

bundle exec karafka install
  1. Dispatch a message to the example topic using the Rails or Ruby console:
Karafka.producer.produce_sync(topic: 'example', payload: { 'ping' => 'pong' }.to_json)
  1. Run Karafka server and see the consumption magic happen:
bundle exec karafka server

[86d47f0b92f7] Polled 1 message in 1000ms
[3732873c8a74] Consume job for ExampleConsumer on example started
{"ping"=>"pong"}
[3732873c8a74] Consume job for ExampleConsumer on example finished in 0ms

Want to Upgrade? LGPL is not for you? Want to help?

I also sell Karafka Pro subscriptions. It includes a commercial-friendly license, priority support, architecture consultations, enhanced Web UI and high throughput data processing-related features (virtual partitions, long-running jobs, and more).

10% of the income will be distributed back to other OSS projects that Karafka uses under the hood.

Help me provide high-quality open-source software. Please see the Karafka homepage for more details.

Support

Karafka has Wiki pages for almost everything and a pretty decent FAQ. It covers the installation, setup, and deployment, along with other useful details on how to run Karafka.

If you have questions about using Karafka, feel free to join our Slack channel.

Karafka has priority support for technical and architectural questions that is part of the Karafka Pro subscription.

sidekiq-backend's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

sidekiq-backend's Issues

Encoding::UndefinedConversionError from ASCII-8BIT to UTF-8

Hey.

We are having problems with message encoding. Seems that ruby-kafka is returning message value in ASCII-8BIT, and while it is not a problem for JSON.parse, it seems that sidekiq cannot really handle it. We have introduced interchanger as a solution which basically does force_encoding on message value

# frozen_string_literal: true

module Interchangers
  class CustomInterchanger < Karafka::Interchanger
    class << self
      def encode(params_batch)
        params_batch.to_a.tap do |batch|
          batch.each do |params|
            params['value'].force_encoding(Encoding.default_internal)
          end
        end
      end
    end
  end
end

But maybe there might be a better solution that might be built into karafka or sidekiq-backend?

Here is full stacktrace of the issue

/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/core_ext/object/json.rb line 38 in encode
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/core_ext/object/json.rb line 38 in to_json
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/core_ext/object/json.rb line 38 in to_json
/.rbenv/versions/2.5.1/lib/ruby/2.5.0/json/common.rb line 224 in generate
/.rbenv/versions/2.5.1/lib/ruby/2.5.0/json/common.rb line 224 in generate
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq.rb line 182 in dump_json
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 201 in block in atomic_push
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 199 in map
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 199 in atomic_push
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 184 in block (2 levels) in raw_push
/shared/bundle/ruby/2.5.0/gems/redis-4.0.2/lib/redis.rb line 2362 in block in multi
/shared/bundle/ruby/2.5.0/gems/redis-4.0.2/lib/redis.rb line 45 in block in synchronize
/.rbenv/versions/2.5.1/lib/ruby/2.5.0/monitor.rb line 226 in mon_synchronize
/shared/bundle/ruby/2.5.0/gems/redis-4.0.2/lib/redis.rb line 45 in synchronize
/shared/bundle/ruby/2.5.0/gems/redis-4.0.2/lib/redis.rb line 2355 in multi
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 183 in block in raw_push
/shared/bundle/ruby/2.5.0/gems/connection_pool-2.2.2/lib/connection_pool.rb line 65 in block (2 levels) in with
/shared/bundle/ruby/2.5.0/gems/connection_pool-2.2.2/lib/connection_pool.rb line 64 in handle_interrupt
/shared/bundle/ruby/2.5.0/gems/connection_pool-2.2.2/lib/connection_pool.rb line 64 in block in with
/shared/bundle/ruby/2.5.0/gems/connection_pool-2.2.2/lib/connection_pool.rb line 61 in handle_interrupt
/shared/bundle/ruby/2.5.0/gems/connection_pool-2.2.2/lib/connection_pool.rb line 61 in with
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 182 in raw_push
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/client.rb line 74 in push
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/worker.rb line 143 in client_push
/shared/bundle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/worker.rb line 87 in perform_async
/shared/bundle/ruby/2.5.0/gems/karafka-sidekiq-backend-1.2.0/lib/karafka/backends/sidekiq.rb line 17 in block in process
/shared/bundle/ruby/2.5.0/gems/dry-monitor-0.1.2/lib/dry/monitor/notifications.rb line 8 in measure
/shared/bundle/ruby/2.5.0/gems/dry-monitor-0.1.2/lib/dry/monitor/notifications.rb line 42 in instrument
/shared/bundle/ruby/2.5.0/gems/karafka-sidekiq-backend-1.2.0/lib/karafka/backends/sidekiq.rb line 16 in process
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/base_consumer.rb line 46 in call
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/delegator.rb line 38 in block (2 levels) in call
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/delegator.rb line 36 in each
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/delegator.rb line 36 in block in call
/shared/bundle/ruby/2.5.0/gems/dry-monitor-0.1.2/lib/dry/monitor/notifications.rb line 8 in measure
/shared/bundle/ruby/2.5.0/gems/dry-monitor-0.1.2/lib/dry/monitor/notifications.rb line 42 in instrument
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/delegator.rb line 24 in call
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/listener.rb line 42 in block in fetch_loop
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/client.rb line 32 in block in fetch_loop
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 224 in block (4 levels) in each_message
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/notifications.rb line 168 in block in instrument
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/notifications/instrumenter.rb line 23 in instrument
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/notifications.rb line 168 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/instrumenter.rb line 21 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/instrumenter.rb line 35 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 222 in block (3 levels) in each_message
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 207 in each
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 207 in block (2 levels) in each_message
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 206 in each
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 206 in block in each_message
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/patches/ruby_kafka.rb line 27 in block in consumer_loop
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 387 in block in consumer_loop
/shared/bundle/ruby/2.5.0/gems/activesupport-5.2.1/lib/active_support/notifications.rb line 170 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/instrumenter.rb line 21 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/instrumenter.rb line 35 in instrument
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 386 in consumer_loop
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/patches/ruby_kafka.rb line 15 in consumer_loop
/shared/bundle/ruby/2.5.0/gems/ruby-kafka-0.6.8/lib/kafka/consumer.rb line 203 in each_message
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/client.rb line 32 in fetch_loop
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/listener.rb line 39 in fetch_loop
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/connection/listener.rb line 23 in call
/shared/bundle/ruby/2.5.0/gems/karafka-1.2.4/lib/karafka/fetcher.rb line 19 in block (2 levels) in call 

Deprecate for 2.0

This won't be needed with multi-threaded 2.0 version. I don't see a reason to support that beyond 1.3 as perform_async isn't that big of a hustle (unless the community has valid arguments to keep it).

Consumer class names must have the word "Consumer" in it in order to work

BUG REPORT

Hi,

I found a bug when my application tries to consume a task using Sidekiq. I created an example here: https://github.com/elioncho/karafka_rails_example

The problem happens when the consumer classes do not have the word "Consumer" on their names. I have 4 consumers on my application

App.consumer_groups.draw do
  topic :consumer_is_here do
    consumer ConsumerIsHere
  end
  topic :no_magic_word do
    consumer NoMagicWord
  end
  topic :with_magic_word_consumer do
    consumer WithMagicWordConsumer
  end
  topic :without_the_magic_word do
    consumer WithoutTheMagicWord
  end
end

The 2 of them that include the word "Consumer" (ConsumerIsHere and WithMagicWordConsumer) work fine, they are executed by Sidekiq. The 2 others are not.

You can test this by doing the following:

First, launch the application

docker-compose up -d

Once all the services are running, run:

docker-compose exec app rails c

Try the following commands inside the console:

WaterDrop::AsyncProducer.call('{"message":"consumer_is_here"}',topic: "consumer_is_here")
WaterDrop::AsyncProducer.call('{"message":"no_magic_word"}',topic: "no_magic_word")
WaterDrop::AsyncProducer.call('{"message":"with_magic_word_consumer"}',topic: "with_magic_word_consumer")
WaterDrop::AsyncProducer.call('{"message":"without_the_magic_word"}',topic: "without_the_magic_word")

Open a new tab in your terminal and run:

docker-compose logs -f worker

You'll be able to see that Sidekiq only consumes the tasks whose name includes the word "Consumer".

Consider using tags/releases

Hey,

It would be great if you could use tags (or even releases) so it would be easier to review changes while updating dependency in one's projects. Tools like dependabot usually link to diff based on tags.

WDYT?

undefined method `fetch' for #<Karafka::Params::Params>

Hey! First things first: thanks for the great gem and its ecosystem! :)

Nice the last update we encounter an error while processing Kafka message via the Karafka/Sidekiq backend. Looks like the Karafka::Params::Params class does not provide a fetch method. The error is located here.

Full backtrace:

NoMethodError: undefined method `fetch' for #<Karafka::Params::Params:0x00007f8cf8cddb88>
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/extensions/params_builder.rb:14 :in `from_hash`
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/extensions/params_batch_builder.rb:14 :in `block in from_array`
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/extensions/params_batch_builder.rb:13 :in `map`
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/extensions/params_batch_builder.rb:13 :in `from_array`
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/base_worker.rb:51 :in `consumer`
[GEM_ROOT]/gems/karafka-sidekiq-backend-1.4.0/lib/karafka/base_worker.rb:31 :in `perform`

Versions in use:

1.4.0  karafka
1.4.0  karafka-sidekiq-backend
1.4.1  karafka-testing
6.1.2  sidekiq
1.3.0  ruby-kafka

Clarify possible interchanger values in the documentation

The documentation says:

or on a per topic level:

App.routes.draw do
  consumer_group :videos_consumer do
    topic :binary_video_details do
      consumer Videos::DetailsConsumer
      worker Workers::DetailsWorker
      interchanger Interchangers::MyCustomInterchanger
    end
  end
end

What are the possible interchanger names? What name should I provide to use sidekiq only for some topics and what name to process directly in other topics.

ApplicationWorker do not serialize to JSON safely

With Sidekiq 6.4.1, they added a deprecation warning
"Job arguments to ApplicationWorker do not serialize to JSON safely. This will raise an error in Sidekiq 7.0."
Is there a plan to get rid of that?
I know that there are no plans for further use after Karafka 2.0 but would be nice to have a solution also for users of versions below 2.0

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.