
KafkaEx


KafkaEx is an Elixir client for Apache Kafka with support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.6+ and Erlang OTP 19+.

See http://hexdocs.pm/kafka_ex/ for documentation and https://github.com/kafkaex/kafka_ex/ for the source code.

KafkaEx supports the following Kafka features:

  • Broker and Topic Metadata
  • Produce Messages
  • Fetch Messages
  • Message Compression with Snappy and gzip
  • Offset Management (fetch / commit / autocommit)
  • Consumer Groups
  • Topics Management (create / delete)

See Kafka Protocol Documentation and A Guide to the Kafka Protocol for details of these features.

IMPORTANT - Kayrock and The Future of KafkaEx

TL;DR:

  • This is a new implementation and we need people to test it!
  • Set kafka_version: "kayrock" to use the new client implementation.
  • The new client should be compatible with existing code when used this way.
  • Many functions now support an api_version parameter, see below for details, e.g., how to store offsets in Kafka instead of Zookeeper.
  • Version 1.0 of KafkaEx will be based on Kayrock and have a cleaner API - you can start testing this API by using modules from the KafkaEx.New namespace. See below for details.
  • Version 0.11.0+ of KafkaEx is required to use Kayrock.

To support some oft-requested features (offset storage in Kafka, message timestamps), we have integrated KafkaEx with Kayrock, a library that handles serialization and deserialization of the Kafka message protocol in a way that can grow as Kafka does.

Unfortunately, the existing KafkaEx API is built in such a way that it doesn't easily support this growth. This, combined with a number of other existing warts in the current API, has led us to the conclusion that v1.0 of KafkaEx should have a new and cleaner API.

The path we have planned to get to v1.0 is:

  1. Add a Kayrock compatibility layer for the existing KafkaEx API (DONE, not released).
  2. Expose Kayrock's API versioning through a select handful of KafkaEx API functions so that users can get access to the most-requested features (e.g., offset storage in Kafka and message timestamps) (DONE, not released).
  3. Begin designing and implementing the new API in parallel in the KafkaEx.New namespace (EARLY PROGRESS).
  4. Incrementally release the new API alongside the legacy API so that early adopters can test it.
  5. Once the new API is complete and stable, move it to the KafkaEx namespace (i.e., drop the New part) and it will replace the legacy API. This will be released as v1.0.

Users of KafkaEx can help a lot by testing the new code. At first, we need people to test the Kayrock-based client using compatibility mode. You can do this by simply setting kafka_version: "kayrock" in your configuration. That should be all you need to change. If you want to test new features enabled by api_versions options then that is also very valuable to us (see below for links to details). Then, as work on the new API ramps up, users can contribute feedback to pull requests (or even contribute pull requests!) and test out the new API as it becomes available.
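
For example, a minimal configuration change to opt in to the Kayrock-based client might look like the following sketch (the broker list is illustrative; only the kafka_version setting is the documented switch):

# config/config.exs
config :kafka_ex,
  # switch to the Kayrock-based client implementation
  kafka_version: "kayrock",
  # example broker list; adjust for your environment
  brokers: [{"localhost", 9092}]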

For more information on using the Kayrock-based client and on the planned v1.0 API, see the corresponding guides in the KafkaEx repository and documentation.

Using KafkaEx in an Elixir project

The standard approach for adding dependencies to an Elixir application applies: add KafkaEx to the deps list in your project's mix.exs file. You may also optionally add a snappy compression library (snappyer, or the older snappy-erlang-nif), which is required only if you want to use snappy compression.

# mix.exs
defmodule MyApp.Mixfile do
  # ...

  defp deps do
    [
      # add to your existing deps
      {:kafka_ex, "~> 0.11"},
      # if using snappy-erlang-nif (snappy) compression
      {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif"},
      # if using snappyer (snappy) compression
      {:snappyer, "~> 1.2"}
    ]
  end
end

Then run mix deps.get to fetch dependencies.

Configuration

See config/config.exs or KafkaEx.Config for a description of configuration variables, including the Kafka broker list and default consumer group.

You can also override options when creating a worker, see below.
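
As a rough sketch of what that configuration can look like (the broker list and group name here are illustrative values, not defaults):

# config/config.exs
config :kafka_ex,
  # {host, port} pairs used for the initial broker connection
  brokers: [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}],
  # default consumer group used for offset commits
  consumer_group: "kafka_ex"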

Timeouts with SSL

With certain OTP versions, random timeouts can occur when using SSL.

Impacted versions:

  • OTP 21.3.8.1 -> 21.3.8.14
  • OTP 22.1 -> 22.3.1

Upgrade to 21.3.8.15 or 22.3.2, respectively, to resolve this.

Usage Examples

Consumer Groups

To use a consumer group, first implement a handler module using KafkaEx.GenConsumer.

defmodule ExampleGenConsumer do
  use KafkaEx.GenConsumer

  alias KafkaEx.Protocol.Fetch.Message

  require Logger

  # note - messages are delivered in batches
  def handle_message_set(message_set, state) do
    for %Message{value: message} <- message_set do
      Logger.debug(fn -> "message: " <> inspect(message) end)
    end
    {:async_commit, state}
  end
end

Then add a KafkaEx.ConsumerGroup to your application's supervision tree and configure it to use the implementation module.
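
A minimal sketch of doing so, assuming KafkaEx.ConsumerGroup.start_link/4 takes the consumer module, a group name, a list of topics, and options (the group and topic names here are placeholders):

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # supervise a consumer group running ExampleGenConsumer
      %{
        id: KafkaEx.ConsumerGroup,
        start:
          {KafkaEx.ConsumerGroup, :start_link,
           [ExampleGenConsumer, "example_group", ["example_topic"], []]}
      }
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end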

See the KafkaEx.GenConsumer and KafkaEx.ConsumerGroup documentation for details.

Create a KafkaEx Worker

KafkaEx worker processes manage the state of the connection to the Kafka broker.

iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}

With custom options:

iex> uris = [{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
[{"localhost", 9092}, {"localhost", 9093}, {"localhost", 9094}]
iex> KafkaEx.create_worker(:pr, [uris: uris, consumer_group: "kafka_ex", consumer_group_update_interval: 100])
{:ok, #PID<0.172.0>}

Create an unnamed KafkaEx worker

You may find you want to create many workers, say in conjunction with a poolboy pool. In this scenario you usually won't want to name these worker processes.

To create an unnamed worker with create_worker:

iex> KafkaEx.create_worker(:no_name) # indicates to the server process not to name the process
{:ok, #PID<0.171.0>}

Use KafkaEx with a pooling library

Note that KafkaEx has a supervisor to manage its workers. If you are using Poolboy or a similar library, you will want to manually create a worker so that it is not supervised by KafkaEx.Supervisor. To do this, you will need to call:

GenServer.start_link(KafkaEx.Config.server_impl,
  [
    [uris: KafkaEx.Config.brokers(),
     consumer_group: Application.get_env(:kafka_ex, :consumer_group)],
    :no_name
  ]
)

Alternatively, you can call

KafkaEx.start_link_worker(:no_name)
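
As a sketch of how this might be wired up with poolboy (the wrapper module, pool name, and sizes are illustrative assumptions, not part of KafkaEx):

defmodule MyApp.KafkaPoolWorker do
  # poolboy calls start_link/1 on the worker module
  def start_link(_args) do
    # start a worker that is linked to the pool rather than KafkaEx.Supervisor
    KafkaEx.start_link_worker(:no_name)
  end
end

# in your supervision tree:
# :poolboy.child_spec(
#   :kafka_pool,
#   [name: {:local, :kafka_pool}, worker_module: MyApp.KafkaPoolWorker, size: 5, max_overflow: 2],
#   []
# )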

Retrieve kafka metadata

For all metadata

iex> KafkaEx.metadata
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host:
 "192.168.59.103",
   node_id: 49162, port: 49162, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "LRCYFQDVWUFEIUCCTFGP"},
  %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "JSIMKCLQYTWXMSIGESYL"},
  %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "SCFRRXXLDFPOWSPQQMSD"},
  %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
...

For a specific topic

iex> KafkaEx.metadata(topic: "foo")
%KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "192.168.59.103",
   node_id: 49162, port: 49162, socket: nil}],
 topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error,
   partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error,
     isrs: [49162], leader: 49162, partition_id: 0, replicas: [49162]}],
   topic: "foo"}]}

Retrieve offset from a particular time

Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.

iex> KafkaEx.offset("foo", 0, {{2015, 3, 29}, {23, 56, 40}}) # note: the given time should match or be ahead of the clock on the server running Kafka
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [256], partition: 0}], topic: "foo"}]

Retrieve the latest offset

iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [16], partition: 0}], topic: "foo"}]

Retrieve the earliest offset

iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
[%KafkaEx.Protocol.Offset.Response{partition_offsets: [%{error_code: :no_error, offset: [0], partition: 0}], topic: "foo"}]

Fetch kafka logs

NOTE You must pass auto_commit: false in the options for fetch/3 when using Kafka < 0.8.2 or when using :no_consumer_group.

iex> KafkaEx.fetch("foo", 0, offset: 5) # where 0 is the partition and 5 is the offset we want to start fetching from
[%KafkaEx.Protocol.Fetch.Response{partitions: [%{error_code: :no_error,
     hw_mark_offset: 115,
     message_set: [
      %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 5, value: "hey"},
      %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},
      %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 7, value: "hey"},
      %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 8, value: "hey"},
      %KafkaEx.Protocol.Fetch.Message{attributes: 0, crc: 4264455069, key: nil, offset: 9, value: "hey"}
...], partition: 0}], topic: "foo"}]

Produce kafka logs

iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic and "hey" is the message
:ok

Stream kafka logs

See the KafkaEx.stream/3 documentation for details on streaming.

iex> KafkaEx.produce("foo", 0, "hey")
:ok
iex> KafkaEx.produce("foo", 0, "hi")
:ok
iex> KafkaEx.stream("foo", 0, offset: 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]

For Kafka < 0.8.2, stream/3 requires auto_commit: false:

iex> KafkaEx.stream("foo", 0, offset: 0, auto_commit: false) |> Enum.take(2)

Compression

Snappy and gzip compression are supported. Example usage for producing compressed messages:

message1 = %KafkaEx.Protocol.Produce.Message{value: "value 1"}
message2 = %KafkaEx.Protocol.Produce.Message{key: "key 2", value: "value 2"}
messages = [message1, message2]

#snappy
produce_request = %KafkaEx.Protocol.Produce.Request{
  topic: "test_topic",
  partition: 0,
  required_acks: 1,
  compression: :snappy,
  messages: messages}
KafkaEx.produce(produce_request)

#gzip
produce_request = %KafkaEx.Protocol.Produce.Request{
  topic: "test_topic",
  partition: 0,
  required_acks: 1,
  compression: :gzip,
  messages: messages}
KafkaEx.produce(produce_request)

Compression is handled automatically on the consuming/fetching end.

Testing

It is strongly recommended to test using the Dockerized test cluster described below. This is required for contributions to KafkaEx.

NOTE You may have to run the test suite twice to get tests to pass. Due to asynchronous issues, the test suite sometimes fails on the first try.

Dockerized Test Cluster

Testing KafkaEx requires a local SSL-enabled Kafka cluster with 3 nodes: one node listening on each of ports 9092, 9093, and 9094. The easiest way to do this is to use the scripts in this repository that utilize Docker and Docker Compose (both of which are freely available). This is the method we use for our CI testing of KafkaEx.

To launch the included test cluster, run

./scripts/docker_up.sh

The docker_up.sh script will attempt to determine an IP address for your computer on an active network interface.

The test cluster runs Kafka 0.11.0.1.

Running the KafkaEx Tests

The KafkaEx tests are split up using tags to handle testing multiple scenarios and Kafka versions.

Unit tests

These tests do not require a Kafka cluster to be running (see test/test_helper.exs:3 for the tags excluded when running this).

mix test --no-start

Integration tests

If you are not using the Docker test cluster, you may need to modify config/config.exs for your set up.

The full test suite requires Kafka 2.1.0+.

Kafka >= 0.9.0

The 0.9 client includes functionality that cannot be tested with older clusters.

./scripts/all_tests.sh

Kafka = 0.9.0

The 0.9 client includes functionality that cannot be tested with older clusters.

mix test --include integration --include consumer_group --include server_0_p_9_p_0

Kafka >= 0.8.2 and < 0.9.0

Kafka 0.8.2 introduced the consumer group API.

mix test --include consumer_group --include integration

Kafka < 0.8.2

If your test cluster is older, the consumer group tests must be omitted.

mix test --include integration --include server_0_p_8_p_0

Static analysis

mix dialyzer

Contributing

All contributions are managed through the kafkaex GitHub repo.

If you find a bug or would like to contribute, please open an issue or submit a pull request. Please refer to CONTRIBUTING.md for our contribution process.

KafkaEx has a Slack channel: #kafkaex on elixir-lang.slack.com. You can request an invite via http://bit.ly/slackelixir. The Slack channel is appropriate for quick questions or general design discussions. The Slack discussion is archived at http://slack.elixirhq.com/kafkaex.

The default snappy implementation uses the snappyer package.

It can be switched to snappy-erlang-nif with:

config :kafka_ex, snappy_module: :snappy

snappy-erlang-nif is deprecated and will be dropped in the 1.0.0 release.

kafka_ex's People

Contributors

argonus, b1az, bjhaid, braintreeps, cdegroot, comogo, dantswain, davidrusu, dcuddeback, divtxt, eduardoghdez, efcasado, eloraburns, gerbal, getong, habutre, hairyhum, hassox, ibizaman, jacklund, jbruggem, joshuawscott, kianmeng, localshred, markusfeyh, mjparrott, thiamsantos, ukrbublik, vitortrin, zvkemp


kafka_ex's Issues

A flag to disable this app while testing other applications

I don't mean tests for this project. I mean projects that have this application as a dependency.

I tried running mix test --no-start in my Phoenix project's directory, but I got:

$ mix test --no-start                                                                                                                                                                                      
** (exit) exited in: GenServer.call(Gide.Repo.Pool, {:query, :begin, &Gide.Repo.log/1, [timeout: 5000]}, 5000)
    ** (EXIT) no process
    (elixir) lib/gen_server.ex:356: GenServer.call/3
    (ecto) lib/ecto/adapters/sql.ex:391: Ecto.Adapters.SQL.test_transaction/4
    (elixir) lib/code.ex:307: Code.require_file/2
    (elixir) lib/enum.ex:537: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:537: Enum.each/2

If I try to run mix test without Kafka running, I get:

17:10:42.539 [error] Could not connect to broker "192.168.59.103" on port 9092

=INFO REPORT==== 6-Oct-2015::17:10:42 ===
    application: logger
    exited: stopped
    type: temporary
** (Mix) Could not start application kafka_ex: KafkaEx.start(:normal, []) returned an error: an exception was raised:
    ** (FunctionClauseError) no function clause matching in :gen_tcp.send/2
        (kernel) gen_tcp.erl:261: :gen_tcp.send(nil, <<0, 3, 0, 0, 0, 0, 0, 0, 0, 8, 107, 97, 102, 107, 97, 95, 101, 120, 0, 0, 0, 0>>)
        (kafka_ex) lib/kafka_ex/network_client.ex:30: KafkaEx.NetworkClient.send_sync_request/3
        (elixir) lib/enum.ex:1987: Enum.do_find_value/3
        (kafka_ex) lib/kafka_ex/server.ex:246: KafkaEx.Server.metadata/5
        (kafka_ex) lib/kafka_ex/server.ex:26: KafkaEx.Server.init/1
        (stdlib) gen_server.erl:306: :gen_server.init_it/6
        (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

It would be really cool if there was a flag I could set so I didn't have to build and run a Kafka cluster every time I test my Phoenix application.
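
As noted in a later issue below, KafkaEx gained a disable_default_worker setting whose test asserts that it removes the default KafkaEx.Server worker; a sketch of using it in a test environment:

# config/test.exs
# keep KafkaEx from starting its default worker (and therefore from
# connecting to a broker) in the test environment
config :kafka_ex, disable_default_worker: true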

Exception raised when producing

Hi,

I'm using kafka_ex 0.0.2 in a Phoenix app and I get an exception when I try to produce a message:

Here's the output:

iex(1)> KafkaEx.create_worker(:feeds)
{:ok, #PID<0.299.0>}

iex(3)> KafkaEx.produce("feeds", 0, "hello")
** (exit) exited in: GenServer.call(KafkaEx.Server, {:produce, "feeds", 0, "hello", nil, 0, 100}, 5000)
    ** (EXIT) an exception was raised:
        ** (FunctionClauseError) no function clause matching in :gen_tcp.send/2
            (kernel) gen_tcp.erl:261: :gen_tcp.send(nil, <<0, 0, 0, 0, 0, 0, 0, 1, 0, 8, 107, 97, 102, 107, 97, 95, 101, 120, 0, 0, 0, 0, 0, 100, 0, 0, 0, 1, 0, 5, 102, 101, 101, 100, 115, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 31, 0, 0, 0, ...>>)
            (kafka_ex) lib/kafka_ex/server.ex:126: KafkaEx.Server.send_data/2
            (kafka_ex) lib/kafka_ex/server.ex:21: KafkaEx.Server.handle_call/3
            (stdlib) gen_server.erl:607: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:639: :gen_server.handle_msg/5
            (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:356: GenServer.call/3
[error] GenServer KafkaEx.Server terminating
Last message: {:produce, "feeds", 0, "hello", nil, 0, 100}
State: {1, %{brokers: %{0 => {"10.1.4.131", 9092}}, topics: %{"feeds" => %{error_code: 0, partitions: %{0 => %{error_code: 0, isrs: [0], leader: 0, replicas: [0]}}}}}, %{{"localhost", 9092} => #Port<0.5877>}, nil}
** (exit) an exception was raised:
    ** (FunctionClauseError) no function clause matching in :gen_tcp.send/2
        (kernel) gen_tcp.erl:261: :gen_tcp.send(nil, <<0, 0, 0, 0, 0, 0, 0, 1, 0, 8, 107, 97, 102, 107, 97, 95, 101, 120, 0, 0, 0, 0, 0, 100, 0, 0, 0, 1, 0, 5, 102, 101, 101, 100, 115, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 31, 0, 0, 0, ...>>)
        (kafka_ex) lib/kafka_ex/server.ex:126: KafkaEx.Server.send_data/2
        (kafka_ex) lib/kafka_ex/server.ex:21: KafkaEx.Server.handle_call/3
        (stdlib) gen_server.erl:607: :gen_server.try_handle_call/4
        (stdlib) gen_server.erl:639: :gen_server.handle_msg/5
        (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

Any idea what's wrong?

Fetch offset

KafkaEx.fetch fetches messages using the last_offset provided by the last_offset api. Kafka looks to be returning messages starting from that offset (as opposed to starting after that offset), meaning that fetches will always include the last message consumed. I would have expected that the consumer would only fetch new messages.

  1. Is this the expected behavior? Do other kafka libraries do this?
  2. Should my code be tracking the offsets instead?
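
For context, Kafka's fetch is inclusive of the requested offset, so a common pattern is to track the last consumed offset yourself and request last_offset + 1 on the next fetch. A hypothetical sketch (the module and flow are illustrative, not part of the KafkaEx API):

defmodule ExampleManualConsumer do
  # fetch is inclusive of the requested offset, so continue from last + 1
  def consume(topic, partition, next_offset) do
    [%KafkaEx.Protocol.Fetch.Response{partitions: [%{message_set: messages}]}] =
      KafkaEx.fetch(topic, partition, offset: next_offset, auto_commit: false)

    Enum.each(messages, fn message -> IO.inspect(message.value) end)

    last_offset =
      case messages do
        [] -> next_offset - 1
        _ -> List.last(messages).offset
      end

    consume(topic, partition, last_offset + 1)
  end
end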

Sync timeouts greater than 5 seconds are not useful

Calls that pass through KafkaEx.Server will time out at 5 seconds regardless of the sync_timeout setting because of GenServer.call's default timeout.

At first I thought we could just grab the value of sync_timeout and modify all of the GenServer.calls to have an appropriate timeout, but this gets messy if you ever want to support different timeout values for different KafkaEx.Server instances.

I can think of three basic solutions here:

  • Support a single global (still configurable) value for sync timeout. I think if you want to go this route we should remove the sync timeout value from the GenServer state b/c it isn't actually state anymore.
  • Support a per-worker timeout value. This would mean allowing the value to be passed into create_worker. It would also make modifying the GenServer.calls to support higher timeout values tricky because the timeout value would have to be tailored to each process somehow.
  • Add optional timeout arguments to API calls that interact with the GenServer processes. We could optionally pass these timeouts through to NetworkClient.

The third option could be combined with either of the first two. What do you think, @bjhaid?
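
As a rough illustration of the third option, the timeout could be threaded through to GenServer.call (names here are hypothetical, not the eventual KafkaEx API):

defmodule TimeoutSketch do
  # hypothetical API function that accepts a per-call timeout instead of
  # relying on GenServer.call's default 5 seconds
  def produce(worker, request, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5_000)
    GenServer.call(worker, {:produce, request}, timeout)
  end
end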

Mix.exs points to the wrong repository

  defp package do
    [maintainers: ["Abejide Ayodele", "Jack Lund"],
     files: ["lib", "mix.exs", "README.md"],
     licenses: ["MIT"],
     links: %{"Github" => "https://github.com/jacklund/kafka_ex"}]
  end

Should be an easy fix, just capturing the task since I noticed it.

Multi-Partition Timeout

We're attempting to setup a group of workers to watch all of our partitions for certain events, transform them, and dump them into rethinkdb. We're running into an issue where certain workers are timing out and not being correctly restarted by the supervisor. Are we running into a bug with the supervision strategy or are we just doing this wrong? Would appreciate your assistance when you've got the time.

The code looks like this
The failure errors look like this
The observer tree looks like this

Crash when one broker in a cluster goes down

I'm doing some torture testing with my application and found this. I haven't had time to really dig into it, but it looks solvable, so I wanted to capture it here.

I run 3 brokers with replication and am testing that my consumer can work when a broker goes down. So I'm basically just killing one of the broker processes and watching what happens. I think the failure scenario depends on the state of the worker server, so this is just one possible error.

Here's the error output dump when I killed a broker:

15:13:39.009 [error] GenServer :manual_testing terminating
** (FunctionClauseError) no function clause matching in :prim_inet.setopts/2
    :prim_inet.setopts(nil, [:binary, {:packet, 4}, {:active, false}])
    (kafka_ex) lib/kafka_ex/network_client.ex:34: KafkaEx.NetworkClient.send_sync_request/3
    (elixir) lib/enum.ex:2355: Enum.do_find_value/3
    (kafka_ex) lib/kafka_ex/server.ex:237: KafkaEx.Server.update_consumer_metadata/3
    (kafka_ex) lib/kafka_ex/server.ex:335: KafkaEx.Server.offset_commit/2
    (kafka_ex) lib/kafka_ex/server.ex:325: KafkaEx.Server.fetch/8
    (kafka_ex) lib/kafka_ex/server.ex:92: KafkaEx.Server.handle_call/3
    (stdlib) gen_server.erl:629: :gen_server.try_handle_call/4
Last message: {:fetch, "manual_consumer_test", 2, 281, 10, 1, 1048576, true}
State: %KafkaEx.Server.State{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 0, port: 9092, socket: nil}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9093, socket: #Port<0.25021>}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 0, port: 9094, socket: #Port<0.5568>}], consumer_group: "manual_test_consumer_group", consumer_group_update_interval: 30000, consumer_metadata: %KafkaEx.Protocol.ConsumerMetadata.Response{coordinator_host: "localhost", coordinator_id: 0, coordinator_port: 9092, error_code: 0}, correlation_id: 5265, event_pid: nil, metadata: %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 2, port: 9094, socket: nil}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9093, socket: nil}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 0, port: 9092, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 0, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 6, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 24, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 33, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 42, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 15, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 39, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 30, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 12, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 3, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 48, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 21, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 18, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 9, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 27, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 36, replicas: [0, 1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 45, replicas: [0, 2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 19, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 28, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 1, replicas: [1, 2, 0]}, 
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 37, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 10, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 34, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 25, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 43, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 7, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 16, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 31, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 22, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 4, replicas: [1, 0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 13, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 49, replicas: [1, 2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 40, replicas: [1, 0, ...]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 46, replicas: [1, ...]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 2, partition_id: 14, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, ...], leader: 2, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [...], ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{...}, ...], topic: "__consumer_offsets"}, %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 0, replicas: [0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 6, replicas: [0, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1], leader: 1, partition_id: 3, replicas: [0, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1], leader: 1, partition_id: 9, replicas: [0, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1], leader: 1, partition_id: 1, replicas: [1, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 10, replicas: [1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1], leader: 1, partition_id: 7, replicas: [1, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2, 1], leader: 1, partition_id: 4, replicas: [1, 2]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 5, replicas: [2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 2, replicas: [2, 1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 11, 
replicas: [2, 0]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [2], leader: 2, partition_id: 8, replicas: [2, 1]}], topic: "manual_consumer_test"}]}, metadata_update_interval: 30000, sync_timeout: 1000, worker_name: :manual_testi (truncated)

Snappy not handling chunked compression data

When the message size gets over around 1kb, snappy uses chunked compression and the decompress operation is not working correctly.

I should have a patch for this within the next day or so (it's blocking me at work).

When fetching: `auto_commit` should default to false if `:no_consumer_group` is set

If I have consumer_group: :no_consumer_group then I get errors when fetching from a topic:

KafkaEx.fetch("foo", 0, offset: 0)
** (exit) exited in: GenServer.call(KafkaEx.Server, {:fetch, "foo", 0, 0, 10, 1, 1000000, true}, 5000)
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: false
            (kafka_ex) lib/kafka_ex/server.ex:91: KafkaEx.Server.handle_call/3
            (stdlib) gen_server.erl:629: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:661: :gen_server.handle_msg/5
            (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:564: GenServer.call/3

** (MatchError) no match of right hand side value: false
    (kafka_ex) lib/kafka_ex/server.ex:91: KafkaEx.Server.handle_call/3
    (stdlib) gen_server.erl:629: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:661: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:fetch, "foo", 0, 0, 10, 1, 1000000, true}
State: %KafkaEx.Server.State{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "docker", node_id: 0, port: 9092, socket: #Port<0.5371>}], consumer_group: :no_consumer_group, consumer_group_update_interval: 30000, consumer_metadata: %KafkaEx.Protocol.ConsumerMetadata.Response{coordinator_host: "", coordinator_id: 0, coordinator_port: 0, error_code: 0}, correlation_id: 1, event_pid: nil, metadata: %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "docker", node_id: 1001, port: 9092, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1001], leader: 1001, partition_id: 0, replicas: [1001]}], topic: "commitlog"}, %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: 0, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: 0, isrs: [1001], leader: 1001, partition_id: 0, replicas: [1001]}], topic: "foo"}]}, metadata_update_interval: 30000, sync_timeout: 1000, worker_name: KafkaEx.Server}

The problem is that handle_call/3 assumes clients are in a consumer group. Fetching works fine with auto_commit: false explicitly set in the call:

KafkaEx.fetch("foo", 0, offset: 0, auto_commit: false)

It'd be nice for fetch to check if :no_consumer_group is set for the consumer group id and then automatically set auto_commit: false.
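
A sketch of the suggested check (illustrative only, not the actual KafkaEx.Server code):

defmodule AutoCommitSketch do
  # when there is no consumer group, offsets cannot be committed,
  # so auto_commit should default to false
  def default_auto_commit(_opts, :no_consumer_group), do: false
  def default_auto_commit(opts, _consumer_group), do: Keyword.get(opts, :auto_commit, true)
end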

CI?

Is there CI set up for this project? The GitHub README shows the build as passing even though some tests are failing (in consumer_group).

Consuming unstable when broker in cluster goes down

I am currently evaluating kafka for internal purposes.

I set up a Kafka cluster with 3 nodes (kafka.1, kafka.2, and kafka.3). The topic (name: "topic") I am subscribing to has 3 partitions and is replicated over all 3 nodes:

$ /opt/kafka/bin/kafka-topics.sh --describe --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic topic
Topic:topic  PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: topic Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 2,3,1
        Topic: topic Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 2,3,1
        Topic: topic Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

It all works great. I am subscribing to the topic in a very simple way (an endless loop that fetches all the time):

defp fetch_loop(topic, partition, offset) do
  new_offset = KafkaEx.fetch(topic, partition, offset: offset) |> handle_batch
  persist_offset(new_offset)
  fetch_loop(topic, partition, new_offset)
end

This works fine.

But now I also wanted to test what happens if one of the nodes goes down. In theory that should not matter as the cluster will redistribute the leaders.

However, my script crashes and displays this message:

16:02:39.053 [error] Sending data to broker "kafka.3", 9094 failed with :timeout

16:02:39.054 [debug] Shutting down worker KafkaEx.Server

16:02:39.081 [debug] Succesfully connected to "kafka" on port 9092
** (exit) exited in: GenServer.call(KafkaEx.Server, {:fetch, "topic", 2, 63194, 10, 1, 1000000, true}, 5000)
    ** (EXIT) an exception was raised:
        ** (FunctionClauseError) no function clause matching in KafkaEx.Protocol.Fetch.parse_response/1
            (kafka_ex) lib/kafka_ex/protocol/fetch.ex:18: KafkaEx.Protocol.Fetch.parse_response(nil)
            (kafka_ex) lib/kafka_ex/server.ex:324: KafkaEx.Server.fetch/8
            (kafka_ex) lib/kafka_ex/server.ex:92: KafkaEx.Server.handle_call/3
            (stdlib) gen_server.erl:629: :gen_server.try_handle_call/4
            (stdlib) gen_server.erl:661: :gen_server.handle_msg/5
            (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:564: GenServer.call/3
    (credit) lib/script.ex:12: Credit.fetch_loop/3
    (elixir) lib/code.ex:363: Code.require_file/2
    (mix) lib/mix/tasks/run.ex:68: Mix.Tasks.Run.run/1
    (mix) lib/mix/cli.ex:58: Mix.CLI.run_task/2

Even when I restart my script, I eventually run into this error. Only when I start the node again does the script run reliably.

Shouldn't the fetch just return a Protocol.Message with a specific error code (if anything) rather than raising an exception?

kafka_ex server refactor

Motivation

The consumer_group API (offset storage) has changed continually across Kafka versions since the Kafka core team decided to stop storing offsets in Zookeeper. This leaves a client implementer with a choice: either add new features and drop support for older versions of Kafka, or codify backwards compatibility so the client doesn't break for folks who can't afford an upgrade.

I have decided that the latter option is better: it allows folks who can't afford a Kafka upgrade to keep using newer versions of kafka_ex (which can include bug fixes for parts that affect them), while folks who run the latest version of Kafka get the latest features we implement.

Proposed Solution

Create a server for each version (or range of versions with a similar API): remove consumer_group-related code completely from the server for Kafka < 0.8.2, move the current consumer_group logic into a 0.8.{2,3} server, and open the way for >= 0.9 implementations (via their own servers). The user supplies the version in configuration, and at worker creation we spawn a server matching their config. When they attempt an operation not supported by the version of Kafka they configured, we raise an exception and crash the worker.

There is a WIP branch at ee89d66 which should serve as a base for the proposed solution.
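
This proposal is what the kafka_version configuration option reflects; a hedged sketch of selecting a server implementation by version (accepted version strings other than "kayrock", mentioned above, may vary by KafkaEx release):

# config/config.exs
# the version string selects which KafkaEx server implementation is spawned
config :kafka_ex, kafka_version: "0.9.0"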

Producer/Consumer partition assignments

I am not sure if this is really an issue, but I would like to discuss how the partition assignments are handled with this library. Both KafkaEx.produce and KafkaEx.fetch require the partition number to perform the request.

For example: if I have a topic with 20 partitions and 2 consumers belonging to the same consumer_group, it is expected that each consumer will consume 10 partitions. It seems the library does not offer this level of abstraction yet.

Do you plan on adding this logic of partition assignment later on? Or maybe Kafka 0.9.x will resolve this problem?

At the moment I am using Kafka 0.8.2.1 and performing the partition assignment using Redis (ensuring each message is read at least and at most once). If you are interested in the logic, I could either:

  • create a wrapper for the kafka_ex library
  • integrate the logic in a pull request to this project (maybe with an optional dependency for redis lib)

Allow init arguments for stream handler

It would be awesome if we could give custom arguments to GenEvent handlers started by calling stream.

This way I could add an initial state for example, or a callback function to be called by a custom handler.

Would you accept a PR doing that?

Snappy not resolving as a dependency

With a new project and

  def application do
    [applications: [:logger, :kafka_ex]]
  end

  defp deps do
    [{:kafka_ex, "~> 0.2"}]
  end
end

in the mix.exs file, I do a mix deps.get, mix deps.compile, and iex fails with

** (Mix) Could not start application snappy: could not find application file: snappy.app

Inspection of mix.lock shows that snappy was not resolved as a dependency.

I'm not sure why this is happening - I put snappy in the KafkaEx mix.exs. Manually adding snappy as a dependency to the downstream project should resolve it in an emergency, but this should be fixed :(

Failing test for disable_default_worker

When you run mix test --no-start one of the tests fails.

  1) test Setting disable_default_worker to true removes the KafkaEx.Server worker (KafkaExTest)
     test/kafka_ex_test.exs:4
     ** (MatchError) no match of right hand side value: {:error, {:not_started, :kafka_ex}}
     stacktrace:
       test/kafka_ex_test.exs:6

The reason for the failing test is that the test tries to run Application.stop(:kafka_ex) but since we run mix test with --no-start there is no :kafka_ex application to stop. I talked to @bjhaid in the kafka_ex chat room and he's cool with moving that test in with the other integration tests.

I'll try to send a PR today.

Does not work correctly when a string hostname contains four sets of numbers

I'm trying to connect over an SSH tunnel and have told my /etc/hosts file to resolve the configured broker host ip-10-4-1-11 to 127.0.0.1. However, since KafkaEx tries to parse IP addresses out of hostnames, this does not work: it tries to connect to 10.4.1.11 instead, bypassing my hosts file.

The issue seems to be here

iex(1)> Regex.scan(~r/\d+/, "ip-10-4-1-11")
[["10"], ["4"], ["1"], ["11"]]

Fix function and variable name overlap

Subset of #100

┃ 
┃ [W] ↗ Assigned variable `metadata` has same name as a private function in the same module.
┃       lib/kafka_ex/server.ex:160:22 (KafkaEx.Server.handle_call)
┃ [W] ↗ Assigned variable `stream` has same name as a function in the same module.
┃       lib/kafka_ex.ex:282:5 (KafkaEx.stream)
┃ [W] ↗ Assigned variable `offset` has same name as a function in the same module.
┃       lib/kafka_ex.ex:272:5 (KafkaEx.stream)
┃ [W] ↗ Assigned variable `offset` has same name as a function in the same module.
┃       lib/kafka_ex.ex:267:5 (KafkaEx.stream)
┃ [W] ↗ Assigned variable `offset` has same name as a function in the same module.
┃       lib/kafka_ex.ex:168:5 (KafkaEx.fetch)
┃ [W] ↗ Assigned variable `offset` has same name as a function in the same module.
┃       lib/kafka_ex.ex:162:5 (KafkaEx.fetch)
┃ [W] ↗ Assigned variable `metadata` has same name as a private function in the same module.
┃       lib/kafka_ex/server.ex:278:22 (KafkaEx.Server.update_metadata)
┃ [W] ↗ Assigned variable `metadata` has same name as a private function in the same module.
┃       lib/kafka_ex/server.ex:53:22 (KafkaEx.Server.init)
┃ [W] ↗ Parameter `consumer_group` has same name as a function in the same module.
┃       lib/kafka_ex.ex:86:44 (KafkaEx.consumer_group_metadata)
┃ [W] ↗ Assigned variable `consumer_group` has same name as a function in the same module.
┃       lib/kafka_ex.ex:298:5 (KafkaEx.build_worker_options)

Stream consumer not starting

Do you have any guidelines for creating the consumer? I noticed that when I launch a consumer directly on app start, it is unable to find the topic. From what I can see, it might be linked to the fact that it can't find a broker: in KafkaEx.Protocol.ConsumerMetadata on line 7, consumer_group_metadata.coordinator_host is nil.

If I create a GenServer after a few seconds and launch the consumer after some time, I do succeed in consuming the messages. So should I delay the start of the consumer, or is this linked to something I didn't do? (I basically followed the README to create a stream consumer.)

Why use worker names instead of PIDs ?

Hi,

First of all thank you for this great library!

I was wondering why you require a name for KafkaEx workers. Why not simply leave them anonymous? KafkaEx.create_worker returns a PID after all.
This can be cumbersome when implementing a pool of workers, for example. Furthermore, it is considered bad practice to create a lot of atoms, since they live forever in memory.

Make elixir handle kafka daemons

For now, testing the app means first launching 3 separate Kafka daemons by hand and also cleaning up by hand. Furthermore, there is no way to tweak the ports used by the daemons; in my case, one of the ports clashes with the "production" server.

I propose making Elixir set up and monitor the Kafka daemons. I don't see any cons apart from the coding time. The pros are:

  • dynamic configuration of ports in the test setup;
  • automatic setup of one or multiple kafka daemons;
  • having control over the lifetime of the daemon;
  • being able to clean the state of the kafka server when we want to;
  • and not the least - making it easy for users of the library to setup their own tests with a kafka daemon.

I already worked on this for my project (not finished yet), so I'm mainly asking whether you would be interested in a PR for it.

Use structs rather than maps for the metadata

Right now, we're using nested maps as the data structure for the metadata we get back, which directly parallels the data being returned from Kafka. We would like to, instead, return structs for the metadata, the topics, and the partitions.

Support optional startup of default worker

In my production scenario, I need to be able to run multiple consumer groups (potentially across multiple broker clusters) from the same VM.

I can probably accomplish this by breaking my consumer groups up and starting one via the default :kafka_ex worker on startup and the rest as part of my boot process. This seems clunky, though. Ideally, I think one could optionally start the application with no worker servers running and then launch their own set of workers on demand and name them as desired.

I think this is also contributing to some of the problems I'm seeing with consumer group management. I think the existence of the :kafka_ex worker by default has made it easier to miss bugs in KafkaEx.Server - if we set up a test case and forget to route everything through the test worker, we could find that the default worker is handling some request that we did not expect it to handle and therefore masking what should really be a test failure.
