
grpcbox's Introduction

grpcbox


Library for creating grpc services (client and server) in Erlang, based on the chatterbox http2 library.

Features

  • Unary, client stream, server stream and bidirectional rpcs
  • Client load balancing
  • Interceptors
  • Health check service
  • Reflection service
  • OpenCensus interceptors for stats and tracing
  • Plugin for generating clients and behaviour type specs for service server implementation

Implementing a Service Server

The quickest way to play around is with the test service and client that is used by grpcbox. Simply pull up a shell with rebar3 as test shell and the route guide service will start on port 8080; you'll have the client, routeguide_route_guide_client, in the path.
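
For example, a quick sanity check from that shell could look roughly like the following; the coordinates are the ones used in the client examples below, and the exact feature returned depends on the bundled test data:

$ rebar3 as test shell
...
1> Point = #{latitude => 409146138, longitude => -746188906}.
2> {ok, _Feature, _HeadersAndTrailers} = routeguide_route_guide_client:get_feature(Point).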

The easiest way to get started on your own project is using the plugin, grpcbox_plugin:

{deps, [grpcbox]}.

{grpc, [{protos, "protos"},
        {gpb_opts, [{module_name_suffix, "_pb"}]}]}.

{plugins, [grpcbox_plugin]}.

Currently grpcbox and the plugin are a bit picky and the gpb options will always include [use_packages, maps, {i, "."}, {o, "src"}].

Assuming the protos directory of your application has the route_guide.proto found in this repo, protos/route_guide.proto, the output from running the plugin will be:

$ rebar3 grpc gen
===> Writing src/route_guide_pb.erl
===> Writing src/grpcbox_route_guide_bhvr.erl

A behaviour is used because it provides a way to generate the interface and types separately from where the actual implementation is done. If the proto changes you can regenerate the interface without disturbing the implementation of the service, then simply update the implementation callbacks to match the changed interface.
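
A service implementation is then just a module that declares the generated behaviour and exports one function per rpc. A minimal sketch (the module name and return value here are only illustrative, and the remaining callbacks are omitted):

-module(routeguide_route_guide).

-behaviour(grpcbox_route_guide_bhvr).

-export([get_feature/2]).

%% list_features/2, record_route/2 and route_chat/2 would be exported as well
get_feature(Ctx, Point) ->
    {ok, #{name => <<"unknown">>, location => Point}, Ctx}.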

Runtime configuration for grpcbox can be done in sys.config, specifying the compiled proto modules to use for finding the services available, which services to actually enable for requests and what module implements them, acceptor pool and http server settings. See interop/config/sys.config for a working example.

In the interop config the portion for defining services to handle requests for is:

{grpcbox, [{servers, [#{grpc_opts => #{service_protos => [test_pb],
                                       services => #{'grpc.testing.TestService' => grpc_testing_test_service}}}]},
...

test_pb is the gpb generated module that exports get_service_names/0. The results of that function are used to construct the metadata needed for handling requests. The services map gives the module to call for handling methods of a service. If a service is not defined in that map it will result in the grpc error code 12, Unimplemented.
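
As a quick check that a compiled proto module is usable here, you can call that function in a shell. For the interop test.proto the result includes the test service name (the exact list depends on the proto):

1> test_pb:get_service_names().
['grpc.testing.TestService']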

The services will be started when the application starts, assuming they are all configured in the sys.config and it is loaded. To manually start a service, either use grpcbox:start_server/1, which starts a grpcbox_service_sup supervisor under the grpcbox_services_simple_sup simple_one_for_one supervisor, or get a child spec with grpcbox:server_child_spec(ServerOpts, GrpcOpts, ListenOpts, PoolOpts, TransportOpts) to include the service supervisor in your own supervision tree.
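
As a sketch, starting a route guide server at runtime with grpcbox:start_server/1 could look roughly like the following; the grpc_opts mirror the sys.config entries above, while the listen_opts keys and the routeguide_route_guide handler module are assumptions for illustration:

%% sketch only: option keys follow the sys.config example above
grpcbox:start_server(#{grpc_opts => #{service_protos => [route_guide_pb],
                                      services => #{'routeguide.RouteGuide' => routeguide_route_guide}},
                       listen_opts => #{port => 8080, ip => {0, 0, 0, 0}}}).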

Unary RPC

Unary RPCs receive a single request and return a single response. The RPC GetFeature takes a single Point and returns the Feature at that point:

rpc GetFeature(Point) returns (Feature) {}

The callback generated by the grpcbox_plugin will look like:

-callback get_feature(ctx:ctx(), route_guide_pb:point()) ->
    {ok, route_guide_pb:feature(), ctx:ctx()} | grpcbox_stream:grpc_error_response().

And the implementation is as simple as an Erlang function that takes the arguments Ctx, the context of this current request, and a Point map, returning a Feature map:

get_feature(Ctx, Point) ->
    Feature = #{name => find_point(Point, data()),
                location => Point},
    {ok, Feature, Ctx}.

Streaming Output

Instead of returning a single feature the server can stream a response of multiple features by defining the RPC to have a stream Feature return:

rpc ListFeatures(Rectangle) returns (stream Feature) {}

In this case the callback still receives a map argument but also a grpcbox_stream argument:

-callback list_features(route_guide_pb:rectangle(), grpcbox_stream:t()) ->
    ok | {error, term()}.

The GrpcStream variable is passed to grpcbox_stream:send/2 for returning an individual feature over the stream to the client. The stream is ended by the server when the function completes.

list_features(_Message, GrpcStream) ->
    grpcbox_stream:send(#{name => <<"Tour Eiffel">>,
                                        location => #{latitude => 3,
                                                      longitude => 5}}, GrpcStream),
    grpcbox_stream:send(#{name => <<"Louvre">>,
                          location => #{latitude => 4,
                                        longitude => 5}}, GrpcStream),
    ok.

Streaming Input

The client can also stream a sequence of messages:

rpc RecordRoute(stream Point) returns (RouteSummary) {}

In this case the callback receives a reference() instead of a direct value from the client:

-callback record_route(reference(), grpcbox_stream:t()) ->
    {ok, route_guide_pb:route_summary()} | {error, term()}.

The process the callback is running in will receive the individual messages on the stream as tuples {reference(), route_guide_pb:point()}. The end of the stream is sent as the message {reference(), eos} at which point the function can return the response:

record_route(Ref, GrpcStream) ->
    record_route(Ref, #{t_start => erlang:system_time(1),
                        acc => []}, GrpcStream).

record_route(Ref, Data=#{t_start := T0, acc := Points}, GrpcStream) ->
    receive
        {Ref, eos} ->
            {ok, #{elapsed_time => erlang:system_time(1) - T0,
                   point_count => length(Points),
                   feature_count => count_features(Points),
                   distance => distance(Points)}, GrpcStream};
        {Ref, Point} ->
            record_route(Ref, Data#{acc => [Point | Points]}, GrpcStream)
    end.

Streaming In and Out

A bidirectional streaming RPC is defined when both input and output are streams:

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

-callback route_chat(reference(), grpcbox_stream:t()) ->
    ok | {error, term()}.

The sequence of input messages will again be sent to the callback's process as Erlang messages and any output messages are sent to the client with grpcbox_stream:

route_chat(Ref, GrpcStream) ->
    route_chat(Ref, [], GrpcStream).

route_chat(Ref, Data, GrpcStream) ->
    receive
        {Ref, eos} ->
            ok;
        {Ref, #{location := Location} = P} ->
            Messages = proplists:get_all_values(Location, Data),
            [grpcbox_stream:send(Message, GrpcStream) || Message <- Messages],
            route_chat(Ref, [{Location, P} | Data], GrpcStream)
    end.

Interceptors

Unary Interceptor

A unary interceptor can be any function that accepts a context, decoded request body, server info map and the method function:

some_unary_interceptor(Ctx, Request, ServerInfo, Fun) ->
    %% do some interception stuff
    Fun(Ctx, Request).

The interceptor is configured in the grpc_opts set in the environment or passed to the supervisor start_child function. An example from the test suite sets grpc_opts in the application environment:

#{service_protos => [route_guide_pb],
  unary_interceptor => fun(Ctx, Req, _, Method) ->
                         Method(Ctx, #{latitude => 30,
                                       longitude => 90})
                       end}

Streaming Interceptor

Middleware

There is a provided interceptor grpcbox_chain_interceptor which accepts a list of interceptors to apply in order, with the final interceptor calling the method handler. An example from the test suite adds a trailer in each interceptor to show the chain working:

#{service_protos => [route_guide_pb],
  unary_interceptor =>
    grpcbox_chain_interceptor:unary([fun ?MODULE:one/4,
                                     fun ?MODULE:two/4,
                                     fun ?MODULE:three/4])}
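
Each function in that list has the same shape as any unary interceptor. A minimal sketch of one element of the chain (what it does before delegating is up to you; the trailer-adding versions live in the test suite):

one(Ctx, Request, _ServerInfo, Handler) ->
    %% inspect or modify Ctx/Request here, then hand off to the next
    %% interceptor in the chain or, for the last one, the method handler
    Handler(Ctx, Request).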

Tracing

The provided interceptor grpcbox_trace supports the OpenCensus wire protocol using opencensus-erlang. It will use the trace_id, span_id and any options or tags from the trace context.

Configure as an interceptor:

#{service_protos => [route_guide_pb],
  unary_interceptor => {grpcbox_trace, unary}}

Or as a middleware in the chain interceptor:

#{service_protos => [route_guide_pb],
  unary_interceptor =>
    grpcbox_chain_interceptor:unary([..., 
                                     fun grpcbox_trace:unary/4, 
                                     ...])}

See opencensus-erlang for details on configuring reporters.

Statistics

Statistics are collected by implementing a stats handler module. A handler for OpenCensus stats (be sure to include OpenCensus as a dependency and make sure it starts on boot) is provided and can be enabled for the server with a config option:

{grpcbox, [{servers, [#{grpc_opts => #{stats_handler => grpcbox_oc_stats_handler
                                       ...}}]}]}

For the client the stats handler is a per-channel configuration, see the Defining Channels section below.

You can verify it is working by enabling the stdout exporter:

 {opencensus, [{stat, [{exporters, [{oc_stat_exporter_stdout, []}]}]}]}

For actual use, an exporter for Prometheus is available.

Details on all the metrics that are collected can be found in the OpenCensus gRPC Stats specification.

Metadata

Metadata is sent in headers and trailers.
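
On the server side of a streaming RPC the handler can attach metadata through the stream it is handed. The sketch below assumes grpcbox_stream:add_headers/2 takes the same argument order as grpcbox_stream:add_trailers/2, and the header and trailer keys are made up:

list_features(_Message, GrpcStream) ->
    %% header metadata is added before the first message goes out
    grpcbox_stream:add_headers([{<<"x-handled-by">>, <<"route-guide">>}], GrpcStream),
    grpcbox_stream:send(#{name => <<"Tour Eiffel">>,
                          location => #{latitude => 3, longitude => 5}}, GrpcStream),
    %% trailer metadata goes out when the stream ends
    grpcbox_stream:add_trailers([{<<"feature-count">>, <<"1">>}], GrpcStream),
    ok.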

Using a Service Client

For each service in the protos passed to rebar3 grpc gen, the plugin will generate a <service>_client module containing a function for each method in the service.

Defining Channels

Channels maintain connections to gRPC servers and offer client-side load balancing between servers using various methods: round robin, random, or hash.

If no channel is specified in the options to a rpc call the default_channel is used. Setting the default to connect to localhost on port 8080 in your sys.config would look like:

{client, #{channels => [{default_channel, [{http, "localhost", 8080, []}], #{}}]}}

Unix sockets (UDS) may also be used with the same notation that is defined in gen_tcp. Considerations:

  • for UDS, only the http scheme is permitted
  • the port must strictly be 0
  • only available on POSIX operating systems
  • abstract UDS are only available on Linux, and such sockets' names must start with a zero byte
{client, #{channels => [{default_channel, [{http, {local, "/path/to/unix/socket_name"}, 0, []}], #{}}]}}
%% or to use an abstract Unix socket:
%% {client, #{channels => [{default_channel, [{http, {local,  [0 | "socket_name"]}, 0, []}], #{}}]}}

The empty map at the end can contain configuration for the load balancing algorithm, interceptors, statistics handling and compression:

#{balancer => round_robin | random | hash | direct | claim,
  encoding => identity | gzip | deflate | snappy | atom(),
  stats_handler => grpcbox_oc_stats_handler,
  unary_interceptor => term(),
  stream_interceptor => term()} 

The default balancer is round robin and encoding is identity (no compression). Encoding can also be passed in the options map to individual requests.
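
For example, assuming the generated client functions accept a trailing options map in addition to the context and request (and that the per-request key matches the channel-level encoding option), a single gzip-compressed call might look like:

Point = #{latitude => 409146138, longitude => -746188906},
{ok, _Feature, _HeadersAndTrailers} =
    routeguide_route_guide_client:get_feature(ctx:new(), Point, #{encoding => gzip}).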

Calling Unary Client RPC

The RouteGuide service has a single unary method, GetFeature, in the client we have a function get_feature/2:

Point = #{latitude => 409146138, longitude => -746188906},
{ok, Feature, HeadersAndTrailers} = routeguide_route_guide_client:get_feature(Point).

Client Streaming RPC

{ok, S} = routeguide_route_guide_client:record_route(),
ok = grpcbox_client:send(S, #{latitude => 409146138, longitude => -746188906}),
ok = grpcbox_client:send(S, #{latitude => 234818903, longitude => -823423910}),
ok = grpcbox_client:close_send(S),
{ok, #{point_count := 2}} = grpcbox_client:recv_data(S).

Client with Server Streaming RPC

Rectangle = #{hi => #{latitude => 1, longitude => 2},
              lo => #{latitude => 3, longitude => 5}},
{ok, S} = routeguide_route_guide_client:list_features(Rectangle),
{ok, #{<<":status">> := <<"200">>}} = grpcbox_client:recv_headers(S),
{ok, #{name := _}} = grpcbox_client:recv_data(S),
{ok, #{name := _}} = grpcbox_client:recv_data(S),
{ok, _} = grpcbox_client:recv_trailers(S).

Bidirectional RPC

{ok, S} = routeguide_route_guide_client:route_chat(),
ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}),
ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}),
{ok, #{message := <<"hello there">>}} = grpcbox_client:recv_data(S),
ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}),
{ok, #{message := <<"hello there">>}} = grpcbox_client:close_and_recv(S).

Context

Client calls optionally accept a context as the first argument. Contexts are used to set and propagate deadlines and OpenCensus tags.

Ctx = ctx:with_deadline_after(300, seconds),
Point = #{latitude => 409146138, longitude => -746188906},
{ok, Feature, HeadersAndTrailers} = routeguide_route_guide_client:get_feature(Ctx, Point).

CT Tests

To run the Common Test suite:

$ rebar3 ct

Interop Tests

The interop rebar3 profile builds with an implementation of the test.proto for grpc interop testing.

For testing grpcbox's server:

$ rebar3 as interop shell

With the shell running the tests can then be run from a script:

$ interop/run_server_tests.sh

The script by default uses the Go test client that can be installed with the following:

$ go get -u github.com/grpc/grpc-go/interop
$ go build -o $GOPATH/bin/go-grpc-interop-client github.com/grpc/grpc-go/interop/client

For testing the grpcbox client you can use the Go test server. But first, add _ "google.golang.org/grpc/encoding/gzip" to server.go imports or else the gzip tests will fail. Then simply build and run it:

$ go build -o $GOPATH/bin/go-grpc-interop-server github.com/grpc/grpc-go/interop/server
$ $GOPATH/bin/go-grpc-interop-server -port 8080

And run the interop client test suite:

rebar3 as interop ct


grpcbox's Issues

Unable to override client `content-type`

I'm attempting to use the gRPC API for Google Cloud PubSub but I continue to receive a 404 http_error. I think this is similar to this issue in the elixir-grpc library. The *.googleapis.com services expect a Content-Type of application/grpc and anything else results in a 404 Not Found.

I've tried setting the content-type in the metadata via an interceptor, but I believe it's being overridden by the default provided here.

No client stream callback for stream termination

If a stream terminates unexpectedly without END_STREAM, grpcbox does not learn about this and will leave the stream hanging without sending eos. For example if the h2_connection closes down due to socket errors, the grpcbox stream will not notice it.

GRPC Server not processing request

Hi @tsloughter ,

I am working on using grpcbox. I followed the implementation as documented and am using a unary request, firing the client calls with grpcurl.

But the first request is successful and the second request returns the response below.

/grpcurl -v -plaintext -d '{"msg": "1"}' -import-path /home/aaaa/route/myapp/proto --proto dp_api.proto localhost:10001 dp.Route/Ping

Resolved method descriptor:
rpc Ping ( .dp.PingMsg ) returns ( .dp.PongMsg );

Request metadata to send:
(empty)

Response headers received:
content-type: application/grpc+proto
user-agent: grpc-erlang/0.1.0

Response trailers received:
(empty)
Sent 1 request and received 0 responses
ERROR:
Code: Internal
Message: stream terminated by RST_STREAM with error code: STREAM_CLOSED

Can you give some information on how to fix this? I am using a unary RPC.

Receiving header block fragments referring to invalid indexes causes a crash

Currently, this can be reproduced by running the unary concurrency test that causes #45. This results in a couple of headers being undefined because of invalid indexes, which in turn causes the trace below:

=ERROR REPORT==== 17-Jan-2021::10:50:34.719500 ===
** State machine <0.731.0> terminating
** Last event = {cast,{recv_h,[{<<":status">>,<<"200">>},
                               undefined,undefined,
                               {<<"user-agent">>,<<"grpc-erlang/0.1.0">>}]}}
** When server state  = {half_closed_local,
                         {stream_state,117,<0.666.0>,
                          {gen_tcp,#Port<0.134>},
                          idle,
                          {[],[]},
                          [{<<":method">>,<<"POST">>},
                           {<<":path">>,
                            <<"/routeguide.RouteGuide/GetFeature">>},
                           {<<":scheme">>,<<"http">>},
                           {<<":authority">>,<<"localhost:8080">>},
                           {<<"grpc-encoding">>,<<"identity">>},
                           {<<"grpc-message-type">>,<<"routeguide.Point">>},
                           {<<"content-type">>,<<"application/grpc+proto">>},
                           {<<"user-agent">>,<<"grpc-erlang/0.9.2">>},
                           {<<"te">>,<<"trailers">>}],
                          undefined,0,false,false,[],[],undefined,false,false,
                          undefined,undefined,
                          #{buffer => <<>>,client_pid => <0.639.0>,
                            ctx =>
                             {ctx,
                              #{grpc_client_method =>
                                 <<"/routeguide.RouteGuide/GetFeature">>},
                              undefined},
                            marshal_fun =>
                             #Fun<routeguide_route_guide_client.0.7492253>,
                            path => <<"/routeguide.RouteGuide/GetFeature">>,
                            service => 'routeguide.RouteGuide',stats => #{},
                            stats_handler => undefined,stream_id => 117,
                            unmarshal_fun =>
                             #Fun<routeguide_route_guide_client.1.7492253>},
                          grpcbox_client_stream,client}}
** Reason for termination = error:function_clause
** Callback mode = state_functions
** Stacktrace =
**  [{h2_stream,'-no_upper_names/1-fun-0-',
                [undefined],
                [{file,"grpcbox/_build/default/lib/chatterbox/src/h2_stream.erl"},
                 {line,828}]},
     {lists,all,2,[{file,"lists.erl"},{line,1213}]},
     {h2_stream,is_valid_headers,2,
                [{file,"grpcbox/_build/default/lib/chatterbox/src/h2_stream.erl"},
                 {line,818}]},
     {h2_stream,half_closed_local,3,
                [{file,"grpcbox/_build/default/lib/chatterbox/src/h2_stream.erl"},
                 {line,589}]},
     {gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1159}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]

Channels leak memory

grpcbox version is v0.11.0
The grpcbox library leaks memory for each message sent on an open channel. Closed streams accumulate in the data of gen_statem for the h2_connection of chatterbox:

{connected,{connection,client,[],undefined,
                       {gen_tcp,#Port<0.256>},
                       {settings,...}
                       {settings,...}
                       260494386,97150,
                       {hpack_context,...}
                       {hpack_context,...}
                       {[],[]},
                       12649,
                       {stream_set,client,
                                   {peer_subset,unlimited,1,0,12649,
                                                [{active_stream,1,...}
                                                 {closed_stream,3,...}
                                                 {closed_stream,5,...}
                                                 {closed_stream,7,...}
                                                 {closed_stream,9,...}
                                                 {closed_stream,11,...}
                                                 ...}

The h2_connection:get_response() function provides a way of marking closed streams as garbage in order to free them, but this is not used by grpcbox. grpcbox reads response data directly as it is streamed in, not at the end as one piece. Because of this grpcbox never calls h2_connection:get_response() and the memory is not freed. Under load the memory grows quickly, and as the growing data structure is accessed repeatedly the CPU usage also increases rapidly.

The connect timeout for a channel is infinite

In chatterbox, h2_connection:init does not set a timeout for connect() so in case there is no IP level response, it will take a long time before a unary request returns when the channel is in disconnected state.

A workaround of changing connect/3 to connect/4, specifying a hardcoded timeout does fix this. However, in both the fixed and unfixed situation a crash report is generated by the h2_connection gen_statem process exiting so it is not very graceful as is.

In general, being able to control connect and read timeouts through options would be a nice addition; currently, at least on the chatterbox side, timeouts seem hardcoded.

There are deprecated functions used in grpcbox's deps on Erlang 21

Env

Erlang && Rebar3

Rebar3 report
 version 3.6.1
 generated at 2018-07-10T02:45:04+00:00
=================
Please submit this along with your issue at https://github.com/erlang/rebar3/issues (and feel free to edit out private information, if any)
-----------------
Task: 
Entered as:
  
-----------------
Operating System: x86_64-pc-linux-gnu
ERTS: Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1]
Root Directory: /usr/lib/erlang
Library directory: /usr/lib/erlang/lib
-----------------
Loaded Applications:
bbmustache: 1.5.0
certifi: 2.0.0
cf: 0.2.2
common_test: 1.16
compiler: 7.2
crypto: 4.3
cth_readable: 1.4.2
dialyzer: 3.3
edoc: 0.9.3
erlware_commons: 1.2.0
eunit: 2.3.6
eunit_formatters: 0.5.0
getopt: 1.0.1
hipe: 3.18
inets: 7.0
kernel: 6.0
providers: 1.7.0
public_key: 1.6
relx: 3.26.0
sasl: 3.2
snmp: 5.2.11
ssl_verify_fun: 1.1.3
stdlib: 3.5
syntax_tools: 2.1.5
tools: 3.0

-----------------
Escript path: /home/johnzhuang/rebar3
Providers:
  app_discovery as clean compile compile config cover ct cut deps dialyzer do docs edoc erl_vsn escriptize eunit gen get-deps help info install install_deps key list lock new owner path pkgs publish release relup report search shell state tar tree unlock update upgrade upgrade upgrade user version xref 

grpcbox

 {<<"grpcbox">>,
  {git,"https://github.com/tsloughter/grpcbox.git",
       {ref,"8e464ba16c256ce144bae6db863ce2699f069464"}}

deprecated report

===> Compiling chatterbox
===> Compiling _build/default/lib/chatterbox/src/h2_connection.erl failed
_build/default/lib/chatterbox/src/h2_connection.erl:351: ssl:ssl_accept/2: deprecated; use ssl:handshake/2 instead

Calling grpcbox_channel:stop() causes crash reports on active streams

grpcbox_channel:stop() uses gproc_pool:force_delete which will kill the processes and their children. This causes crash reports when there are active streams in the h2 connection. When dynamically opening and closing channels these crash reports can appear regularly. Since they do not mean an error in this case, an alternative, more silent way of shutting down the channel would be needed.

Interceptors

Including a chain interceptor and one for opencensus tracing/stats.

Errors loading plugin {grpc_plugin, "~> 0.7.0"} when running `rebar3 as test shell`

I pulled the latest version of the repo, and ran the rebar3 as test shell as instructed in the documentation. I am getting this error:

ryan@Ryans-MBP grpcbox % rebar3 as test shell DEBUG=1
===> Analyzing applications...
===> Compiling getopt
===> Compiling providers
===> Compiling gpb
===> Compiling grpcbox_plugin
===> Compiling _build/default/plugins/grpcbox_plugin/src/grpcbox_plugin_prv.erl failed
_build/default/plugins/grpcbox_plugin/src/grpcbox_plugin_prv.erl:5: can't find include lib "providers/include/providers.hrl"; Make sure providers is in your app file's 'applications' list
_build/default/plugins/grpcbox_plugin/src/grpcbox_plugin_prv.erl:142: undefined macro 'PRV_ERROR/1'

_build/default/plugins/grpcbox_plugin/src/grpcbox_plugin_prv.erl:59: function compile_pb/3 undefined

===> Errors loading plugin {grpcbox_plugin,"~> 0.7.0"}. Run rebar3 with DEBUG=1 set to see errors.
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling grpcbox
Erlang/OTP 23 [erts-11.1.8] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Eshell V11.1.8  (abort with ^G)
1> ===> Booted hpack
===> Booted chatterbox
===> Booted acceptor_pool
===> Booted gproc
===> Booted ctx
===> Booted grpcbox

The erlang version that I have installed is 23.2.7

Is there some dependency that I am missing?

Concurrency issue with server-side header encoding

When handling concurrent requests, the server may send out an HTTP/2 HEADERS frame that refers to headers in the hpack dynamic table that are not yet allocated at the client side. This causes the client to fail to decode the headers and in case of the grpcbox client it causes it to crash. This issue can be reproduced by running the unary concurrency test of PR #41.

The issue here can best be seen from a PCAP dump of the traffic generated by the test. In the attached dump, the first HEADERS sent by the server occurs in packet 324 (stream ID 101). Here the HTTP/2 Header Block Fragment contains the headers in long format as its the first time they are sent. The client decodes 3 dynamic headers and now its hpack dynamic table contains values at positions 62, 63 and 64.

Next in packet 325 the server sends a HEADERS for stream 127. This message has a Header Block Fragment of 88c2c1c0, of which c2,c1 and c0 are referring to the dynamic table positions 66, 65, and 64. Since the client has not received any headers yet for position 66 and 65, it fails to decode the headers. The message sent by the server is invalid.

The same error occurs in packet 327 for the next stream 117.

Finally in packet 328, the server sends the trailers for the original stream 101, these 2 are also in the long format and once the client has decoded them it will have values at positions 62-66. At this point the invalid packets (325 and 327) sent earlier match the dynamic headers they are referring to in a correct way. This likely means that the order that the server has encoded the messages is not guaranteed to be the order they are sent out in. If they were, packet 328 with the trailers would have been sent out before 325 and 327.

fail.pcap.gz

Client hangs forever on the first call if endpoint is unreachable

While making the first request the grpcbox_client_stream tries to obtain HTTP2 connection object with grpcbox_subchannel:conn/1.

Calling grpcbox_subchannel:conn/1 (being in idle state at first) causes grpcbox_subchannel to connect. But if h2_client:start_link fails, for example because the endpoint is not up at that moment, the state of grpcbox_subchannel remains unchanged (i.e. idle) and somewhat inconsistent. It neither crashes nor attempts to re-connect.

I see that an idle_timeout attribute is introduced to the state, so I guess the intention was to do something after some period of time, but the attribute is not used.

Further observations:
h2_client:start_link creates a link with grpcbox_subchannel, so if the connection succeeds at the initial connect but is closed later, the grpcbox_subchannel is expected to exit as well. BUT the process_flag(trap_exit) is set to true. That means that in the ready state the statem receives an {'EXIT',...} event which will be skipped by

handle_event(_, _, _) ->
keep_state_and_data.

and again the statem remains in ready state, but neither crashes nor attempts to re-connect.

Did I miss something?

How did you expect to handle problems with h2_connection?

This is a design decision so I find it better to discuss the issue before submitting the PR.

The connection can break on concurrent unary requests

When running concurrent unary requests, HTTP/2 HEADERS frames are sometimes sent in an order where a lower stream identifier is sent after an already sent higher one. This causes a HTTP/2 connection error and breaks communication when it happens.

-spec incorrect for enum generated erlang code

I have a .proto file with enum definitions, and when I generate the erlang code I find that the -spec definitions for fqbin_to_msg_name() and msg_name_to_fqbin() are swapped, see below snippet.

-spec msg_name_to_fqbin(_) -> no_return().
fqbin_to_msg_name(E) -> error({gpb_error, {badmsg, E}}).


-spec fqbin_to_msg_name(_) -> no_return().
msg_name_to_fqbin(E) -> error({gpb_error, {badmsg, E}}).

request new release containing client timeout

I have a need to specify the timeout when making a grpc client call. It appears the fix has been made to grpcbox, so I was wondering if it is possible to get a new release with the fix published.

Thanks,
Mark.

Requests get stuck without timeout when the HTTP/2 connection (re)connects

For unary requests, when the HTTP/2 connection disconnects, the next request will make grpcbox_subchannel:conn() try to reconnect it. Meanwhile all other requests sent to the same subchannel will block behind grpcbox_subchannel:conn() which is blocking until success or connect_timeout. If the HTTP/2 connect keeps timing out, the requests in the queue will take very long to return.

In our case we would have alternative channels that could have been used if requests always timed out within a specified time. Any ideas on how requests getting stuck in this situation could be prevented?

should client otel span be parent of server span

Using the opentelemetry grpcbox instrumentation library, I am seeing both the client and server spans in Zipkin. However, I expected to see one span, with the client span being the parent of the server span, or perhaps the client span with the server span as a sub-span of it. Both my client and server are Erlang.

ctx deadline does not affect local request timeouts

For the client, the deadline set in the ctx only affects the grpc-timeout header sent to the server. It does not affect how requests time out on the client side. Currently if the deadline is larger than the hard-coded 5s timeout, the request will time out too soon. Also if the deadline is shorter than 5s but the server does not reply within the deadline, it will take 5s for the request to time out.

I think it would make sense for the deadline to affect the local timeouts as well, and not only the grpc-timeout header.

Always receive "stream terminated by RST_STREAM with error code: STREAM_CLOSED" after grpc request

When a go grpc client or grpcbox client calls a grpcbox server, both see RST_STREAM in the tcpdump log:

go client call: (screenshot)

grpcbox client call: (screenshot)

RST_STREAM detail: (screenshot)

When a go grpc client or grpcbox client calls a go grpc server, no RST_STREAM appears in the tcpdump log:

go grpc call: (screenshot)

grpcbox call: (screenshot)

The go grpc client does not handle RST_STREAM and will return an error, but grpcbox won't.

go error: (screenshot)

grpcbox result: (screenshot)

My issue is: why does RST_STREAM happen after the grpc response is sent?

Transfer big binary buffer very slow?

Hello, tsloughter

I tried to transfer a 1000MB buffer to localhost, but it seems very slow. I want to ask if you have tried this test before?

The speed is just 8MB/s.

There is a weird behaviour of the bidirectional streaming RPC client in the Erlang shell

I found in rebar3 shell:

If the creation of S and the send request call are separated with a comma, it works:

1> {ok, S1}=hellostreamingworld_multi_greeter_client:say_hello(ctx:new()),
1>   grpcbox_client:send(S1, #{name => "hi", num_greetings => "3"}),
2> grpcbox_client:recv_data(S1).
{ok,#{message => <<"Times: 3 Hello, hi ">>}}

but if they are separated with a period, the send does not seem to succeed and recv_data always returns timeout:

1> {ok, S1}=hellostreamingworld_multi_greeter_client:say_hello(ctx:new()).
{ok,#{channel => <0.626.0>,encoding => identity,
      monitor_ref => #Ref<0.1583415125.268697602.72154>,
      service_def =>
          {grpcbox_def,'hellostreamingworld.MultiGreeter',
                       #Fun<hellostreamingworld_multi_greeter_client.0.23407338>,
                       #Fun<hellostreamingworld_multi_greeter_client.1.23407338>},
      stream_id => 1,stream_pid => <0.630.0>}}
2> grpcbox_client:send(S1, #{name => "hi", num_greetings => "3"}).
ok
3> grpcbox_client:recv_data(S1).
timeout

Why? Is this reasonable?

New grpcbox release

Hi @tsloughter,

It's not actually an issue, just a question.
The current main branch includes a few important fixes related to GRPC timeouts/deadlines:

  • #91
  • #89
  • #95
    It looks like it's worth tagging a new release to include those fixes. What do you think about that?

grpcbox is inconsistent with chatterbox/h2_client

Hi,

I noticed that grpcbox uses functions that do not exist. For example, grpcbox_subchannel.erl (line 96) uses:

case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions), #{stream_callback_mod => grpcbox_client_stream}) of

There is no five-argument start_link in the chatterbox application.

Unfortunately it is required to create a new stream.

I am using the latest versions of grpcbox and chatterbox.

Compilation fails

Compilation fails on latest release and on master for Erlang/OTP v20 (on Ubuntu):

drasko@Kant:~/grpcbox$ rebar3 compile
===> Fetching covertool v2.0.1
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/covertool-2.0.1.tar
===> Compiling covertool
===> Fetching grpcbox_plugin v0.7.0
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/grpcbox_plugin-0.7.0.tar
===> Fetching gpb v4.7.3
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/gpb-4.7.3.tar
===> Fetching providers v1.7.0
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/providers-1.7.0.tar
===> Fetching getopt v1.0.1
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/getopt-1.0.1.tar
===> Compiling getopt
===> Compiling providers
===> Compiling gpb
Compiling descriptor.proto...
Compiling gpb_descriptor.erl...
Compiling gpb_compile_descr.erl...
===> Compiling grpcbox_plugin
===> Fetching rebar3_lint v0.1.10
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/rebar3_lint-0.1.10.tar
===> Fetching elvis v0.4.2
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/elvis_core-0.4.2.tar
===> Fetching goldrush v0.1.9
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/goldrush-0.1.9.tar
===> Fetching katana_code v0.1.2
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/katana_code-0.1.2.tar
===> Fetching zipper v1.0.1
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/zipper-1.0.1.tar
===> Fetching aleppo v1.1.1
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/inaka_aleppo-1.1.1.tar
===> Compiling goldrush
===> Compiling zipper
===> Compiling aleppo
===> Compiling katana_code
===> Compiling elvis
===> Compiling rebar3_lint
===> Verifying dependencies...
===> Fetching acceptor_pool v1.0.0
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/acceptor_pool-1.0.0.tar
===> Fetching chatterbox v0.9.1
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/ts_chatterbox-0.9.1.tar
===> Fetching ctx v0.5.0
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/ctx-0.5.0.tar
===> Fetching gproc v0.8.0
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/gproc-0.8.0.tar
===> Fetching hpack v0.2.3
===> Downloaded package, caching at /home/drasko/.cache/rebar3/hex/hexpm/packages/hpack_erl-0.2.3.tar
===> Compiling hpack
===> Compiling chatterbox
===> Compiling gproc
_build/default/lib/gproc/src/gproc_dist.erl:25: Warning: behaviour gen_leader undefined

===> Compiling ctx
===> Compiling acceptor_pool
===> Compiling grpcbox
===> Compiling src/grpcbox_stream.erl failed
src/grpcbox_stream.erl:4: can't find include lib "kernel/include/logger.hrl"; Make sure kernel is in your app file's 'applications' list
src/grpcbox_stream.erl:213: undefined macro 'LOG_INFO/2'

src/grpcbox_stream.erl:23: function on_receive_data/2 undefined

src/grpcbox_stream.erl:217: Warning: function handle_message/2 is unused
src/grpcbox_stream.erl:244: Warning: function handle_unary/3 is unused

drasko@Kant:~/grpcbox$ erl
Erlang/OTP 20 [erts-9.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:10] [kernel-poll:false]

Eshell V9.2  (abort with ^G)
1> 
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution

drasko@Kant:~/grpcbox$ rebar3 --version
rebar 3.12.0 on Erlang/OTP 20 Erts 9.2

grpcbox_channel.erl error

gproc_pool:new(Name, BalancerType, [{size, length(Endpoints)}, {autosize, true}]),
This autosize option is an error; it should be auto_size.

grpc-timeout header not matching the spec

Hi,

When testing with a deadline in the context, the timeout value is converted to nanoseconds in the grpc-timeout header.
But there is no check of the TimeoutValue range, which could result in the server rejecting the message with a protocol error.

The grpc spec specified that the TimeoutValue can contain at most 8 digits:

TimeoutValue → {positive integer as ASCII string of at most 8 digits}

Best regards,
Tonny

Fix documentation and add documentation for elixir

I would like to make an MR where I will fix the documentation build, add the possibility for the documentation to be built for HexDocs just like telemetry, and finally try to complete all the missing documentation.

What I need to know is how the _bhvr files were generated and why they contain invalid @doc tags.

Streaming interceptor that runs on end of stream

Related to #2.

Right now the interceptor is only run when the user calls a grpcbox_client:recv_ function. This breaks the functionality of interceptors like opencensus that rely on the time difference to report span duration and metrics.

Maybe keeping those metrics and exposing them to interceptors would work, or having the interceptors run in the stream process, which may come out of #2.

Metadata not returned in case of non-ok status

Currently the code (e.g. in grpcbox_client:unary_handler/6) contains:

case recv_trailers(S) of
    {ok, {<<"0">>, _, Metadata}} ->
        {ok, Data} = recv_data(S, 0),
        {ok, Data, #{headers => Headers,
                     trailers => Metadata}};
    {ok, {Status, Message, _Metadata}} ->
        {error, {Status, Message}}
end;

This means that if the status is not ?GRPC_STATUS_OK, metadata (headers and trailers) is not returned to the client.
We are integrating an Erlang service with Kotlin (Java) services, which use additional trailers to send more details about errors.
With this code, we cannot consume this information.
It can be fixed, e.g. by:

case recv_trailers(S) of
    {ok, {<<"0">>, _, Metadata}} ->
        {ok, Data} = recv_data(S, 0),
        {ok, Data, #{headers => Headers,
                     trailers => Metadata}};
    {ok, {Status, Message, Metadata}} ->
        {error, {Status, Message}, #{headers => Headers, trailers => Metadata}}
end;

But unfortunately, it changes the API of grpcbox.

We are willing to make a pull request for this change, but we are not sure how to approach the API change.
There are at least two options:

  • always return the extended error, and handle the API change through a version change
  • use an error format based on Options (add a new option for the extended error)
  • maybe something else

What is your preferred approach?

Client streaming interceptors without recv_data functions

Current client implementation on the client branch provides send and recv functions for interacting with the server. This was done to make interceptors work.

If the user simply uses receive to get the stream data, no interceptors will run. Preferably this would not be the case, and users would not have to rely on grpcbox_client functions to read from the stream.

Using it with `rebar3 new release`

I am trying to configure grpcbox with a rebar3 release. I cannot get the files to be generated at apps/models/src.

My configuration looks like.

{grpc, [{protos, "protos"},
        {out_dir, "apps/models/src"},
        {gpb_opts, [{o, "apps/models/src"},                
                    {out_dir, "apps/models/src"},
                    {module_name_suffix, "_pb"}
                   ]
        }]
}.

Race condition when closing a stream via `grpcbox_client:close_and_recv/1`

I've got a bit of a conundrum trying to chase down a spurious error in a test suite. I'm occasionally getting timeout back from grpcbox_client:close_and_recv/1 instead of the expected stream_finished.

The code between grpcbox and chatterbox is a bit hard to follow. But near as I can tell, what's happening is that we're receiving the eos message at [1] and then there's a race to get to [2] where most of the time is_process_alive/1 returns false although occasionally it can return true which leads to us getting a timeout.

At this point I think I'm just going to remove my assertion on the return of grpcbox_client:close_and_recv/1 because I can't figure out an appropriate patch given how we demonitor when receiving the eos message. The only thing I can think of is to have a new grpcbox_client_stream:recv_end/2 call that ends up returning any messages in the mailbox until the process is dead or something? But that seemed not quite right so I figured I'd just open the issue to see if anyone else had any better ideas.

[1] https://github.com/tsloughter/grpcbox/blob/v0.9.1/src/grpcbox_client.erl#L197-L199
[2] https://github.com/tsloughter/grpcbox/blob/master/src/grpcbox_client_stream.erl#L117-L122

Opening and closing client channels (gRPC connections)

Currently client channels (gRPC connections) can be defined only in the configuration before the application starts.

It would be good to be able to dynamically open and close gRPC connections while keeping the connection handle as a reference in our application state; this is how many client drivers work.

Typically, in my application I handle gRPC communication from within a gen_server, opening and closing the gRPC connection in each gen_server's init/1 and terminate/2.

Apart from this, if this dynamic connect/disconnect cannot be added, what is the best way today to do the actual channel termination? It is clear how channels are created (by initial config prior to starting the app), but what is the best way to close channels that we do not use later in the application?

Renaming of dialyzer type any() for google.protobuf.Any

Hello! I was wondering if it's possible to specify a code gen override for a particular type that is out of my control.

In this case, I'm talking of google.protobuf.Any, a proto message used to encode any protobuf message.

I'm getting the following error on code gen:

===> Writing src/app_pb.erl
===> Error building src/app_pb.erl
        114: type any() is a builtin type; it cannot be redefined

Which I believe is a dialyzer error for trying to generate type any :: ... here:

%... part of the generated code
   114 -type any() ::
   115       #{type_url                => iodata(),        % = 1
   116         value                   => iodata()         % = 2
   117        }.

What work arounds, other than modifying the source proto files that define this message, could be used here?

Thanks for the work on grpcbox 🙌

Streams leak memory

grpcbox version v0.11.0
The grpcbox library leaks memory for each message sent on an open stream. In the stream_state record used for the h2_stream gen_statem data, there is an incoming_frames field. This field is a queue to which frames are added, but none are ever removed. In fact the information in this field is not used, apart from adding to it. Because of adding more and more data to it, the memory usage will keep increasing as long as the stream stays open. Note that currently the chatterbox library used by grpcbox is a forked one, not the original one.

OpenTelemetry instrumentation

OpenCensus was deprecated and replaced by the merging with OpenTracing resulting in https://opentelemetry.io/

We have https://github.com/open-telemetry/opentelemetry-erlang

I was originally going to simply replace the OC module in this repo but now I am wondering if it would be better to put it as its own library in https://github.com/open-telemetry/opentelemetry-erlang-contrib to make it more visible for someone coming to otel and looking to see what instrumentations are available.

Falls apart with grpcannon concurrency over 2

Running grpcannon with:

$ grpcannon -n 100 -c 3 -proto interop/proto/test.proto -call grpc.testing.TestService/UnaryCall -d '{}' localhost:8080

Results in dozens and dozens of transport closing errors.

Crash report:

=CRASH REPORT==== 29-Jun-2018::19:19:09.328427 ===
  crasher:
    initial call: h2_stream:init/1
    pid: <0.1037.0>
    registered_name: []
    exception exit: {{badmatch,{error,einval}},
                     [{h2_connection,handle_event,3,
                          [{file,
                               "/home/tristan/Devel/grpcbox/_build/default/lib/chatterbox/src/h2_connection.erl"},
                           {line,940}]},
                      {gen_statem,call_state_function,5,
                          [{file,"gen_statem.erl"},{line,1660}]},
                      {gen_statem,loop_event_state_function,6,
                          [{file,"gen_statem.erl"},{line,1023}]},
                      {proc_lib,init_p_do_apply,3,
                          [{file,"proc_lib.erl"},{line,249}]}]}
      in function  gen_statem:loop_receive/3 (gen_statem.erl, line 894)
    ancestors: [<0.983.0>,'grpcbox_pool_0.0.0.0_8080',
                  'grpcbox_services_sup_0.0.0.0_8080',
                  grpcbox_services_simple_sup,grpcbox_sup,<0.565.0>]
    message_queue_len: 0
    messages: []
    links: []
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 4185
    stack_size: 27 
    reductions: 222484

Decide what to do with `ctx`

Likely this dep needs to just be dropped and anything useful moved into the repo.

OpenTelemetry is not using ctx but instead the process dictionary.

I'm not sure yet that there is any benefit to having an application that simply acts on top of the process dictionary, though there may be for more complex uses of keys.

Missing trailers when streaming long data

I need to stream out some binary data followed by trailers like this:

...
grpcbox_stream:send(#{binary => Bin}, GRPCStream),
grpcbox_stream:add_trailers([{<<"size">>, integer_to_binary(byte_size(Bin))}], GRPCStream),
ok.

If the Bin is relatively small everything is ok. But if the Bin is big, then the trailers are missed completely; in tcpdump I see that the end stream flag is sent together with the last chunk of the binary. When testing with grpcurl it claims: "Message: server closed the stream without sending trailers"
It looks like the reason is here:
https://github.com/tsloughter/grpcbox/blob/main/src/grpcbox_stream.erl#L337
If I change send_end_stream to false the problem disappears.

Add optional configuration information to service handlers

I have an issue where it would be nice to add an additional parameter to every service method with some configuration/state information provided when starting the service - so something along the lines of:

AdditionalOptions = ... %% some term
Server = #{grpc_opts => #{service_protos => [my_service], 
                                              services => #{'my.myService' => {my_service_handler, AdditionalOptions}}
                  ,...}
grpcbox:start_server(Server)

and then in my_service.erl:

my_service_function(Ctx, Input, AdditionalOptions) ->
 ...

or alternatively:

my_service_function(Ctx, Input) ->
  AdditionalOptions = ctx:get(Ctx, ctx_additional_options),
  ...

I guess the latter means that the generated behaviours don't need changing...

Any thoughts on a) whether this is a PR that you'd be interested in taking and b) if it is something you'd be interested in, views on what the usage should look like?
