grpc's Issues

0.3.0-alpha.2 blocks GRPC servers from receiving messages

We have a GRPC streaming server that works something like this, using GRPC 0.3.0-alpha.1:

@spec stream_listeners(Enumerable.t(), Stream.t()) :: Stream.t()
def stream_listeners(req_enum, stream0) do
  :ok = Publisher.subscribe(Publisher, :listeners, self())
  handle_requests(req_enum, stream0)
end

@spec handle_requests(Enumerable.t(), Stream.t()) :: Stream.t()
defp handle_requests(req_enum, stream0) do
  req_enum |> Enum.reduce(stream0, &handle_request(&1, &2))
end

@spec handle_request(DiscoveryRequest.t(), Stream.t()) :: Stream.t()
defp handle_request(_request, stream) do
  receive do
    {:listeners, version_info, resources} ->
      send_reply(stream, version_info, resources)
  end
end

@spec send_reply(Stream.t(), String.t(), [unquote(resource_type).t]) :: Stream.t()
defp send_reply(stream, version_info, resources),
  do: GRPC.Server.send_reply(stream, mkresponse(version_info, resources))

I've tried to adapt the server for 0.3.0-alpha.2, but the receive call seems to be blocked by grpc-elixir. Messages are never received, and an error like this is produced:

12:41:16.486 [error] ** (CaseClauseError) no case clause matching: {:listeners, "1", ...
    (grpc) lib/grpc/adapter/cowboy.ex:86: GRPC.Adapter.Cowboy.read_stream/2
    (elixir) lib/stream.ex:1466: Stream.do_unfold/4
    (elixir) lib/enum.ex:1919: Enum.each/2
    (grpc) lib/grpc/server.ex:134: GRPC.Server.do_handle_request/5
    (grpc) lib/grpc/adapter/cowboy/handler.ex:180: GRPC.Adapter.Cowboy.Handler.do_call_rpc/3
    (grpc) lib/grpc/adapter/cowboy/handler.ex:168: GRPC.Adapter.Cowboy.Handler.call_rpc/3

I think the relevant change is tony612/grpc@d1abe70: the message we send to our server is actually received by the adapter handler's sync_call function, which then doesn't match any clause in the adapter's read_stream function.

We would like to be able to send messages to our server process, as that makes streaming easier. Could we find a way to filter the messages that the adapter handler receives, or change the system in some way to allow messages to be sent to the server?

Thanks for the great library!

TLS handshake error for Go client

As a side project, I'm rewriting part of a Go service I wrote using Elixir. The Go service uses TLS with server/client certificates. When I try to set up the same server certificate in my Elixir project, there's some weird handshake issue going on when I send a request using my Go client.
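
For reference, a server-side TLS setup along these lines (a minimal sketch with placeholder paths; the cred option mirrors the client-side GRPC.Credential usage and may differ by grpc-elixir version):

# Minimal sketch of starting a TLS-enabled server. Paths are placeholders; whether
# the server accepts a :cred option exactly like this depends on the version in use.
cred =
  GRPC.Credential.new(
    ssl: [
      certfile: "certs/127.0.0.1.crt",
      keyfile: "certs/127.0.0.1.key",
      cacertfile: "certs/cabundle.pem"
    ]
  )

GRPC.Server.start(Talk.Talk.Server, "0.0.0.0:9001", cred: cred)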

When using openssl to check if the socket is properly configured, everything seems good:

$ openssl s_client -connect host:9001 -cert certs/127.0.0.1.crt -key certs/127.0.0.1.key -CAfile certs/cabundle.pem -tls1
...
---
SSL handshake has read 2168 bytes and written 426 bytes
---
New, TLSv1/SSLv3, Cipher is DHE-RSA-AES256-SHA
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
    Protocol  : TLSv1
    Cipher    : DHE-RSA-AES256-SHA
    Session-ID: 89096596000B262380F50F8A10359D403A2DC3E46ABFF7D471C8A7F8829C29E0
    Session-ID-ctx:
    Master-Key: 75ECC1506DC23170AF356406BCD35BE87FDD4EC7738FC6B4F3F1B5121451E9D99513EADCD79CCADC5B85C2BDBF70D8A1
    Key-Arg   : None
    Start Time: 1509063686
    Timeout   : 7200 (sec)
    Verify return code: 0 (ok)
---
closed

But when I try the Go gRPC client, this is what I get on the Go side:

PANI[0000] Error when creating session!                  error="rpc error: code = Internal desc = server closed the stream without sending trailers"
panic: (*logrus.Entry) (0x146c400,0xc4201f0690)
...

and this on the Elixir side:

16:03:55.168 [info]  Running Talk.Talk.Server with Cowboy using https://0.0.0.0:9001
Interactive Elixir (1.5.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 16:03:55.528 [debug] Lager installed handler lager_backend_throttle into lager_event
16:04:46.591 [warning] lager_error_logger_h dropped 72 messages in the last second that exceeded the limit of 50 messages/sec
16:04:46.591 [error] SSL: hello: tls_handshake.erl:203:Fatal error: handshake failure - handshake_decode_error

16:04:46.591 [error] SSL: :hello: tls_handshake.erl:203:Fatal error: handshake failure - handshake_decode_error
...

Any ideas what could be going on?

Stress testing Elixir <=> Elixir and Ruby <=> Elixir communication

If calls between Ruby and Elixir are performed by hand, in any combination, by manually triggering client calls from the shell, things work without any issues. Not sure if it's relevant, but note that in this case the client process is short-lived: one RPC call per process.

I was browsing the tests for the Ruby implementation and noticed that they have stress tests, so I wanted to try something like that. I just put a loop around the client in Ruby and in Elixir to perform 100K calls in the helloworld example. This is far from a proper stress test: no concurrency in client calls is introduced, and probably some other things are missing. It's just a first step.
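
For reference, the Elixir side of that loop boils down to something like the sketch below (based on the helloworld example; the real test code is in the repo linked further down):

# Rough sketch of an Elixir client loop for the stress test, following the
# helloworld example. See the linked repo for the actual test code.
{:ok, channel} = GRPC.Stub.connect("localhost:50051", insecure: true)

Enum.each(1..100_000, fn i ->
  request = Helloworld.HelloRequest.new(name: "stress-#{i}")
  reply = channel |> Helloworld.Greeter.Stub.say_hello(request)
  if rem(i, 1_000) == 0, do: IO.inspect({i, reply})
end)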

Here is a short report on how things work:

  • 💥 Elixir client => Elixir server can survive around 3K calls before crashing
  • ✅ Ruby client => Ruby server can survive 100K calls without any issues
  • 💥 Elixir client => Ruby server can survive between 1 and 3 calls. If a timeout between calls is introduced it can survive a bit longer, maybe 10 calls.
  • 💥 Ruby client => Elixir server can survive around 6K calls

Results with logs and exceptions can be seen here – https://semaphoreci.com/darkofabijan/grpc-stress-test/branches/master/builds/17
Code is in – https://github.com/darkofabijan/grpc-stress-test/

Also, these stress tests are not systematically organized and it's not possible to easily test changes. I took a shortcut and just committed everything to a single repo to test out this approach. I can think about introducing a more systematic approach that can scale and run in CI automatically. This is probably also good groundwork for benchmarking.

P.S.
grpc-elixir is using this version of chatterbox - joedevivo/chatterbox#114

Troubles when talking to Ruby server

I am getting an exception when making a call from an Elixir client to a Ruby server. I hope that I am just missing something trivial and that it's an easy fix :)

How to reproduce the issue:

  1. Follow the grpc quick start guide for Ruby:
gem install grpc
gem install grpc-tools
git clone -b v1.4.x https://github.com/grpc/grpc
cd grpc/examples/ruby
  2. On the Elixir side:
git clone git@github.com:tony612/grpc-elixir.git
cd grpc-elixir/examples/helloworld
mix do deps.get compile

Ruby server, Elixir client

Run server with ruby greeter_server.rb

Run the client with mix run priv/client.exs and I get the following exception:


06:47:44.129 [error] ** State machine :"grpc_chatter_client_localhost:50051" terminating
** Last message in was {:tcp, #Port<0.5241>,
 <<0, 0, 24, 4, 0, 0, 0, 0, 0, 0, 4, 0, 0, 255, 255, 0, 5, 0, 1, 0, 9, 0, 6, 0,
   0, 64, 0, 254, 3, 0, 0, 0, 1>>}
** When State == :handshake
**      Data  == {:connection, :client, [], :undefined, {:gen_tcp, #Port<0.5241>},
 {:settings, 4096, 1, :unlimited, 65535, 16384, :unlimited},
 {:settings, 4096, 1, :unlimited, 65535, 16384, :unlimited}, 65535, 65535,
 {:hpack_context, {:dynamic_table, [], 4096, 0}, 4096},
 {:hpack_context, {:dynamic_table, [], 4096, 0}, 4096},
 {[{#Reference<0.1800748995.1976565762.116339>,
    {:settings, 4096, 1, :unlimited, 65535, 16384, :unlimited}}], []}, 1,
 {:stream_set, :client, {:peer_subset, :unlimited, 0, 0, 1, []},
  {:peer_subset, :unlimited, 0, 0, 2, []}}, :chatterbox_static_stream, [],
 :empty, :undefined, :auto, %{}}
** Reason for termination =
** {:function_clause,
 [{:h2_frame_settings, :parse_settings,
   [<<254, 3, 0, 0, 0, 1>>, [{<<6>>, 16384}, {<<5>>, 65545}, {<<4>>, 65535}]],
   [file: '/Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl',
    line: 93]},
  {:h2_frame_settings, :parse_settings, 1,
   [file: '/Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl',
    line: 90]},
  {:h2_frame_settings, :read_binary, 2,
   [file: '/Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl',
    line: 82]},
  {:h2_frame, :recv, 1,
   [file: '/Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame.erl',
    line: 72]},
  {:h2_connection, :handle_socket_data, 3,
   [file: '/Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_connection.erl',
    line: 1482]}, {:gen_fsm, :handle_msg, 8, [file: 'gen_fsm.erl', line: 483]},
  {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}

** (EXIT from #PID<0.73.0>) an exception was raised:
    ** (FunctionClauseError) no function clause matching in :h2_frame_settings.parse_settings/2
        (chatterbox) /Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl:93: :h2_frame_settings.parse_settings(<<254, 3, 0, 0, 0, 1>>, [{<<6>>, 16384}, {<<5>>, 65545}, {<<4>>, 65535}])
        (chatterbox) /Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl:90: :h2_frame_settings.parse_settings/1
        (chatterbox) /Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame_settings.erl:82: :h2_frame_settings.read_binary/2
        (chatterbox) /Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_frame.erl:72: :h2_frame.recv/1
        (chatterbox) /Users/darko/work/grpc-elixir/examples/helloworld/deps/chatterbox/src/h2_connection.erl:1482: :h2_connection.handle_socket_data/3
        (stdlib) gen_fsm.erl:483: :gen_fsm.handle_msg/8
        (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Elixir server, Ruby client

Run server with mix grpc.server

Run client with ruby greeter_client.rb 👈 Works! 🎆

My best guess is that chatterbox is just missing something. Could you please advise how to move forward? I saw that a chatterbox fork is used and that some fixes have already been applied.

Status of interceptors

Hello,

When will interceptor support be merged into master? I see your Prometheus interceptor project is still using the "support-interceptor" branch.

Thanks.

GRPC function that returns a stream fails with deadline exceeded or goaway

Whenever I try to use a function that returns a stream from my GRPC LND server, I get one of these two errors:

iex(20)> stream = channel |> Lnrpc.Lightning.Stub.subscribe_transactions(Lnrpc.GetTransactionsRequest.new())                 
{:error, %GRPC.RPCError{message: "deadline exceeded", status: 4}}

or

iex(22)> stream = channel |> Lnrpc.Lightning.Stub.subscribe_transactions(Lnrpc.GetTransactionsRequest.new())
{:error, %GRPC.RPCError{message: "{:stop, {:goaway, 3, :enhance_your_calm, \"too_many_pings\"}, :\"Client is going away.\"}", status: 2}}

The function itself is defined as:

rpc SubscribeTransactions (GetTransactionsRequest) returns (stream Transaction);

And is translated to:

rpc :SubscribeTransactions, Lnrpc.GetTransactionsRequest, stream(Lnrpc.Transaction)

Running the same request from Python or Node.js works as expected. It seems like it could be some parameter I'm using in the GRPC client, or something like that, but I'm not sure.

Any news on the Cowboy branch?

I'm looking to incorporate this into a Phoenix app at work, but mix complains about a conflict on the cowboy dependency because this package uses a fork of the official lib.

Are there plans to move to using the official version of cowboy in the near future?

Imports

This package doesn't seem to support files that have import statements. Is there a way to use such files currently?

exprotobuf seems to support import statements when all the imported files are provided explicitly; however, I can't find a way to do that with grpc-elixir.

Sending 0 with grpc python results in "nil"

Thanks for a great library. I have issues accessing it via Python,
given the following proto file:

syntax = "proto3";

// fix this with compiler flag -I

option java_multiple_files = true;
option java_package = "io.grpc.nilapi";
option java_outer_classname = "NilTest";
option objc_class_prefix = "CORE";

package nilapi;

service IntegerService {
  rpc SendInteger (SenderInfo) returns (SenderInfo) {}
}

message SenderInfo {
  int32 frequency = 3;
}

and the following python client

channel = grpc.insecure_channel('localhost:50051')
stub = nil_test_pb2_grpc.IntegerServiceStub(channel)
send_int = nil_test_pb2.SenderInfo(frequency=0)
a = stub.SendInteger(send_int)
print("Expecting an int back %d", a)
  1. The input received on the Elixir side reports that the frequency is nil (and not 0 as expected).

  2. The problem happens in the other direction too: if Elixir returns 0, the value received by the client is null.

elixir code:

defmodule Nilapi.IntegerService.Server do
  use GRPC.Server, service: Nilapi.IntegerService.Service
  require Logger
  alias GRPC.Server

  @spec send_integer(Nilapi.SenderInfo.t, GRPC.Server.Stream.t) :: Nilapi.SenderInfo.t
  def send_integer(request, _stream) do
    Logger.debug inspect request
    Nilapi.SenderInfo.new(frequency: 0)
  end  
end
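
Until the decoding behaviour is sorted out, a hedged workaround sketch on the server side is to fall back to the proto3 default explicitly (this only papers over the symptom; it is not a fix for the library):

defmodule Nilapi.IntegerService.Server do
  use GRPC.Server, service: Nilapi.IntegerService.Service
  require Logger

  # Workaround sketch: treat a nil proto3 scalar as its default (0), since proto3
  # does not put default values on the wire. Not a fix for the library itself.
  def send_integer(request, _stream) do
    frequency = request.frequency || 0
    Logger.debug(fn -> "received frequency=#{frequency}" end)
    Nilapi.SenderInfo.new(frequency: frequency)
  end
end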

All the code for reproducing is attached.

grpc_python_nil.zip

Multiple services on same port

Hello!

I'm sorry if this is obvious and I just missed it; I'm pretty new to Elixir.
Is it possible to have multiple services on the same port, as with other GRPC implementations?
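
If the supervisor accepts a list of server modules (an assumption; please check the docs for the version you are using), the setup would look roughly like this:

# Sketch only: assumes GRPC.Server.Supervisor accepts a list of server modules
# sharing one port; Helloworld.OtherService.Server is a hypothetical second service.
defmodule MyApp.Application do
  use Application
  import Supervisor.Spec

  def start(_type, _args) do
    children = [
      supervisor(GRPC.Server.Supervisor, [{[Helloworld.Greeter.Server, Helloworld.OtherService.Server], 50051}])
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end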

Getting Malformed response: missing :status in HEADERS frame

Hello,

I'm trying to use the grpc-elixir client to connect to an LND GRPC server. The commands I run are:

ca_path = Path.expand("./tls/tls.cert", :code.priv_dir(:elixir_lnd))
credential = GRPC.Credential.new(ssl: [cacertfile: ca_path])

{:ok, channel} = GRPC.Stub.connect("localhost:10009", [cred: credential])
{:ok, reply} = channel |> Lnrpc.Lightning.Stub.get_info(Lnrpc.GetInfoRequest.new())

As far as the server receiving the message goes, it seems to work great, but I do get an error from its response:

iex(5)> {:ok, reply} = channel |> Lnrpc.Lightning.Stub.get_info(Lnrpc.GetInfoRequest.new())
{:ok, reply} = channel |> Lnrpc.Lightning.Stub.get_info(Lnrpc.GetInfoRequest.new())
** (CaseClauseError) no case clause matching: {:error, %GRPC.RPCError{message: "{:stream_error, :protocol_error, :\"Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)\"}", status: 2}}
    (grpc) lib/grpc/stub.ex:320: GRPC.Stub.recv_body/5
    (grpc) lib/grpc/stub.ex:290: GRPC.Stub.recv/2

My Protobuf file is like this:

syntax = "proto3";

package lnrpc;
service Lightning {
    rpc GetInfo (GetInfoRequest) returns (GetInfoResponse) { }
}

message GetInfoRequest {
}
message GetInfoResponse {

    /// The identity pubkey of the current node.
    string identity_pubkey = 1 [json_name = "identity_pubkey"];

    /// If applicable, the alias of the current node, e.g. "bob"
    string alias = 2 [json_name = "alias"];

    /// Number of pending channels
    uint32 num_pending_channels = 3 [json_name = "num_pending_channels"];

    /// Number of active channels
    uint32 num_active_channels = 4 [json_name = "num_active_channels"];

    /// Number of peers
    uint32 num_peers = 5 [json_name = "num_peers"];

    /// The node's current view of the height of the best block
    uint32 block_height = 6 [json_name = "block_height"];

    /// The node's current view of the hash of the best block
    string block_hash = 8 [json_name = "block_hash"];

    /// Whether the wallet's view is synced to the main chain
    bool synced_to_chain = 9 [json_name = "synced_to_chain"];

    /// Whether the current node is connected to testnet
    bool testnet = 10 [json_name = "testnet"];

    /// A list of active chains the node is connected to
    repeated string chains = 11 [json_name = "chains"];

    /// The URIs of the current node.
    repeated string uris = 12 [json_name = "uris"];

    /// Timestamp of the block best known to the wallet
    int64 best_header_timestamp = 13 [ json_name = "best_header_timestamp" ];
}

Which generated this file:

defmodule Lnrpc.GetInfoRequest do
  @moduledoc false
  use Protobuf, syntax: :proto3

  defstruct []

end

defmodule Lnrpc.GetInfoResponse do
  @moduledoc false
  use Protobuf, syntax: :proto3

  @type t :: %__MODULE__{
    identity_pubkey:       String.t,
    alias:                 String.t,
    num_pending_channels:  non_neg_integer,
    num_active_channels:   non_neg_integer,
    num_peers:             non_neg_integer,
    block_height:          non_neg_integer,
    block_hash:            String.t,
    synced_to_chain:       boolean,
    testnet:               boolean,
    chains:                [String.t],
    uris:                  [String.t],
    best_header_timestamp: integer
  }
  defstruct [:identity_pubkey, :alias, :num_pending_channels, :num_active_channels, :num_peers, :block_height, :block_hash, :synced_to_chain, :testnet, :chains, :uris, :best_header_timestamp]

  field :identity_pubkey, 1, type: :string
  field :alias, 2, type: :string
  field :num_pending_channels, 3, type: :uint32
  field :num_active_channels, 4, type: :uint32
  field :num_peers, 5, type: :uint32
  field :block_height, 6, type: :uint32
  field :block_hash, 8, type: :string
  field :synced_to_chain, 9, type: :bool
  field :testnet, 10, type: :bool
  field :chains, 11, repeated: true, type: :string
  field :uris, 12, repeated: true, type: :string
  field :best_header_timestamp, 13, type: :int64
end

defmodule Lnrpc.Lightning.Service do
  @moduledoc false
  use GRPC.Service, name: "lnrpc.Lightning"
  rpc :GetInfo, Lnrpc.GetInfoRequest, Lnrpc.GetInfoResponse
end

defmodule Lnrpc.Lightning.Stub do
  @moduledoc false
  use GRPC.Stub, service: Lnrpc.Lightning.Service
end

I'm using the latest master code (I tried the other branches too, but I get the same problem).

Connection errors handling

Hello.

Is there a way to handle 'Connection Refused' error?

iex(1)> GRPC.Stub.connect("localhost:50051")
18:33:40.765 [error] CRASH REPORT Process <0.441.0> with 1 neighbours exited with reason: econnrefused in gen_fsm:init_it/6 line 335
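
A hedged sketch of what a caller could do in the meantime, assuming connect/1 returns {:ok, channel} | {:error, reason} instead of crashing (which, per the log above, is not what happens in this version):

defmodule SafeConnect do
  # Sketch only: retry a refused connection a few times. Assumes GRPC.Stub.connect/1
  # returns {:ok, channel} | {:error, reason}; in the version reported here the
  # underlying chatterbox fsm crashes instead, which is what this issue is about.
  def connect(addr, retries \\ 3) do
    case GRPC.Stub.connect(addr) do
      {:ok, channel} ->
        {:ok, channel}

      {:error, _reason} when retries > 0 ->
        Process.sleep(500)
        connect(addr, retries - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end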

Thank you.

Example of consuming third party gRPC

Below is an example in Go. I've only been working with Elixir for a short time (4 days). I wanted to understand how I would consume a third-party gRPC service with your repo, like Dgraph in the example below (a rough Elixir sketch follows the Go code).

Thank you.
Cheers

package main

import (
  "context"
  "flag"
  "fmt"
  "io/ioutil"
  "log"
  "os"

  "github.com/dgraph-io/dgraph/client"
  "github.com/gogo/protobuf/proto"
  "google.golang.org/grpc"
)

var (
  dgraph = flag.String("d", "127.0.0.1:9080", "Dgraph server address")
)

func main() {
  flag.Parse()
  conn, err := grpc.Dial(*dgraph, grpc.WithInsecure())
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  // The Dgraph client needs a directory in which to write its blank node maps.
  clientDir, err := ioutil.TempDir("", "client_")
  if err != nil {
    log.Fatal(err)
  }
  defer os.RemoveAll(clientDir)

  dgraphClient := client.NewDgraphClient([]*grpc.ClientConn{conn}, client.DefaultOptions, clientDir)
  defer dgraphClient.Close()

  req := client.Req{}
  req.SetQuery(`{
  bladerunner(func: eq(name@en, "Blade Runner")) {
    _uid_
    name@en
    initial_release_date
    netflix_id
  }
}`)

  resp, err := dgraphClient.Run(context.Background(), &req)
  if err != nil {
    log.Fatalf("Error in getting response from server, %s", err)
  }
  fmt.Printf("Response %+v\n", proto.MarshalTextString(resp))
}
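
A rough Elixir equivalent of the Go example above would be to generate stubs from the Dgraph proto files with protoc-gen-elixir and call them through a channel. The module names below (Api.Request, Api.Dgraph.Stub) are placeholders for whatever the generator produces:

# Rough Elixir counterpart of the Go example. Api.Request and Api.Dgraph.Stub are
# placeholders for the modules generated from the Dgraph proto files; the query is
# the same one used above.
{:ok, channel} = GRPC.Stub.connect("127.0.0.1:9080", insecure: true)

query = """
{
  bladerunner(func: eq(name@en, "Blade Runner")) {
    _uid_
    name@en
    initial_release_date
    netflix_id
  }
}
"""

request = Api.Request.new(query: query)
response = channel |> Api.Dgraph.Stub.query(request)
IO.inspect(response)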

Macaroon support?

Do you have, or plan to have, support for macaroons as credentials?

I want to use your library (the client side) to connect to an LND gRPC server, which uses macaroons.
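
For context, LND reads the macaroon from a hex-encoded "macaroon" metadata entry on each call. A hedged sketch of what that could look like, assuming the stub calls accept a :metadata option (which may not exist in the current version):

# Sketch only: assumes a :metadata option on stub calls, which may not be supported
# yet. `channel` would come from GRPC.Stub.connect/2 with the LND TLS cert, as in
# the other LND issues here; the macaroon path is wherever admin.macaroon lives.
macaroon =
  "path/to/admin.macaroon"
  |> File.read!()
  |> Base.encode16(case: :lower)

{:ok, reply} =
  channel
  |> Lnrpc.Lightning.Stub.get_info(Lnrpc.GetInfoRequest.new(),
    metadata: %{"macaroon" => macaroon}
  )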

How to handle client streams on server-side?

Edit note: this issue was originally described differently. I made some changes to the code that allow me to ask a more complete question; here it goes:

Given the following proto:

syntax = "proto3";

package Streamer;

service Broadcast {
  rpc Broadcast (stream Video) returns (BroadcastResponse) {}
}

service Tune {
  rpc Tune (TuneRequest) returns (stream Video) {}
}

message Band {
  string uid = 1;
  string key = 2;
}

message TuneRequest {
  Band band = 1;
}

message Video {
  string index = 1;
  bytes chunk = 2;
}

message BroadcastResponse {
  bool success = 1;
  Band band = 2;
}

I'm having trouble implementing the Broadcast server in Elixir.

Here is my current implementation:

defmodule Streamer.Servers.Broadcast do
  require Logger

  use GRPC.Server, service: Streamer.Broadcast.Service

  alias Streamer, as: S

  def broadcast(request, _stream) do
    do_broadcast(request, null_band())
  end

  defp do_broadcast(request, band) do
    Logger.info "Started broadcast | Band UID: #{band.uid} | Band Key: #{band.key}"

    Enum.map(request, &(handle/1))

    S.BroadcastResponse.new(success: true, band: band)
  end

  defp handle(video) do
    IO.puts "Gathered -> index: #{video.index}, chunk: #{video.chunk}"
  end

  defp null_band do
    S.Band.new(uid: "000", key: "111")
  end
end

And to make things clear, here's my Ruby client:

module SyrinxClient
  class Broadcast
    def initialize
      @stub = Streamer::Broadcast::Stub
                .new('localhost:7171', :this_channel_is_insecure)
    end

    def perform
      reqs = RandomVideo.new
      resp = @stub.broadcast(reqs.each)
      p "response: #{resp.inspect}"
    end
  end

  class RandomVideo
    def initialize; end

    def each
      return enum_for(:each) unless block_given?
      loop do
        v = video
        p v.inspect
        yield v
      end
    end

    private

    def video
      Streamer::Video.new(
        index: rand(0..255).to_s,
        chunk: SecureRandom.uuid
      )
    end
  end
end

Notice the loop do line in my Ruby client. I'm just streaming random data in a loop. And here begins my problem.

When I run this, my Elixir server just prints the "Started broadcast ..." message and then stands still. I was expecting it to keep printing the "Gathered -> index: ..." message as messages arrive. Sometimes it prints a few of those "Gathered" messages (maybe four, two, or one), then it seems to stop.

But look at this part of the Ruby client:

      loop do
        v = video
        p v.inspect
        yield v
      end

If I change it to:

      10_000.times do
        v = video
        p v.inspect
        yield v
      end

This effectively kills the infinite loop, and by doing so it seems to work just fine. But there's a catch: it seems to print all the "Gathered" messages at the end of the stream, not as they arrive, which is what I was expecting.

So it looks like it waits for the stream to end before starting the work, but:

Another fact is that by adding a sleep in the Ruby client like so:

      loop do
        p video.inspect
        yield video
        sleep rand(1..2)
      end

But keeping it infinite with loop do, it also seems to work just fine. And this time even better, because it now seems to be in sync: packet sent, packet processed at the same time.

My question is: is there a way to handle an "infinite" stream asynchronously, i.e. keep processing new packets as they arrive, with no forced delay on the client side?

Thank you!

Chatterbox client fsm terminating with :ebadf

When executing some RPCs concurrently, the server seems to be closing the connection. This happens only when executing the RPCs concurrently, and doesn't seem to happen from other clients (I've tried with a Go client).

I'm running this:

Enum.each(1..20, fn x ->
  spawn(fn ->
    {:ok, channel} = GRPC.Channel.connect("addr:1000", [])
    channel |> Some.Stub.sign(Something.new())
  end)
end)

And getting this:

16:52:08.517 [error] ** State machine :"grpc_chatter_client_addr:1000" terminating
** Last message in was {:tcp_error, #Port<0.34925>, :ebadf}
** When State == :connected
**      Data  == {:connection, :client, [], :undefined, {:gen_tcp, #Port<0.34925>},
 {:settings, 4096, 1, :unlimited, 65535, 16384, :unlimited},
 {:settings, 4096, 1, :unlimited, 65535, 16384, :unlimited}, 64375, 77243,
 {:hpack_context,
  {:dynamic_table,
   [{62, "grpc-message", ""}, {63, "grpc-status", "0"},
    {64, "content-type", "application/grpc"}], 4096, 148}, 4096},
 {:hpack_context,
  {:dynamic_table,
   [{62, "te", "trailers"}, {63, "user-agent", "grpc-elixir/0.1.0"},
    {64, "content-type", "application/grpc+proto"},
    {65, ":authority", "addr"}, {66, ":path", "/some.Service/sign"}],
   4096, 275}, 4096}, {[], []}, 79,
 {:stream_set, :client,
  {:peer_subset, :unlimited, 10, 0, 79,
   [{:closed_stream, 1, #PID<0.1014.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 144, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115, 58,
        47, 47, 97, 112, ...>>], false},
    {:closed_stream, 3, #PID<0.1015.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 146, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115, 58,
        47, 47, 97, ...>>], false},
    {:closed_stream, 5, #PID<0.1016.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 146, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115, 58,
        47, 47, ...>>], false},
    {:closed_stream, 7, #PID<0.1017.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 144, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115, 58,
        47, ...>>], false},
    {:closed_stream, 9, #PID<0.1012.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 139, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115, 58,
        ...>>], false},
    {:closed_stream, 11, #PID<0.1025.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 148, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, 115,
        ...>>], false},
    {:closed_stream, 13, #PID<0.1028.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, 112, ...>>],
     false},
    {:closed_stream, 15, #PID<0.1031.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 144, 10, 3, 71, 69, 84, 18, 46, 104, 116, 116, ...>>],
     false},
    {:closed_stream, 17, #PID<0.1034.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, 3, 71, 69, 84, 18, 46, 104, 116, ...>>], false},
    {:closed_stream, 19, #PID<0.1037.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 146, 10, 3, 71, 69, 84, 18, 46, 104, ...>>], false},
    {:closed_stream, 21, #PID<0.1040.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 144, 10, 3, 71, 69, 84, 18, 46, ...>>], false},
    {:closed_stream, 23, #PID<0.1043.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, 3, 71, 69, 84, 18, ...>>], false},
    {:closed_stream, 25, #PID<0.1046.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, 3, 71, 69, 84, ...>>], false},
    {:closed_stream, 27, #PID<0.1049.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, 3, 71, 69, ...>>], false},
    {:closed_stream, 29, #PID<0.1052.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 138, 10, 3, 71, ...>>], false},
    {:closed_stream, 31, #PID<0.1055.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 138, 10, 3, ...>>], false},
    {:closed_stream, 33, #PID<0.1058.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}],
     [<<0, 0, 0, 1, 142, 10, ...>>], false},
    {:closed_stream, 35, #PID<0.1061.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}], [<<0, 0, 0, 1, 142, ...>>],
     false},
    {:closed_stream, 37, #PID<0.1064.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", "0"}, {"grpc-message", ""}], [<<0, 0, 0, 1, ...>>],
     false},
    {:closed_stream, 39, #PID<0.1067.0>,
     [{":status", "200"}, {"content-type", "application/grpc"},
      {"grpc-status", ...}, {"grpc-message", ...}], [<<0, 0, 0, ...>>], false},
    {:closed_stream, 41, #PID<0.1070.0>,
     [{":status", "200"}, {"content-type", ...}, {...}, ...], [<<0, 0, ...>>],
     false},
    {:closed_stream, 43, #PID<0.1073.0>, [{":status", ...}, {...}, ...],
     [<<0, ...>>], false},
    {:closed_stream, 45, #PID<0.1076.0>, [{...}, ...], [...], ...},
    {:closed_stream, 47, #PID<0.1079.0>, [...], ...},
    {:closed_stream, 49, #PID<0.1082.0>, ...}, {:closed_stream, 51, ...},
    {:closed_stream, ...}, {...}, ...]},
  {:peer_subset, :unlimited, 0, 0, 2, []}}, :chatterbox_static_stream, :empty,
 :undefined, :auto}
** Reason for termination =
** :ebadf

Error compiling hpack

Hi,

Just made a quick project to try this against an existing service. I followed the instructions in the README and ran into a problem compiling hpack using mix do deps.get, compile.

===> Compiling hpack
===> Compiling src/huffman.erl failed
src/huffman.erl:none: undefined parse transform 'lager_transform'

Did I miss a step?

I keep receiving `:gun_down` and `:gun_up` error messages

I use grpc-elixir to build a Dgraph client, and I'm now using that client to build a little side project. When running the Phoenix server, these messages keep popping up in my log every couple of seconds.

Most of the message text comes from my client, but I am wondering whether this behavior is normal. It seems as if the connection breaks every couple of seconds and gun then reconnects.

The gun docs state:

When the connection is lost, Gun will send a gun_down message indicating the current protocol, the reason the connection was lost and two lists of stream references.

The first list indicates open streams that may have been processed by the server. The second list indicates open streams that the server did not process.

I could also handle these messages on my side (a sketch follows the log below), but I first want to figure out what is going on.

Maybe this is not the right place to post this, and I will open an issue over at the gun repo, but I thought I'd give it a try since you know more about this.

[error] ExDgraph.Protocol #PID<0.280.0> received unexpected message: {:gun_down, #PID<0.288.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.279.0> received unexpected message: {:gun_down, #PID<0.287.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.278.0> received unexpected message: {:gun_down, #PID<0.286.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.276.0> received unexpected message: {:gun_down, #PID<0.284.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.277.0> received unexpected message: {:gun_down, #PID<0.285.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.280.0> received unexpected message: {:gun_up, #PID<0.288.0>, :http2}
[error] ExDgraph.Protocol #PID<0.279.0> received unexpected message: {:gun_up, #PID<0.287.0>, :http2}
[error] ExDgraph.Protocol #PID<0.277.0> received unexpected message: {:gun_up, #PID<0.285.0>, :http2}
[error] ExDgraph.Protocol #PID<0.276.0> received unexpected message: {:gun_up, #PID<0.284.0>, :http2}
[error] ExDgraph.Protocol #PID<0.278.0> received unexpected message: {:gun_up, #PID<0.286.0>, :http2}
[error] ExDgraph.Protocol #PID<0.280.0> received unexpected message: {:gun_down, #PID<0.288.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.279.0> received unexpected message: {:gun_down, #PID<0.287.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.276.0> received unexpected message: {:gun_down, #PID<0.284.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.278.0> received unexpected message: {:gun_down, #PID<0.286.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.277.0> received unexpected message: {:gun_down, #PID<0.285.0>, :http2, :normal, [], []}
[error] ExDgraph.Protocol #PID<0.280.0> received unexpected message: {:gun_up, #PID<0.288.0>, :http2}
[error] ExDgraph.Protocol #PID<0.278.0> received unexpected message: {:gun_up, #PID<0.286.0>, :http2}
[error] ExDgraph.Protocol #PID<0.276.0> received unexpected message: {:gun_up, #PID<0.284.0>, :http2}
[error] ExDgraph.Protocol #PID<0.279.0> received unexpected message: {:gun_up, #PID<0.287.0>, :http2}
[error] ExDgraph.Protocol #PID<0.277.0> received unexpected message: {:gun_up, #PID<0.285.0>, :http2}
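
If these do turn out to be benign reconnect notifications, a minimal sketch of handling them in the process that owns the connection (shown as a plain GenServer; ExDgraph.Protocol is DBConnection-based, so the exact callback differs) would be:

defmodule GunConnectionOwner do
  # Sketch only: explicitly match gun's connection notifications instead of logging
  # them as unexpected. The shapes are gun's {:gun_up, pid, protocol} and
  # {:gun_down, pid, protocol, reason, killed_streams, unprocessed_streams}.
  use GenServer
  require Logger

  def init(state), do: {:ok, state}

  def handle_info({:gun_up, _conn, _protocol}, state), do: {:noreply, state}

  def handle_info({:gun_down, _conn, _protocol, _reason, _killed, _unprocessed}, state),
    do: {:noreply, state}

  def handle_info(msg, state) do
    Logger.debug(fn -> "unexpected message: #{inspect(msg)}" end)
    {:noreply, state}
  end
end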

Timeline for stable hex release

Hey,

I am really looking forward to a stable release on hex since a project of mine depends on grpc-elixir. I cannot move forward without stable dependencies.

As I understand it, one issue was that gun wasn't stable. This seems to have changed yesterday (I discovered it by accident, no worries): https://github.com/ninenines/gun/releases

Is there a timeline on a stable release? Is there something we can help with?

❤️

grpc requires Cowboy ~> 2.2.0, phoenix requires 1.0

Currently grpc-elixir requires cowboy ~> 2.2.0. When trying to use it inside a Phoenix app, this causes an issue because Phoenix itself requires cowboy ~> 1.0.

Failed to use "cowboy" (version 1.1.2) because
  apps/trope_api/mix.exs requires ~> 1.0
  deps/grpc/mix.exs requires ~> 2.2.0
  phoenix (version 1.3.2) requires ~> 1.0
  mix.lock specifies 1.1.2

** (Mix) Hex dependency resolution failed, change the version requirements of your dependencies or unlock them (by using mix deps.update or mix deps.unlock). If you are unable to resolve the conflicts you can try overriding with {:dependency, "~> 1.0", override: true}

Is Cowboy 2.2.0 really necessary, or could you lower the version requirement in your mix.exs?

Mini question

Hello tony,

I'm eager to test your code, but I'm having trouble getting started. It seems like the mix task grpc.gen, which is referenced in the docs and examples, has been removed, and I'm having trouble finding what it has been replaced with. I would appreciate some pointers in the right direction.

:infinity is not a valid value for cowboy settings_timeout

It uses the same value as :idle_timeout, which defaults to :infinity; however, :infinity is not a valid value for :settings_timeout. Consider adding the possibility to pass :settings_timeout separately instead of reusing :idle_timeout, and defaulting :settings_timeout to 5000 or something. I can send you a PR if you like.

Disabling of lager for library users

Have been using this library for the last few weeks and really liking it - simple and quick to get started. Kudos! :)

My only major problem so far is that grpc-elixir imports lager, which seems to take over the logging stack (as in, it makes itself the default logging library) with no obvious way of turning it off. This could unfortunately be a blocker to using the lib for me. Obviously, the choice of logging stack should be up to the user.

I am not sure what the best way to accomplish this would be. I don't see any lager configuration option to disable it, and it seems to be a hard dependency in chatterbox. If we can find an easy way to do this on the user end, perhaps we could add it to the readme?
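
One thing that might work on the user end (a sketch only, untested here and not an official recommendation) is configuring lager itself to stay quiet through application config:

# config/config.exs - sketch only: give lager no handlers and keep it away from
# error_logger so it stops taking over the logging stack. These are standard lager
# application env keys, but whether this fully silences it in this setup is untested.
use Mix.Config

config :lager,
  handlers: [],
  error_logger_redirect: false,
  crash_log: false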

Middlewares support

Hi, thanks for grpc-elixir, it's enabling my company to move toward Elixir faster :]

There's a bunch of features in grpc-go that we're relying upon that would greatly benefit grpc-elixir.

One of those is support for interceptors, which basically enable writing middlewares for things such as logging, metrics reporting and retrying.

While such a feature can be implemented on top of the generated code, it is difficult to write certain middlewares when all you have to work with is, say, say_hello(request, _stream).

So I was looking at the code and thinking that it could be implemented (in broad terms) like that:

# Server-side
defmodule Helloworld.Greeter.Server do
  use GRPC.Server, service: Helloworld.Greeter.Service, middlewares: [m1,m2]

  def say_hello(request, _stream) do
    Helloworld.HelloReply.new(message: "Hello #{request.name}")
  end
end

# Client-side
{:ok, channel} = GRPC.Stub.connect("localhost:50051")
reply =
  channel
  |> Helloworld.Greeter.Stub.say_hello(Helloworld.HelloRequest.new(name: "grpc-elixir"), middlewares: [m1, m2])

A middleware being defined broadly as following:

# Server-side
defmodule Helloworld.Greeter.LoggingServerMiddleware do
  def call(service_mod, stream, {_, {_req_mod, _req_stream}, {_res_mod, _res_stream}} = rpc, func_name, next_mod) do
    Logger.info(...)
    next_mod.call(service_mod, stream, rpc, func_name, next_mod)
    # or perform the request with send_request/5 to short circuit the chain 
  end
end

# Client-side
defmodule Helloworld.Greeter.LoggingClientMiddleware do
  @spec call(atom, tuple, String.t, GRPC.Channel, struct | nil, keyword, atom) :: any
  def call(service_mod, rpc, path, channel, request, opts, next_mod) do
    Logger.info(...)
    next_mod.call(service_mod, rpc, path, channel, request, opts, next_mod)
  end
end

Tbh, something like Plug would be nicer but I'd rather go for the straightforward implementation first.

What do you think about such an approach?

I think I can find time at work to implement this but I'd rather talk with you about it first :)

Issue compiling in new elixir app

Hi there,

I seem to be having an issue compiling this (or one of its dependencies) in a new Elixir app.

After adding {:grpc, github: "tony612/grpc-elixir"} as the sole dependency and running mix do deps.get, compile, I receive the following error:

===> Compiling chatterbox
===> Compiling src/h2_stream.erl failed
src/h2_stream.erl:105: gen_fsm:start_link/3 is deprecated and will be removed in a future release; use gen_statem:start/3
src/h2_stream.erl:115: gen_fsm:send_event/2 is deprecated and will be removed in a future release; use gen_statem:cast/1
src/h2_stream.erl:120: gen_fsm:send_event/2 is deprecated and will be removed in a future release; use gen_statem:cast/1
src/h2_stream.erl:124: gen_fsm:sync_send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:call/2
src/h2_stream.erl:128: gen_fsm:sync_send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:call/2
src/h2_stream.erl:132: gen_fsm:send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:cast/1
src/h2_stream.erl:136: gen_fsm:send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:cast/1
src/h2_stream.erl:139: gen_fsm:sync_send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:call/2
src/h2_stream.erl:143: gen_fsm:stop/1 is deprecated and will be removed in a future release; use gen_statem:stop/1
src/h2_stream.erl:485: gen_fsm:send_all_state_event/2 is deprecated and will be removed in a future release; use gen_statem:cast/1

** (Mix) Could not compile dependency :chatterbox, "/Users/cfreeman/.mix/rebar3 bare compile --paths "/Users/cfreeman/projects/elixir/dummy/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile chatterbox", update it with "mix deps.update chatterbox" or clean it with "mix deps.clean chatterbox"

I've attempted this on Elixir 1.4.3, 1.4.4, and 1.4.5.

Do you have any idea what the issue might be? I know this project is also using a fork of chatterbox, but I'm not sure if the issue originates from chatterbox itself or from how this lib is using it.

Using Google.Protobuf predefined types (e.g. Empty)

Hey @tony612
Kudos for your great work with protobufs and gRPC.

I'm extending the HelloWorld example, trying to create a method which does not expect a request param.
In the proto definition file I added import "google/protobuf/empty.proto"; and used it in the rpc definition, e.g. rpc HeartBeat (google.protobuf.Empty) returns (HelloReply) {}

Then I regenerated the code with the provided protoc command. So far everything went smoothly.

Then, following the example in the iex REPL, I created a channel, but I don't know how to pass a Google.Protobuf.Empty message to the stub's method.
Does the elixir-protobuf lib provide predefined types from Google's protoc includes?
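
For reference, a hedged sketch of the call, assuming a Google.Protobuf.Empty module is available (either generated by also compiling google/protobuf/empty.proto or pulled in via the google_protos hex package):

# Sketch only: assumes Google.Protobuf.Empty exists locally, e.g. generated from
# empty.proto alongside the service or provided by the google_protos package.
{:ok, channel} = GRPC.Stub.connect("localhost:50051")
reply = channel |> Helloworld.Greeter.Stub.heart_beat(Google.Protobuf.Empty.new())
IO.inspect(reply)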

Thank you for help!

can't keepalive

The connection between the client and the server breaks after a while.

defmodule REPLTalk do

	def repltalk(channel) do

		questionAsking = IO.gets "> "
		if (byte_size(questionAsking) <= 1) do
			repltalk(channel)
		else
			#IO.puts("thinking...")
			reply = channel |> Talker.Talker.Stub.ask_question(Talker.Ask.new(question: questionAsking))
			IO.inspect reply.answer
			repltalk(channel)
		end
	end
end

GRPC.Server.start(Talker.Talker.Server, "localhost:50051", insecure: true)
{:ok, channel} = GRPC.Stub.connect("localhost:50051", insecure: true)
REPLTalk.repltalk(channel)

** (exit) exited in: :gen_fsm.sync_send_all_state_event(#PID<0.321.0>, {:new_stream, #PID<0.71.0>})
** (EXIT) no process
(stdlib) gen_fsm.erl:249: :gen_fsm.sync_send_all_state_event/2
(grpc) lib/grpc/adapter/chatterbox/client.ex:37: GRPC.Adapter.Chatterbox.Client.send_header/2
(grpc) lib/grpc/adapter/chatterbox/client.ex:28: GRPC.Adapter.Chatterbox.Client.send_request/3
(grpc) lib/grpc/adapter/chatterbox/client.ex:21: GRPC.Adapter.Chatterbox.Client.unary/3
(grpc) lib/grpc/stub.ex:70: GRPC.Stub.send_request/5
run.exs:12: REPLTalk.repltalk/1

16:50:19.230 [error] GenServer GRPC.Adapter.Cowboy.ServerSup terminating
** (stop) exited in: :gen_fsm.sync_send_all_state_event(#PID<0.321.0>, {:new_stream, #PID<0.71.0>})
** (EXIT) no process
Last message: {:EXIT, #PID<0.71.0>, {:noproc, {:gen_fsm, :sync_send_all_state_event, [#PID<0.321.0>, {:new_stream, #PID<0.71.0>}]}}}
State: {:state, {:local, GRPC.Adapter.Cowboy.ServerSup}, :one_for_one, [], :undefined, 1, 5, [], 0, GRPC.Adapter.Cowboy.ServerSup, []}

Unexpected RECV_DATA message when making unary RPCs

I was making unary RPCs and I noticed unwanted RECV_DATA messages. I was able to reproduce the issue with the route_guide example. I added the following code to display these messages at the end of client.exs:

defmodule Flush do
  def flush() do
    receive do
      message ->
        IO.inspect(message, binaries: :as_strings)
        flush()
    after
      0 -> :ok
    end
  end
end

Flush.flush()

I see messages like

{:RECV_DATA, 1,
 "\0\0\0\0O\n:Berkshire Valley Management Area Trail, Jefferson, NJ, USA\x12\x11\b\x9A\xA6\x8C\xC3\x01\x10\x96\x9F\x98\x9C\xFD\xFF\xFF\xFF\xFF\x01"}

which is the whole response as far as I can tell.

I do not see such messages when I comment out the unary calls to keep only the streaming calls. I guess the unary calls do not consume them, which does not seem to be intentional.

Authentication for accessing Google PubSub (and other third-party APIs)

Hi @tony612!

Thanks for making this library. 👍

I want to use it for accessing the Google PubSub service. Right now I'm in the process of trying to figure out how to do the authentication properly. Your route_guide example uses a self-signed CA certificate in both client and server. Does this mean that I should find Google's CA cert somewhere and use that? After looking at the source code of the gun adapter in grpc-elixir and the gun docs, I tried specifying the following opts to create a channel:

opts = [cred: GRPC.Credential.new(ssl: [])] # This is a workaround to make gun adapter establish a TLS connection without providing it a CA cert.
{:ok, channel} = GRPC.Stub.connect("pubsub.googleapis.com:443", opts) 

I'm not sure whether it works properly and how to verify that.
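
One hedged way to make a broken TLS setup fail loudly instead of silently "working" is to turn on peer verification against a public CA bundle (sketch only; :certifi is an extra dependency, and whether these ssl options pass through the gun adapter unchanged is an assumption):

# Sketch only: verify the peer against a public CA bundle. :certifi is an additional
# dependency; whether these ssl options are passed through to gun unchanged is an
# assumption.
opts = [
  cred:
    GRPC.Credential.new(
      ssl: [
        verify: :verify_peer,
        depth: 99,
        cacertfile: :certifi.cacertfile()
      ]
    )
]

{:ok, channel} = GRPC.Stub.connect("pubsub.googleapis.com:443", opts)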

Next, I used goth to retrieve a JWT token. I verified that it works when accessing my Google PubSub project using https://github.com/GoogleCloudPlatform/elixir-google-api that works over REST.

Next, I generated Elixir code from Google PubSub proto files (for client stub and relevant messages).

I also added the ability to include the authorization header. I'll post a PR with that code soon.

However, I wasn't able to make this all work. When I execute the code below, I get 404.

token = client.authenticate() # Retrieve a JWT token using goth.
project_id = "pubsub-elixir" # A project I created with Google PubSub.
req = Google.Pubsub.V1.ListTopicsRequest.new(project: "projects/#{project_id}", page_size: 5)
{_, reply} = channel |> Google.Pubsub.V1.Publisher.Stub.list_topics(req, token: token) # Right now this returns `%GRPC.RPCError{message: "status got is 404 instead of 200", status: 13}` as `reply`.

Please advise what I should do differently, or in which direction to move.

Updating to 0.3.0-alpha.1 breaks connection

When upgrading from master on GitHub to 0.3.0-alpha.1 and running my client again, I encounter a couple of new errors when trying to connect to a server (Dgraph) using GRPC:

** (KeyError) key :code not found in: %CaseClauseError{term: {:error, %GRPC.RPCError{message: "{:stream_error, :protocol_error, :\"Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)\"}", status: 2}}}

** (MatchError) no match of right hand side value: {:error, [code: 2, message: "{:connection_error, :compression_error, :\"Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)\"}"]}

** (MatchError) no match of right hand side value: {:error, [code: 2, message: ":noproc"]}

** (MatchError) no match of right hand side value: {:error, [code: 13, message: "shouldn't finish when getting headers"]}

I tried upgrading protobuf-elixir and protoc-gen-elixir. No luck.

Before I start digging into my code: Any idea what is going on here?

Thank you!

Process not alive after switch to hex package

I switched to the hex package (0.3.0-alpha.1). My app is now throwing this error when I try to connect to the server (Dgraph) via the GRPC.Stub.

01:26:40.282 [error] GenServer #PID<0.6941.0> terminating
** (stop) exited in: :gen_server.call(:gun_sup, {:start_child, [#PID<0.6941.0>, 'localhost', 9080, %{protocols: [:http2], transport: :tcp}]}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:214: :gen_server.call/3
    /Users/ole/exdgraph/deps/gun/src/gun.erl:137: :gun.open/3
    lib/grpc/adapter/gun.ex:32: GRPC.Adapter.Gun.connect_insecurely/2
    (ex_dgraph) lib/exdgraph/protocol.ex:23: ExDgraph.Protocol.connect/1
    (db_connection) lib/db_connection/connection.ex:135: DBConnection.Connection.connect/2
    (connection) lib/connection.ex:622: Connection.enter_connect/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: nil

Adding grpc to the list of applications in my mix.exs fixed this, but the docs state that this is only necessary for Elixir versions before 1.4 (I'm using 1.6).
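
For completeness, the workaround mentioned above looks like this in mix.exs (the :mod entry is a placeholder for the app's actual callback module; on Elixir >= 1.4 listing :grpc here should not be necessary, which is exactly the point of this issue):

# mix.exs - the workaround described above; :grpc is listed explicitly even though
# the docs say that should only be needed on Elixir < 1.4.
def application do
  [
    extra_applications: [:logger, :grpc],
    mod: {ExDgraph.Application, []}
  ]
end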

Reconnect after network error

Hi,

I'm testing the following scenario against a golang server:

  • run rpc request => everything ok
  • stop server
  • run rpc request => error
  • start server again
  • run rpc request => it should reconnect and return correct answer
iex(9)> {:ok, channel} = GRPC.Stub.connect("localhost:8080", insecure: true)
{:ok,
 %GRPC.Channel{adapter: GRPC.Adapter.Chatterbox.Client, host: "localhost",
  payload: %{pid: #PID<0.450.0>}, port: 8080, scheme: "http"}}
iex(10)> request = Uav.LookupSlugRequest.new(slug: "grpc-elixir")
%Uav.LookupSlugRequest{slug: "grpc-elixir"}
iex(11)> response = Uav.RPC.Stub.lookup_slug(channel, request)
22:46:13.607 [info] [client] Stream 1 WindowUpdate 5
%Uav.LookupSlugResponse{ref_id: nil}

I stopped the golang server.

iex(12)> response = Uav.RPC.Stub.lookup_slug(channel, request)
** (exit) exited in: :gen_fsm.sync_send_all_state_event(#PID<0.450.0>, {:new_stream, #PID<0.425.0>})
    ** (EXIT) no process
    (stdlib) gen_fsm.erl:249: :gen_fsm.sync_send_all_state_event/2
      (grpc) lib/grpc/adapter/chatterbox/client.ex:37: GRPC.Adapter.Chatterbox.Client.send_header/2
      (grpc) lib/grpc/adapter/chatterbox/client.ex:28: GRPC.Adapter.Chatterbox.Client.send_request/3
      (grpc) lib/grpc/adapter/chatterbox/client.ex:21: GRPC.Adapter.Chatterbox.Client.unary/3
      (grpc) lib/grpc/stub.ex:70: GRPC.Stub.send_request/5

I started the golang server again.

iex(13)> response = Uav.RPC.Stub.lookup_slug(channel, request)
** (exit) exited in: :gen_fsm.sync_send_all_state_event(#PID<0.450.0>, {:new_stream, #PID<0.425.0>})
    ** (EXIT) no process
    (stdlib) gen_fsm.erl:249: :gen_fsm.sync_send_all_state_event/2
      (grpc) lib/grpc/adapter/chatterbox/client.ex:37: GRPC.Adapter.Chatterbox.Client.send_header/2
      (grpc) lib/grpc/adapter/chatterbox/client.ex:28: GRPC.Adapter.Chatterbox.Client.send_request/3
      (grpc) lib/grpc/adapter/chatterbox/client.ex:21: GRPC.Adapter.Chatterbox.Client.unary/3
      (grpc) lib/grpc/stub.ex:70: GRPC.Stub.send_request/5

The channel should try to reconnect on the next RPC call; at least, this is what the golang and ruby clients do. Any advice?

Thank you,
Teodor

Error on update

Hey there!

I tried to update this package on my application to the latest commit from master:

  defp deps do
    [...,
     {:grpc, github: "tony612/grpc-elixir", ref: "30ebca9c3d5dd25e9545ea405fc73f37ae152677"}]
  end

Updating was just fine, but when I try to load a new iex session with the mix packages, I get this error:

root@aae0cb8eccc1:/user_service# iex -S mix
Erlang/OTP 19 [erts-8.1] [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Compiling 1 file (.ex)
02:08:37.188 [error] Supervisor 'Elixir.Logger.Supervisor' had child 'Elixir.Logger.ErrorHandler' started with 'Elixir.Logger.Watcher':watcher(error_logger, 'Elixir.Logger.ErrorHandler', {true,false,500}, link) at <0.175.0> exit with reason normal in context child_terminated
02:08:37.233 [info] Application lager started on node nonode@nohost
02:08:37.238 [info] Application gpb started on node nonode@nohost
02:08:37.238 [info] Application exprotobuf started on node nonode@nohost
02:08:37.244 [info] Application hpack started on node nonode@nohost
02:08:37.245 [info] Application chatterbox started on node nonode@nohost
02:08:37.260 [info] Application cowlib started on node nonode@nohost
02:08:37.274 [info] Application ranch started on node nonode@nohost
02:08:37.282 [info] Application cowboy started on node nonode@nohost
02:08:37.282 [info] Application grpc started on node nonode@nohost
02:08:37.287 [info] Application decimal started on node nonode@nohost
02:08:37.290 [info] Application poolboy started on node nonode@nohost
02:08:37.298 [info] Application ecto started on node nonode@nohost
02:08:37.306 [info] Application connection started on node nonode@nohost
02:08:37.320 [info] Application db_connection started on node nonode@nohost
02:08:37.328 [info] Application postgrex started on node nonode@nohost
02:08:37.389 [error] Supervisor 'Elixir.UserService.Supervisor' had child 'Elixir.GRPC.Server.Supervisor' started with 'Elixir.GRPC.Server.Supervisor':start_link({'Elixir.Services.User.Server',50051}) at undefined exit with reason {'EXIT',{undef,[{'Elixir.GRPC.Server.Supervisor',start_link,[{'Elixir.Services.User.Server',50051}],[]},{supervisor,do_start_child,2,[{file,"supervisor.erl"},{line,365}]},{supervisor,start_children,3,[{file,"supervisor.erl"},{line,348}]},{supervisor,init_children,2,[{file,"supervisor.erl"},{line,314}]},{gen_server,init_it,6,[{file,"gen_server.erl"},{line,328}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}} in context start_error
02:08:37.390 [error] CRASH REPORT Process <0.247.0> with 0 neighbours exited with reason:{{shutdown,{failed_to_start_child,'Elixir.GRPC.Server.Supervisor',{'EXIT',{undef,[{'Elixir.GRPC.Server.Supervisor',start_link,[{'Elixir.Services.User.Server',50051}],[]},{supervisor,do_start_child,2,[{file,"supervisor.erl"},{line,365}]},{supervisor,start_children,3,[{file,"supervisor.erl"},{line,348}]},{supervisor,init_children,2,[{file,"supervisor.erl"},{line,314}]},{gen_server,init_it,6,[{file,"gen_server.erl"},{line,328}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}}}},...} in application_master:init/4 line 134

02:08:37.487 [info]  Application user_service exited: UserService.start(:normal, []) returned an error: shutdown: failed to start child: GRPC.Server.Supervisor
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) function GRPC.Server.Supervisor.start_link/1 is undefined (module GRPC.Server.Supervisor is not available)
            GRPC.Server.Supervisor.start_link({Services.User.Server, 50051})
            (stdlib) supervisor.erl:365: :supervisor.do_start_child/2
            (stdlib) supervisor.erl:348: :supervisor.start_children/3
            (stdlib) supervisor.erl:314: :supervisor.init_children/2
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

02:08:37.487 [info]  Application postgrex exited: :stopped

02:08:37.488 [info]  Application db_connection exited: :stopped

02:08:37.488 [info]  Application connection exited: :stopped

02:08:37.488 [info]  Application ecto exited: :stopped

02:08:37.489 [info]  Application poolboy exited: :stopped

02:08:37.489 [info]  Application decimal exited: :stopped

02:08:37.490 [info]  Application grpc exited: :stopped

02:08:37.490 [info]  Application cowboy exited: :stopped

02:08:37.491 [info]  Application ranch exited: :stopped

02:08:37.491 [info]  Application cowlib exited: :stopped

02:08:37.493 [info]  Application chatterbox exited: :stopped

02:08:37.494 [info]  Application hpack exited: :stopped

02:08:37.495 [info]  Application exprotobuf exited: :stopped

02:08:37.497 [info]  Application gpb exited: :stopped

02:08:37.499 [info]  Application lager exited: :stopped

=INFO REPORT==== 9-Feb-2017::02:08:37 ===
    application: logger
    exited: stopped
    type: temporary
** (Mix) Could not start application user_service: UserService.start(:normal, []) returned an error: shutdown: failed to start child: GRPC.Server.Supervisor
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) function GRPC.Server.Supervisor.start_link/1 is undefined (module GRPC.Server.Supervisor is not available)
            GRPC.Server.Supervisor.start_link({Services.User.Server, 50051})
            (stdlib) supervisor.erl:365: :supervisor.do_start_child/2
            (stdlib) supervisor.erl:348: :supervisor.start_children/3
            (stdlib) supervisor.erl:314: :supervisor.init_children/2
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

Since it seemed like the GRPC.Server.Supervisor module was not being properly loaded, I removed it from the list of child supervisors in my application and then double-checked via iex:

iex(8)> h GRPC.Server.Supervisor
Could not load module GRPC.Server.Supervisor, got: nofile

Am I missing something?

For now I just went back to the "stable version", which was SHA1 1d4609cdc2d5f1e17232d01ae29b9b3bb6fba61b. Besides, it would be great to manage releases via version tags. What do you think?

HTTP2 Content-Type Option

Hi there, I just came across grpc-elixir the other day and decided to see if I could build a Google Datastore client from Google's IDL files. It looks like auth isn't completely supported yet, or at least not the type of auth needed for Datastore (I'm still really fuzzy on how Datastore auth works). In the meantime I'm testing against Google's local Datastore emulator.

The first snag I hit is that the Datastore emulator requires the content-type header to be application/grpc instead of application/grpc+proto. After changing the header, it works! But it doesn't look like there's a way to set the content-type header through options. On the one hand, the Datastore emulator may not be fully following the spec, but on the other hand this type of problem may be common. Would you be against a PR that added either a content-type option or, more generically, a headers option to Stub.connect?
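
To make the proposal concrete, a hypothetical call could look like the sketch below; the headers option is exactly what this issue is asking for and does not exist in the library today:

# Hypothetical sketch: the `headers:` option shown here is the proposed
# addition, not an existing option of GRPC.Stub.connect.
{:ok, channel} =
  GRPC.Stub.connect("localhost:8081",
    headers: [{"content-type", "application/grpc"}]
  )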

Feature request: recv streaming response asynchronously via message passing

Message passing is more idiomatic Elixir than a blocking Stream, especially for streaming responses.

As an example, HTTPoison supports async requests in this way:

iex> HTTPoison.get! "https://github.com/", %{}, stream_to: self
%HTTPoison.AsyncResponse{id: #Reference<0.0.0.1654>}
iex> flush
%HTTPoison.AsyncStatus{code: 200, id: #Reference<0.0.0.1654>}
%HTTPoison.AsyncHeaders{headers: %{"Connection" => "keep-alive", ...}, id: #Reference<0.0.0.1654>}
%HTTPoison.AsyncChunk{chunk: "<!DOCTYPE html>...", id: #Reference<0.0.0.1654>}
%HTTPoison.AsyncEnd{id: #Reference<0.0.0.1654>}
:ok
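
A purely hypothetical sketch of what the equivalent could look like in grpc-elixir (the stream_to: option, the :grpc_* message tuples, the ref shape and the handle_* helpers are all made up for illustration):

# Hypothetical sketch only: the stub would forward each streamed reply to the
# calling process as a message instead of returning a blocking Stream.
{:ok, ref} = MyService.Stub.stream_things(channel, request, stream_to: self())

receive do
  {:grpc_reply, ^ref, reply} -> handle_reply(reply)
  {:grpc_trailers, ^ref, trailers} -> handle_trailers(trailers)
  {:grpc_end, ^ref} -> :done
end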

Encode/decode proto's map

Hey there, @tony612!

First, thank you again for this awesome project :-)

So, I'm trying to work with a map field for errors, which is properly defined in the proto file:

message CreateUserResponse {
  bool success = 1;
  string id = 2;
  map<string, string> errors = 3;
}

However, it turns out that when I try to use an Elixir map for the errors key:

Services.CreateUserResponse.new(success: false, errors: %{"foo" => "bar"})

I got this error on the server:

04:41:34.410 [warning] lager_error_logger_h dropped 40 messages in the last second that exceeded the limit of 50 messages/sec
04:41:34.410 [error] CRASH REPORT Process <0.367.0> with 0 neighbours exited with reason:{badkey,'__struct__',#{<<"foo">> => <<"bar">>}} in 'Elixir.Protobuf.Encoder':fix_value/1 line 42 in 'Elixir.GRPC.Adapter.Cowboy.StreamHandler':proc_lib_hack/3 line 135

04:41:34.462 [error] Ranch listener Services.User.Server, connection process #PID<0.365.0>, stream 3 had its request process #PID<0.367.0> exit with reason {:badkey, :__struct__, %{"foo" => "bar"}} and stacktrace [{Protobuf.Encoder, :fix_value, 1, [file: 'lib/exprotobuf/encoder.ex', line: 42]}, {Protobuf.Encoder, :"-fix_undefined/1-fun-0-", 2, [file: 'lib/exprotobuf/encoder.ex', line: 31]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1623]}, {Protobuf.Encoder, :encode, 2, [file: 'lib/exprotobuf/encoder.ex', line: 22]}, {GRPC.Server, :stream_send, 2, [file: 'lib/grpc/server.ex', line: 143]}, {GRPC.Adapter.Cowboy.Handler, :init, 2, [file: 'lib/grpc/adapter/cowboy/handler.ex', line: 19]}, {:cowboy_handler, :execute, 2, [file: 'src/cowboy_handler.erl', line: 39]}, {GRPC.Adapter.Cowboy.StreamHandler, :execute, 3, [file: 'lib/grpc/adapter/cowboy/stream_handler.ex', line: 143]}]

04:41:34.463 [error] Ranch listener 'Elixir.Services.User.Server', connection process <0.365.0>, stream 3 had its request process <0.367.0> exit with reason {badkey,'__struct__',#{<<"foo">> => <<"bar">>}} and stacktrace [{'Elixir.Protobuf.Encoder',fix_value,1,[{file,"lib/exprotobuf/encoder.ex"},{line,42}]},{'Elixir.Protobuf.Encoder','-fix_undefined/1-fun-0-',2,[{file,"lib/exprotobuf/encoder.ex"},{line,31}]},{'Elixir.Enum','-reduce/3-lists^foldl/2-0-',3,[{file,"lib/enum.ex"},{line,1623}]},{'Elixir.Protobuf.Encoder',encode,2,[{file,"lib/exprotobuf/encoder.ex"},{line,22}]},{'Elixir.GRPC.Server',stream_send,2,[{file,"lib/grpc/server.ex"},{line,143}]},{'Elixir.GRPC.Adapter.Cowboy.Handler',init,2,[{file,"lib/grpc/adapter/cowboy/handler.ex"},{line,19}]},{cowboy_handler,execute,2,[{file,"src/cowboy_handler.erl"},{line,39}]},{'Elixir.GRPC.Adapter.Cowboy.StreamHandler',execute,3,[{file,"lib/grpc/adapter/cowboy/stream_handler.ex"},{line,143}]}]

After testing a few data structures (simple lists, keyword lists, tuples), I could only make it work with a list of tuples, such as:

Services.CreateUserResponse.new(success: false, errors: [{"foo", "bar"}])
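
In the meantime, a plain Elixir map can be turned into that shape with Map.to_list/1, e.g.:

# Workaround sketch, assuming the list-of-tuples behavior shown above:
# Map.to_list/1 turns an Elixir map into a list of {key, value} tuples.
errors = Map.to_list(%{"foo" => "bar"})   # => [{"foo", "bar"}]
Services.CreateUserResponse.new(success: false, errors: errors)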

Is that the expected behavior? If so, is there any particular reason for choosing to encode/decode proto maps to this kind of structure (a list of tuples) rather than to Elixir maps?

Looking forward to your answer. Thanks!

Generators, Modules & Typespecs

Currently, when you run the generators, you end up with two generated files, which look something like the following:

my_service.pb.ex:

# DO NOT EDIT!
...
defmodule MyService do
  use Protobuf, """
syntax = "proto3";

service MyService {
  rpc Get (GetRequest) returns (GetReply) {}
}

message GetRequest {
  string name = 1;
  int32 age = 2;
}

message GetReply {
  string hello_name = 1;
}
"""
end

and

defmodule MyService.MyService.Server do
  use GRPC.Server, service: MyService.MyService.Service
  def get(get_request, _stream) do
  end
end

Getting dropped in there after creating a new project, it's incredibly difficult to move forward since you don't know what your types are from the protobufs and you don't have a typespec to guide you (or dialyzer) in your service definitions. I think it would be incredibly beneficial to someone building a service on grpc-elixir to have generators that instead create:

my_service.pb.ex:

# DO NOT EDIT!

defmodule MyService do
  defmodule GetRequest do
    @type t :: %{
        name: String.t,
        age: integer()
    }
    defstruct [name: "", age: 0]
  end

  defmodule GetReply do
    @type t :: %{
      hello_name: String.t
    }
    defstruct [hello_name: ""]
  end
end

my_service_server.ex

defmodule MyService.MyService.Server do
  @spec get(MyService.GetRequest.t, GRPC.Server.Stream.t) :: MyService.GetReply.t | nil
  def get(get_request, _stream) do
  end
end

With the protobuf modules generated ahead of time, it becomes very easy to look up the definitions of the modules from your server code. Additionally, it is trivial to write them into the server with clear function specs, and it becomes easy to dialyze your code.

I wanted to garner feedback from @tony612 on whether this is something interesting and important to this project.

Stateful connections

Hi and thanks for the great work!

In my project I would like to model my connection as a stateful gen_server. Ideally I would get messages from the actual connection with requests, do the processing asynchronously and then reply.

As far as I understand, the callback in the server module does not get any identifier of the corresponding session and therefore can't dispatch to a state-holding process. Maybe I am missing something?

What would be the best way to achieve such a stateful connection?
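
One possible workaround, sketched below, is to carry a session identifier inside the request message itself and dispatch from the stateless callback to a Registry-registered GenServer. The session_id field, the Registry name and the state-holding GenServer are all assumptions made for illustration:

defmodule MyService.Server do
  use GRPC.Server, service: MyService.Service

  # Sketch only: the id travels inside the request message because the
  # callback itself exposes no session identifier.
  def get(%{session_id: session_id} = request, _stream) do
    [{pid, _}] = Registry.lookup(MyService.SessionRegistry, session_id)
    # The state-holding GenServer does the real work and returns the reply struct.
    GenServer.call(pid, {:handle_get, request})
  end
end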

Getting 'shouldn't finish when getting headers, status : 13' with the latest release

I've tried to update to the latest release, but I am getting the following error when I try to trigger a call from the client.

** (MatchError) no match of right hand side value: {:error, %GRPC.RPCError{message: "shouldn't finish when getting headers", status: 13}}

Let me know if I can help with debugging.

These are the deps on my project:

{:phoenix, github: "phoenixframework/phoenix"},
{:phoenix_html, github: "phoenixframework/phoenix_html"},
{:poison, "~> 3.1"},
{:gettext, "~> 0.11"},
{:cowboy, "~> 2.2"},
{:protobuf, "~> 0.5"},
{:grpc, github: "tony612/grpc-elixir"},

Server healthchecking support

Hi, @tony612!
My coworkers and I have been trying to include a simple health check for our GRPC servers in our projects, based on the grpc health checking guide.

Ideally, we would want a small library that can be included in projects easily, like:
https://github.com/renderedtext/grpc_health_check/tree/mn/standalone-application

However, when I try including such a library in a sample project, the project cannot be compiled. Its own GRPC.Server.Supervisor refuses to start:

[screenshot: compilation error when starting the second GRPC.Server.Supervisor]

At first glance, it seems like only one GRPC.Server.Supervisor process can be started.

Any thoughts? 🙂

The official libraries for supported languages come with the Health Check service already bundled. Have you thought of perhaps adding support for this in grpc-elixir directly?
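
For reference, a minimal Check handler could look roughly like the sketch below. It assumes modules generated from the standard grpc.health.v1 health.proto, so the exact generated module names may differ:

defmodule HealthCheck.Server do
  use GRPC.Server, service: Grpc.Health.V1.Health.Service

  # Sketch only: always reports SERVING, regardless of which service is asked about.
  def check(_request, _stream) do
    Grpc.Health.V1.HealthCheckResponse.new(status: :SERVING)
  end
end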

Message is split in chunks of 2^14 bytes

It seems that Channel.unary returns a response whose body is split in chunks of approximately 2^14 bytes.

As a result, Stub.parse_unary_response can fail to parse a message, since it's seeing only the first 2^14 bytes of it.

Due to this, I'm seeing the following error:

** (MatchError) no match of right hand side value: "(very long string of a bit less than 2^14 bytes)"
             lib/protobuf/decoder.ex:163: Protobuf.Decoder.decode_type/3
             lib/protobuf/decoder.ex:30: Protobuf.Decoder.do_decode/4
    (elixir) lib/enum.ex:1229: Enum."-map/2-lists^map/1-0-"/2
             lib/grpc/stub.ex:157: GRPC.Stub.parse_unary_response/2
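
For what it's worth, 2^14 bytes is exactly HTTP/2's default maximum frame size (16,384 bytes), so the response body arrives as several DATA frames. An illustrative fix would be to join all frames into a single binary before decoding, rather than decoding the first frame alone (a sketch, not the library's actual code):

# Illustrative sketch: `chunks` stands for the list of DATA-frame payloads
# received for one response; join them before decoding (gRPC's 5-byte message
# prefix handling is omitted for brevity).
body = IO.iodata_to_binary(chunks)
reply = MyService.GetReply.decode(body)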

Returning an error from GRPC handler

From what I saw in the code, the HTTP/2 response code is always set to 200 no matter what.
It would be great if it were possible to set an error code and change the response code accordingly, like in other gRPC implementations.

This implementation would probably use raise with a custom error type, I assume?
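
As a sketch of what that could look like from a handler's point of view (assuming a raised GRPC.RPCError were translated into a non-OK gRPC status, which is exactly what this issue proposes; find_thing/1 is a made-up helper):

defmodule MyService.Server do
  use GRPC.Server, service: MyService.Service

  # Sketch only: today the HTTP/2 response code stays 200 no matter what; the
  # proposal is that a raised GRPC.RPCError would set the gRPC status instead.
  # 5 is the NOT_FOUND gRPC status code.
  def get(request, _stream) do
    case find_thing(request) do
      nil -> raise GRPC.RPCError, status: 5, message: "not found"
      thing -> thing
    end
  end
end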
