brod's People

Contributors

ates, axs-mvd, belltoy, bjosv, brigadier, dianaolympos, dszoboszlay, eloraburns, epsylonix, id, indrekj, jesperes, k32, kianmeng, kivra-pauoli, kjellwinblad, mikpe, onno-vos-dev, qzhuyan, robertoaloi, robsonpeixoto, serikdm, spencerdcarlson, ssepml, thalesmg, tpitale, urmastalimaa, v0idpwn, xxdavid, zmstone

brod's Issues

create start_client return PID

Hi,
Could start_client be supported without parameters, with the function returning the client PID? That would help when setting up a connection pool in advance.

required_acks=0 only processes first message

Hi,

I've just started exploring the driver. I notice that when required_acks = 0 (which if I'm not mistaken basically tells kafka to not send any acknowledgments), only the first message gets processed by the consumer. All succeeding messages are not consumed.

Upon further investigation, it seems the driver is stuck expecting a response. I'm not 100% sure, as I still have to dig into the code a bit. But just in case, do you have any idea?

I'm basically just calling

brod:produce(brod1, ?SESSION_TOPIC, ?RANDOM_PARTITION, <<>>, J).

Using sync_produce results in waiting for a response indefinitely.

Please let me know.

Thanks!
Byron

consumer offset reset policy

When an OffsetOutOfRange exception is received by the consumer, the current implementation sends #kafka_fetch_error{} to the subscriber and expects the subscriber to re-subscribe with valid offsets.

An offset_reset_policy option should allow brod_consumer to reset to a valid offset without involving the subscriber.
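A minimal sketch of what such a policy could look like, assuming the consumer already knows the earliest and latest valid offsets for the partition. The module name, the policy atoms and the return values are all hypothetical; nothing here is brod's actual API.

```erlang
-module(offset_reset_sketch).
-export([on_out_of_range/3]).

%% Decide what to do after an OffsetOutOfRange error.
%% Policy atoms and return values are illustrative assumptions.
on_out_of_range(reset_to_earliest, Earliest, _Latest) ->
    {resume_from, Earliest};
on_out_of_range(reset_to_latest, _Earliest, Latest) ->
    {resume_from, Latest};
on_out_of_range(reset_by_subscriber, _Earliest, _Latest) ->
    %% Keep the current behaviour: hand the error to the subscriber.
    notify_subscriber.
```

With a default of reset_by_subscriber, existing subscribers would see no change in behaviour.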

Problem compiling Snappyer in v2.2.1 in Mac OS X

Hello, I'm trying to update brod to v2.2.1 and I get an error when compiling snappyer on Mac OS X (El Capitan). I'm using Hex in an Elixir project.

A co-worker who also uses Mac OS X has the same problem. Meanwhile, it seems to compile fine on another co-worker's machine running Ubuntu (but our acceptance tests stop working with message parse errors ...)

With version 2.1.8 everything works perfectly.

This is the output that I got:

==> snappyer (compile)
DEPEND snappyer.d
ERLC snappyer.erl
APP snappyer.app.src
CPP snappy-sinksource.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappy-stubs-internal.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappy.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappyer.cc
clang: warning: optimization flag '-finline-functions' is not supported
LD snappyer.so
Undefined symbols for architecture x86_64:
"_enif_alloc_binary", referenced from:
SnappyNifSink::SnappyNifSink(enif_environment_t*) in snappyer.o
SnappyNifSink::SnappyNifSink(enif_environment_t*) in snappyer.o
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
"_enif_inspect_iolist_as_binary", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_atom", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_badarg", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_binary", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
"_enif_make_existing_atom", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_tuple", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_ulong", referenced from:
_snappy_uncompressed_length_erl in snappyer.o
"_enif_realloc_binary", referenced from:
SnappyNifSink::GetAppendBuffer(unsigned long, char*) in snappyer.o
SnappyNifSink::getBin() in snappyer.o
_snappy_compress_erl in snappyer.o
"_enif_release_binary", referenced from:
SnappyNifSink::~SnappyNifSink() in snappyer.o
SnappyNifSink::~SnappyNifSink() in snappyer.o
SnappyNifSink::~SnappyNifSink() in snappyer.o
_snappy_compress_erl in snappyer.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make: *** [/Users/samuel/cfadapter/deps/snappyer/priv/snappyer.so] Error 1
ERROR: Command [compile] failed!
==> cfadapter
** (Mix) Could not compile dependency :snappyer, "/Users/samuel/.mix/rebar compile skip_deps=true deps_dir="/Users/samuel/cfadapter/build/dev/lib"" command failed. You can recompile this dependency with "mix deps.compile snappyer", update it with "mix deps.update snappyer" or clean it with "mix deps.clean snappyer"

I'm not used to NIFs in Erlang/Elixir, so I'm not sure where the problem is. Any ideas?

Thank you.

maybe we need flow control of messages ?

I printed the process_info of one brod_sock process:

([email protected])133> erlang:process_info(list_to_pid("<0.16546.12>")).
[{current_function,{kpro,do_decode_fields,4}},
 {initial_call,{proc_lib,init_p,5}},
 {status,running},
 {message_queue_len,9672},
 {messages,[{tcp,#Port<0.1512>,
                 <<"16686B05A27C0\t93294\t-1.00\t-1.00\t60.00\t19.54\t1135087616.00\t763130112.00"...>>},
            {tcp,#Port<0.1512>,
                 <<"qqgame\tD649ADF73C9E2A681324C0A85FA2DEB8\t93294\t-1.00\t-1.00\t111.53\t2.4"...>>},
            {tcp,#Port<0.1512>,
                 <<"t\tgame_name\tagent_name\tpf_name\tpf\tstep\tmtime\ttime_diff\tstep_name"...>>},
            {tcp,#Port<0.1512>,<<"table&t_log_client_device"...>>},
            {tcp,#Port<0.1512>,
                 <<"or      \t4\t3194\t1000201977856\t505531793408\tAMD Radeon HD"...>>},
            {tcp,#Port<0.1512>,
                 <<3,31,15,0,0,255,255,255,255,0,0,1,140,...>>},
            {tcp,#Port<0.1512>,
                 <<"\tpf\tstep\tmtime\ttime_diff\tstep_name\tcversion\tfirs"...>>},
            {tcp,#Port<0.1512>,
                 <<"r_level\tmtime\tpf\taccount_name\tversion\tmin_fp"...>>},
            {tcp,#Port<0.1512>,
                 <<"81DAC5E4\t93294\t-1.00\t-1.00\t784.31\t63.67\t"...>>},
            {tcp,#Port<0.1512>,
                 <<"_id=0/dt=2016-10-11\ntable\"t_log_clie"...>>},
            {tcp,#Port<0.1512>,
                 <<"mac\tos\tdevice_name\tmemory\tremain"...>>}

The message queue is too long.

The default socket options are:

SockOpts = [{active, true}, {packet, raw}, binary, {nodelay, true}],

Maybe {active, false} or {active, once} would be more suitable?
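For illustration, here is a small self-contained sketch of an {active, once} receive loop. It is not brod_sock's code; the module and function names are made up. The socket delivers one message, switches back to passive mode, and the loop re-arms it only when it is ready for more, so unread data stays in the TCP buffer instead of piling up in the process mailbox.

```erlang
-module(active_once_sketch).
-export([recv_loop/2, demo/0]).

%% Ask the socket for exactly one message, handle it, then ask again.
recv_loop(Socket, Acc) ->
    ok = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, Data} ->
            recv_loop(Socket, [Data | Acc]);
        {tcp_closed, Socket} ->
            lists:reverse(Acc);
        {tcp_error, Socket, Reason} ->
            {error, Reason}
    end.

%% Loopback demo: send <<"hello">> to ourselves and read it back.
demo() ->
    {ok, L} = gen_tcp:listen(0, [binary, {packet, raw}, {active, false}]),
    {ok, Port} = inet:port(L),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, [binary, {active, false}]),
    {ok, S} = gen_tcp:accept(L),
    ok = gen_tcp:send(C, <<"hello">>),
    ok = gen_tcp:close(C),
    Chunks = recv_loop(S, []),
    gen_tcp:close(S),
    gen_tcp:close(L),
    iolist_to_binary(Chunks).
```

The trade-off is one extra inet:setopts/2 call per received message, which is usually cheaper than an unbounded mailbox.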

feature-request: reset committed offsets for consumer groups

It would make a lot of consumers' lives easier if brod supported an offset-reset feature.
An easy way is to stop all other clients and start ONE single member to commit the prepared (CLI or file) offsets.
Alternatively, a member could keep re-joining the group until it gets elected as group leader, and then not assign any partition to other members before committing the prepared offsets.

inconsistent behaviour of brod:produce_sync/5

When no client is created:
1> brod:produce_sync(noclient, <<"Topic">>, 0, <<"Key">>, <<"Data">>).
{error,client_down}
2> brod:produce_sync(noclient, <<"Topic">>, fun(_) -> 0 end, <<"Key">>, <<"Data">>).
** exception error: bad argument
in function ets:lookup/2
called as ets:lookup(noclient,{topic_metadata,<<"Topic">>})
in call from brod_client:lookup_partitions_count_cache/2 (src/brod_client.erl, line 742)
in call from brod_client:get_partitions_count/3 (src/brod_client.erl, line 722)
in call from brod:produce/5 (src/brod.erl, line 279)
in call from brod:produce_sync/5 (src/brod.erl, line 317)

Could both cases behave the same way, either returning {error, client_down} or crashing?
Thanks in advance.
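One way to get the consistent {error, client_down} result is to guard the ets lookup, since ets:lookup/2 raises badarg when the named table does not exist. This is only a sketch: the module name, the {ok, Count} and not_cached results, and the shape of the stored tuple are assumptions. Only the {topic_metadata, Topic} key comes from the stack trace above.

```erlang
-module(safe_lookup_sketch).
-export([lookup_partitions_count/2]).

%% Look up a cached partition count. A missing table (no such client)
%% is reported as {error, client_down} instead of crashing with badarg.
lookup_partitions_count(Ets, Topic) ->
    try ets:lookup(Ets, {topic_metadata, Topic}) of
        [{_Key, Count}] -> {ok, Count};
        []              -> {error, not_cached}
    catch
        error:badarg -> {error, client_down}
    end.
```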

Consumer offset in kafka topic

Hi!

Would be amazing to have something like the high-level consumer that can use Kafka itself to manage offsets. Is it planned for the future?

Group subscriber supervision

I implemented a pool of group subscribers as described in the Readme (and the demo module) to consume a topic in kafka. I didn't use the topic subscribers as we can have several agents running who share the load.

What I've been seeing however is that my group subscribers are crashing because of timeouts writing the offsets to kafka (not necessarily all at once). The subscribers are never restarted, meaning that after a few timeouts I don't have any subscribers left.

I've been working with brod 2.1.6, and I saw that in brod 2.1.9 subscribers should no longer crash if they can't write their offsets to kafka. However that doesn't solve the subscribers not being restarted if they crash some other way.

As such, how should I be supervising the group subscribers to ensure they don't die permanently?

Bad Hex Package

The package on hex does not contain the dependencies: https://hex.pm/packages/brod

Dependencies must also be hex packages. Without a fix for this anyone using the hex package must manually include the 2 dependencies required by brod in their top level config.

Handle error in brod_group_coordinator commit_offsets

In brod_group_coordinator, both handle_info(?LO_CMD_COMMIT_OFFSETS and handle_call(commit_offsets try to catch a throw and then stabilize.
But do_commit_offsets_ raises the exception with erlang:error(EC), not with throw.
As a result the error is not caught and the server crashes without attempting to stabilize.
Is this the expected behaviour, or should this kafka error be caught?
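The mismatch can be reproduced in isolation. In Erlang, a catch clause for the throw class does not match exceptions raised with erlang:error/1. The sketch below uses made-up names and a made-up {stabilize, Reason} result; it is not the coordinator's code.

```erlang
-module(catch_class_sketch).
-export([catch_throw_only/1, catch_throw_and_error/1]).

%% Only throw-class exceptions are handled; erlang:error/1 passes through.
catch_throw_only(F) ->
    try F() of
        V -> {ok, V}
    catch
        throw:Reason -> {stabilize, Reason}
    end.

%% Both classes are handled, so erlang:error(EC) also leads to stabilize.
catch_throw_and_error(F) ->
    try F() of
        V -> {ok, V}
    catch
        throw:Reason -> {stabilize, Reason};
        error:Reason -> {stabilize, Reason}
    end.
```

Either raising with throw(EC) or adding an error clause would make the two sides agree. Which one is preferable depends on whether other error-class exceptions should still crash the server.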

consumer may receive stale acks from subscriber

In some cases brod_consumer may receive stale 'ack' messages,
e.g. when a subscriber delegates acks to other processes and then itself re-subscribes with a new begin_offset (which causes a buffer reset in brod_consumer).

Example:
When pending-acks = [4,5,6],
the subscriber delegates the messages to another process. While those messages are being processed,
the subscriber makes a new subscribe call with a new begin_offset, which causes a buffer reset in brod_consumer.
brod_consumer quickly consumes messages [1, 2, 3] and puts them into pending-acks.
If the 'ack' message for 4, 5 or 6 is received after the reset,
all newly consumed messages will be removed from the pending list.
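The scenario can be replayed with plain lists. The sketch assumes that acking offset N drops every pending offset up to and including N, as described above. Both function names and the guard itself are hypothetical, not brod_consumer's implementation.

```erlang
-module(pending_acks_sketch).
-export([ack_naive/2, ack_guarded/2]).

%% Drop every pending offset up to and including the acked one.
ack_naive(Offset, Pending) ->
    [O || O <- Pending, O > Offset].

%% Ignore acks for offsets that are not currently pending.
ack_guarded(Offset, Pending) ->
    case lists:member(Offset, Pending) of
        true  -> ack_naive(Offset, Pending);
        false -> Pending
    end.
```

With Pending = [1,2,3], a stale ack for 5 empties the list in ack_naive/2 but leaves it untouched in ack_guarded/2. The guard is not a complete fix: a stale ack whose offset happens to be pending again after the reset would still be accepted, so tagging acks with some per-subscription reference may be needed.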

Batching happens only opportunistically

It looks like brod will try to send produce requests as quickly as possible. Notably, in the case where requiredAcks = 0 it will just send everything individually. Many other kafka libraries seem to offer something like lingerMS, which lets the library batch things more effectively: a produce request is only sent after lingerMS milliseconds have passed or the buffer grows over some predetermined size. The benefits of this are reduced network traffic and load on the brokers, and it makes the compression options much more effective.

Is this something brod might support? It's something I could potentially work on if it turns out to be helpful for my company, but it's a slightly larger feature, so I thought I'd check first that it isn't already being worked on and that it would potentially be accepted.
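To make the lingerMS idea concrete, here is a rough sketch of the flush decision as pure functions. The buffer is a map, time is passed in explicitly so the logic is easy to test, and all names are hypothetical rather than taken from brod or any other client.

```erlang
-module(linger_sketch).
-export([new/2, add/3, should_flush/2]).

%% Create an empty buffer with a linger time and a size threshold.
new(LingerMs, MaxBytes) ->
    #{linger_ms => LingerMs, max_bytes => MaxBytes,
      first_ts => undefined, bytes => 0, msgs => []}.

%% Append a message; remember when the first message of the batch arrived.
add(Msg, NowMs, #{first_ts := First, bytes := Bytes, msgs := Msgs} = Buf)
  when is_binary(Msg) ->
    Buf#{first_ts := case First of undefined -> NowMs; _ -> First end,
         bytes := Bytes + byte_size(Msg),
         msgs := [Msg | Msgs]}.

%% Flush when the batch is big enough or has lingered long enough.
should_flush(_NowMs, #{msgs := []}) ->
    false;
should_flush(NowMs, #{linger_ms := Linger, max_bytes := Max,
                      first_ts := First, bytes := Bytes}) ->
    Bytes >= Max orelse NowMs - First >= Linger.
```

A real producer would also need a timer to trigger the flush when no further messages arrive; that part is left out here.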

How to track the acknowledged offsets from a brod topic subscriber

Hello! I'm making a thin Elixir wrapper around brod for local Kafka work. I've got a Consumer module using brod_topic_subscriber to consume messages just fine. Right now I'm using the {:ok, :ack, state} flow so messages are acknowledged immediately after processing.

My expectation was that if something goes wrong during message processing then the given message(s) would be unacknowledged and redelivered.

That seems to be the case while my application is running. Messages fail to process and new messages add to the message set that's pending processing.

But if I restart the Elixir application then those unacknowledged messages are not reprocessed.

I've traced it to my setting [] as the uncommitted offsets in the init function of my brod topic subscriber module. Is there a way to have the topic subscriber track the offsets that have been acknowledged? Or is a topic subscriber expected to maintain its own acknowledged offsets? Do I really want to be using a group subscriber here?

gen_tcp:send return value bad match

2016-12-12 17:39:43.028 [error] <0.6283.0> CRASH REPORT Process <0.6283.0> with 0 neighbours exited with reason: no match of right hand value {error,closed} in brod_sock:handle_msg/3 line 272 in brod_sock:init/5 line 185
2016-12-12 17:39:43.030 [info] <0.2596.0> client mstream_source_kafka: payload socket down hd4.mingchao.com:9092
reason:{{badmatch,{error,closed}},[{brod_sock,handle_msg,3,[{file,"/code/git/mstream/_build/default/lib/brod/src/brod_sock.erl"},{line,272}]},{brod_sock,init,5,[{file,"/code/git/mstream/_build/default/lib/brod/src/brod_sock.erl"},{line,182}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

Maybe we need a case clause here.
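Something along these lines, as a sketch only: gen_tcp:send/2 returns ok or {error, Reason}, so matching on both avoids the badmatch crash. The wrapper name and the {send_failed, Reason} tuple are made up for the example.

```erlang
-module(send_sketch).
-export([safe_send/2]).

%% Return an error tuple instead of crashing with badmatch
%% when the socket has already been closed.
safe_send(Socket, Packet) ->
    case gen_tcp:send(Socket, Packet) of
        ok              -> ok;
        {error, Reason} -> {error, {send_failed, Reason}}
    end.
```

The caller can then decide whether to reconnect or to exit with a clearer reason.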

track consumer lag?

Is there a way to track the consumer lag? Consumer responses tell you the offset high watermark, so you can subtract the current offset you've seen from it to determine how far behind you are.
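The arithmetic described above, as a tiny sketch. The module and function names are hypothetical. Whether to subtract one more depends on whether the "current offset" is the last message consumed or the next one to fetch; the code follows the plain subtraction from the description.

```erlang
-module(lag_sketch).
-export([lag/2]).

%% Lag = high watermark offset minus the current offset seen by the consumer.
%% Clamped at 0 in case the two values were sampled at different times.
lag(HighWatermark, CurrentOffset) ->
    max(0, HighWatermark - CurrentOffset).
```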

brod_demo_group_subscriber_loc has error in kafka 0.8.2

Hey, I'm using brod (the new 2.0) as a consumer client connecting to a kafka server (0.8.2), following brod_demo_group_subscriber_loc.erl/brod_demo_group_subscriber_koc.erl.

The kafka server reports this error:

[2016-04-25 16:01:38,744] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)

and the client reports this error:

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23225.2>):
re-joining group, reason:{sock_down,tcp_closed}** at node [email protected] **

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23223.2>):
connected to group coordinator localhost:9092** at node [email protected] **

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23223.2>):
failed to join group
reason:{sock_down,tcp_closed}** at node [email protected] **

I don't know the kafka protocol details, so I don't know what mistake I'm making.
Sorry for my poor English. If anything is confusing, we can communicate in Chinese @ zaiming

Thanks

filter already consumed messages based on begin_offset

Compressed message sets are delivered in compressed batches to consumers.
Requesting to fetch from an offset in the middle of a compressed set will get the whole set delivered.
This should be transparent to brod subscribers: brod_consumer should filter out the already consumed messages based on begin_offset after each new subscription.
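A sketch of the filtering step, assuming messages are represented as {Offset, Value} tuples for brevity. That representation and the names are assumptions for the example; brod's own message representation is not used here.

```erlang
-module(offset_filter_sketch).
-export([drop_consumed/2]).

%% Keep only messages at or after BeginOffset; anything earlier came
%% along only because it shares a compressed set with the wanted offset.
drop_consumed(BeginOffset, Messages) ->
    [M || {Offset, _Value} = M <- Messages, Offset >= BeginOffset].
```

For example, fetching from offset 5 inside a set covering offsets 3 to 6 would hand only offsets 5 and 6 to the subscriber.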

error in brod_group_subscriber:handle_ack and else

brod: 2.2.3

After starting the server, I got this error:

2016-10-13 21:05:29.654 [info] <0.572.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.572.0>-51c3b7e6-a8ab-426a-8159-e1b3c25eff7d,generation=2,pid=<0.572.0>):
assignments received:
flume_hive_table_v1:
    partition=2 begin_offset=160291
2016-10-13 21:05:29.664 [error] <0.571.0> gen_server <0.571.0> terminated with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398
2016-10-13 21:05:29.670 [error] <0.571.0> CRASH REPORT Process <0.571.0> with 0 neighbours exited with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398 in gen_server:terminate/7 line 826
2016-10-13 21:05:29.670 [info] <0.572.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.572.0>-51c3b7e6-a8ab-426a-8159-e1b3c25eff7d,generation=2,pid=<0.572.0>):
leaving group, reason {{badmatch,{error,consumer_down}},[{brod_group_subscriber,handle_ack,2,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,398}]},{brod_group_subscriber,handle_cast,2,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,317}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},{gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}
2016-10-13 21:05:29.670 [error] <0.572.0> gen_server <0.572.0> terminated with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398
2016-10-13 21:05:29.672 [error] <0.572.0> CRASH REPORT Process <0.572.0> with 0 neighbours exited with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398 in gen_server:terminate/7 line 826

Several minutes later, I got this:

2016-10-13 21:24:01.360 [info] <0.582.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.582.0>-edf2c54d-0722-448e-a321-920aac7f4d87,generation=5,pid=<0.582.0>):
leaving group, reason {{{function_clause,[{gen,do_for_proc,[{down,"2016-10-13:13:23:52.214585",{already_subscribed_by,<0.573.0>}},#Fun<gen.0.132519590>],[{file,"gen.erl"},{line,252}]},{gen_server,call,3,[{file,"gen_server.erl"},{line,208}]},{brod_consumer,safe_gen_call,3,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_consumer.erl"},{line,594}]},{brod_group_subscriber,'-handle_call/3-fun-0-',1,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,306}]},{lists,foreach,2,[{file,"lists.erl"},{line,1337}]},{brod_group_subscriber,handle_call,3,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,302}]},{gen_server,try_handle_call,4,[{file,"gen_server.erl"},{line,629}]},{gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,661}]}]},{gen_server,call,[{down,"2016-10-13:13:23:52.214585",{already_subscribed_by,<0.573.0>}},{unsubscribe,<0.581.0>},infinity]}},{gen_server,call,[<0.581.0>,unsubscribe_all_partitions,infinity]}}
2016-10-13 21:24:01.361 [error] <0.582.0> gen_server <0.582.0> terminated with reason: no match of right hand value {error,{sock_down,noproc}} in brod_group_coordinator:stop_socket/1 line 505

While the errors were occurring, it seemed that none of the subscribers could receive messages.

Very little high-level documentation

I'm evaluating whether I can use a range of Kafka clients for a project I'm developing, and README.md doesn't answer the following basic questions about Brod:

  • Are these bindings to a Java client or an independent implementation? (answered by looking at all of the code)
  • Does it require ZooKeeper (locally or at all)?
  • Does it support the high-level consumer, or is it an equivalent of the Simple Consumer? (Turns out simple only, because there is an issue for that)
  • Does it support message batching? (If not I could probably contribute it, sounds simple to implement)

Batch producing doesn't work with compression

Having {compression, gzip} and trying to use the new batch producing API causes a badarg:


** Reason for termination == 
** {badarg,[{erlang,size,
                    [[{<<"KEY">>,
                       [<<"VALUE">>,
                        <<"VALUE">>,
                        <<"VALUE">>]}]],
                    []},
            {brod_producer,batch_size,1,
                           [{file,"src/brod_producer.erl"},{line,376}]},
            {brod_producer,'-init/1-fun-0-',3,
                           [{file,"src/brod_producer.erl"},{line,195}]},
            {brod_producer,'-init/1-fun-1-',7,
                           [{file,"src/brod_producer.erl"},{line,204}]},
            {brod_producer_buffer,do_send,3,
                                  [{file,"src/brod_producer_buffer.erl"},
                                   {line,264}]},
            {brod_producer,maybe_produce,1,
                           [{file,"src/brod_producer.erl"},{line,337}]},
            {brod_producer,handle_produce,4,
                           [{file,"src/brod_producer.erl"},{line,317}]},
            {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,599}]}]}.

I will try to fix this this week if you all don't beat me to it.

calling brod:start_client(Hosts, undefined) returns already_started

1> H = [{"localhost",9092}].
[{"localhost",9092}]
2> brod:start_client(H, undefined).
{error,{{already_started,undefined},
{child,undefined,undefined,
{brod_client,start_link,[[{"localhost",9092}],undefined,[]]},
{permanent,10},
5000,worker,
[brod_client]}}}
3> brod:start_client(H, anyclient).
ok

Add an option to subscribe on a topic

Right now we only have brod:subscribe/5, which allows users to subscribe to a specific partition. The user application must also monitor the brod_consumer process.

  • A user's process should be able to subscribe to a topic in one call
  • A user's process should not be required to monitor brod internal processes in order to work reliably

"metadata socket down" in the 2.0-dev branch

I'm trying to prototype against Kafka 0.9 using the 2.0-dev branch and I'm running into the following in erl:

> {ok, ClientPid} = brod:start_link_client([{"docker", 9092}]).
{ok,<0.35.0>}
> brod:start_producer(ClientPid, <<"test">>, []).

=ERROR REPORT==== 13-Apr-2016::16:46:40 ===
client brod_default_client metadata socket down docker:9092
Reason:{sock_down,
           {undef,
               [{kpro,next_corr_id,[0],[]},
                {brod_kafka_requests,add,2,
                    [{file,"src/brod_kafka_requests.erl"},{line,66}]},
                {brod_sock,handle_msg,3,
                    [{file,"src/brod_sock.erl"},{line,217}]},
                {brod_sock,init,5,[{file,"src/brod_sock.erl"},{line,138}]},
                {proc_lib,init_p_do_apply,3,
                    [{file,"proc_lib.erl"},{line,239}]}]}}

Just trying to fetch metadata:

> Hosts = [{"docker", 9092}].
> brod:get_metadata(Hosts).
** exception error: no match of right hand side value {error,{sock_down,noproc}}
     in function  brod_utils:get_metadata/2 (src/brod_utils.erl, line 49)

On master:

> Hosts = [{"docker", 9092}].
> brod:get_metadata(Hosts).
{ok,{metadata_response,[{broker_metadata,1001,"docker",
                                         9092}],
                       [{topic_metadata,no_error,<<"test">>,
                                        [{partition_metadata,no_error,0,1001,[1001],[1001]}]}]}}

Is there some configuration piece or intermediate command for the 2.0 version I'm missing?

Compression of messages

Hey,

Turns out brod doesn't support gzip/snappy, which are part of the kafka wire protocol. I know you are heading towards 2.0. Will this be a feature in there? Do you want the code I wrote to support this?

I am looking forward to 2.0 and the high-level consumer. I ended up writing quite a bit of code to do a simple consumer with the current version.

client topic producer relationship

Hello,
I have a question about the design: for a given client, there can be only one producer per topic. But in many production setups a topic may require multiple producers. What would you recommend?

Producer leader caching

The way brod producer caches leaders leads to creating way too many sockets.

The Producer creates a proplist of [{{Topic, Partition}, brod_sock Pid} | Leaders ]

For a fairly typical case of having say 10 topics with 15 partitions each you would create 150 socket connections. Even if you have a typical 3 node kafka this ends up being 50 connections to each kafka.

We discovered this through testing brod producing to this setup

  • single kafka
  • 1 topic
  • 11 partitions
  • 10 poolboy workers each with a producer

After experimenting with our worker pool size we discovered that we can easily flood kafka with connections. After adding some debug output to brod we saw that we are creating 11 sockets per producer when 1 would do.

Any thoughts?
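To put numbers on the difference, here is a sketch that counts sockets when the cache is keyed by {Topic, Partition} versus keyed by leader broker. The leader list format [{{Topic, Partition}, Broker}] and the function names are assumptions made for the example, not brod's data structures.

```erlang
-module(conn_cache_sketch).
-export([connections_per_partition/1, connections_per_broker/1]).

%% One socket per {Topic, Partition} entry.
connections_per_partition(Leaders) ->
    length(Leaders).

%% One socket per distinct leader broker, shared by all its partitions.
connections_per_broker(Leaders) ->
    length(lists:usort([Broker || {_TopicPartition, Broker} <- Leaders])).
```

For the 10 topics with 15 partitions on 3 brokers mentioned above, the first function gives 150 and the second gives 3.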

Metadata socket down

Hi,

From what I understand, there is an occasional check for kafka metadata. However, I am getting these in the logs.

2016-05-30 08:33:49.887 [error] <0.282.0> CRASH REPORT Process <0.282.0> with 0 neighbours exited with reason: tcp_closed in brod_sock:handle_msg/3 line 214
2016-05-30 08:33:49.887 [error] <0.278.0> client brod1 metadata socket down 10.11.5.41:9092

If I'm not mistaken, tcp_closed is expected from the metadata socket after retrieval of info, right? Is this error expected as well? If it is expected, is the error only appearing because it's not properly handled?

Please enlighten me.

Thanks!

Best way to reliably connect clients for producing?

Hello!

If I create a bunch of separate processes using brod (in a riak post-commit hook specifically) I'm finding the start_link_client calls affect each other and I start getting {error,{already_started, <PID>}} responses.

I've tried simply using the PID from the error for the calls to produce, but then I get {error,{producer_down,shutdown}} when trying to produce to the PID. Presumably because the other process that started that connection is wrapping up. I get the same behavior if I give start_link_client an atom name and use that instead of the PID.

In this case I'm only producing to Kafka. From what I see in the code it looks like the already_started response is from preparing for consumers. If that's right, is there a better way that I should be prepping a link for producing?

If not, should I be watching for the producer_down responses and trying to start a new client and producer in response?

Why can't I compile?

$ rebar3 compile
===> Verifying dependencies...
===> Fetching kafka_protocol ({pkg,<<"kafka_protocol">>,<<"0.7.3">>})
===> Download error, using cached file at c:/Users/Net/.cache/rebar3/hex/default/packages/kafka_protocol-0.7.3.tar
===> Fetching supervisor3 ({pkg,<<"supervisor3">>,<<"1.1.3">>})
===> Downloaded package, caching at c:/Users/Net/.cache/rebar3/hex/default/packages/supervisor3-1.1.3.tar
===> Fetching snappyer ({pkg,<<"snappyer">>,<<"1.1.3-1.0.4">>})
===> Downloaded package, caching at c:/Users/Net/.cache/rebar3/hex/default/packages/snappyer-1.1.3-1.0.4.tar
===> Compiling supervisor3
**===> Compiling snappyer
MAKE Version 5.2 Copyright (c) 1987, 2000 Borland
Error makefile 15: Command syntax error
Error makefile 18: Command syntax error
Error makefile 20: Command syntax error
* 3 errors during make ***
===> Hook for compile failed!

be lazy on establishing payload connections

In the current (2.0-dev) implementation, if an idle payload connection is killed by kafka (or some load balancer etc.), brod_consumer will try to set up a new one after a (configurable) timeout.
This is not necessary when there is currently no subscriber listening.

It should allow subscribers to subscribe when there is no live brod_sock (instead of returning {error, no_connection}), and try to connect to the partition leader after the subscriber checks in.

brod 2.0-dev can't produce to an under-replicated topic

Setup:

  • brod: 2.0-dev / 697bbe8
  • Kafka: kafka_2.11-0.9.0.0 (2 brokers)
  • Zookeeper: 3.4.6 (3 servers)

Topic: "brod", 6 partitions, replication factor of 2.

If both Kafka brokers are up I can produce messages to the topic. However, if one of them goes down I can't:

Erlang/OTP 18 [erts-7.1] [source-2882b0c] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.1  (abort with ^G)
1> f().
ok
2> Hosts = [{"kafka1.vagrant", 9092},{"kafka2.vagrant", 9092}].
[{"kafka1.vagrant",9092},{"kafka2.vagrant",9092}]
3> {ok, ClientPid} =
3>   brod:start_link_client(_ClientId  = brod_client_1,
3>                          _Endpoints = Hosts,
3>                          _Config    = [{restart_delay_seconds, 10}],
3>                          _Producers = [ { <<"brod">>
3>                                         , [ {topic_restart_delay_seconds, 10}
3>                                           , {partition_restart_delay_seconds, 2}
3>                                           , {required_acks, -1}
3>                                           ]
3>                                         }
3>                                       ]).
{ok,<0.38.0>}
4> 
=ERROR REPORT==== 2-Feb-2016::21:53:08 ===
** Generic server <0.44.0> terminating 
** Last message in was {post_init,self,brod_producers_sup,
                           {brod_producers_sup2,brod_client_1,<<"brod">>,
                               [{partition_restart_delay_seconds,2},
                                {required_acks,-1}]}}
** When Server state == {state,undefined,undefined,[],undefined,undefined,
                               undefined,[],undefined,undefined}
** Reason for termination == 
** {{badmatch,{error,9}},
    [{brod_producers_sup,post_init,1,
                         [{file,"src/brod_producers_sup.erl"},{line,93}]},
     {supervisor3,handle_info,2,[{file,"src/supervisor3.erl"},{line,717}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},
     {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

Trying the same with the "new-consumer-api" branch (4edd0ec) gives me a "ReplicaNotAvailable" error:

Erlang/OTP 18 [erts-7.1] [source-2882b0c] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.1  (abort with ^G)
1> f().
ok
2> Hosts = [{"kafka1.vagrant", 9092},{"kafka2.vagrant", 9092}].
[{"kafka1.vagrant",9092},{"kafka2.vagrant",9092}]
3> {ok, ClientPid} =
3>   brod:start_link_client(_ClientId  = brod_client_1,
3>                          _Endpoints = Hosts,
3>                          _Config    = [{restart_delay_seconds, 10}],
3>                          _Producers = [ { <<"brod">>
3>                                         , [ {topic_restart_delay_seconds, 10}
3>                                           , {partition_restart_delay_seconds, 2}
3>                                           , {required_acks, -1}
3>                                           ]
3>                                         }
3>                                       ]).
{ok,<0.38.0>}
4> 
=ERROR REPORT==== 2-Feb-2016::21:54:50 ===
** Generic server <0.44.0> terminating 
** Last message in was {post_init,self,brod_producers_sup,
                           {brod_producers_sup2,brod_client_1,<<"brod">>,
                               [{partition_restart_delay_seconds,2},
                                {required_acks,-1}]}}
** When Server state == {state,undefined,undefined,[],undefined,undefined,
                               undefined,[],undefined,undefined}
** Reason for termination == 
** {{badmatch,{error,'ReplicaNotAvailable'}},
    [{brod_producers_sup,post_init,1,
                         [{file,"src/brod_producers_sup.erl"},{line,93}]},
     {supervisor3,handle_info,2,[{file,"src/supervisor3.erl"},{line,717}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},
     {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

brod 1.5.3 (742ef99) works.

Terminal session recording: https://asciinema.org/a/0prwuld7v6lwclpizbnn03y6n

Am I not using brod 2.0-dev correctly or is this a bug?

Topic creation/partitions

get_metadata will create a topic if it doesn't already exist, but are we missing something about how to create a certain number of partitions on this topic? Producing to a nonzero partition number generates errors.

Is there a set way to declare a partition and topic as we produce to it?
