Comments (9)

mickel8 commented on June 9, 2024

Sometimes, instead of trailers we get

%GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:data, <<some data>>}"}

mickel8 commented on June 9, 2024

I found the problem.

For the context, we use grpc streams.

Gun by default sends its messages to the calling process. This means that if you use grpc in a GenServer and a message arrives before you call GRPC.Stub.recv, it will be dropped by the default implementation of GenServer's handle_info.

That's why we get "unexpected when waiting for headers". The messages are sent from the server correctly (confirmed with Wireshark), and gun receives and forwards them correctly too, but then they are dropped by GenServer's handle_info 🤔
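To illustrate the mechanism (a sketch, not our actual code): because the channel is opened from inside the GenServer, gun's active-mode frames land in its mailbox, and a catch-all clause like the one below (or the default clause injected by use GenServer) silently discards exactly the frames that :gun.await will later look for.

def handle_info(_msg, state) do
  # {:gun_data, ...} / {:gun_trailers, ...} frames for our gRPC stream
  # end up here and are thrown away before GRPC.Stub.recv ever runs
  {:noreply, state}
end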

polvalente commented on June 9, 2024

This seems like a user-side problem with handling gRPC's messages. I'm closing the issue because of that, but feel free to comment if there are any improvements we could make on our side!

If you're using a multi-purpose GenServer (which I wouldn't advise due to serial message processing issues), you could match on gRPC's messages specifically and handle them differently from the generic ones.
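For example, with the Gun adapter something along these lines could work (the :gun_* tuple shapes are Gun's active-mode messages; the helper names are placeholders):

# Gun frames for the gRPC stream get dedicated clauses...
def handle_info({:gun_data, _conn, _stream_ref, _is_fin, _data} = msg, state),
  do: {:noreply, stash_grpc_frame(state, msg)}    # placeholder helper

def handle_info({:gun_trailers, _conn, _stream_ref, _trailers} = msg, state),
  do: {:noreply, stash_grpc_frame(state, msg)}

# ...while everything else keeps its generic handling
def handle_info(msg, state), do: handle_generic(msg, state)    # placeholder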

mickel8 commented on June 9, 2024

This seems like a user-side problem with handling gRPC's messages.

Doesn't that mean we basically can't use grpc streams in a GenServer?

Even if we knew which messages from gun are gRPC-related, we couldn't feed them into the gRPC library. The library assumes we always call GRPC.Stub.recv, which under the hood calls :gun.await. This will fail if we call GRPC.Stub.recv too late, because we end up in a state where some of gun's messages have already been received by our GenServer's handle_info and the others are received by :gun.await 🤔

To make our implementation work, we had to wrap the calls to the grpc library in a plain process with a dedicated receive block that doesn't have a default branch.
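Roughly, that wrapper looks like this (simplified; the stub call and variable names are made up):

parent = self()

spawn_link(fn ->
  # this plain process (indirectly) spawns gun via connect/2, so every
  # :gun_* message lands in *its* mailbox and nothing consumes it early
  {:ok, channel} = GRPC.Stub.connect(address)

  stream =
    channel
    |> MyService.Stub.my_streaming_rpc()        # made-up generated stub call
    |> GRPC.Stub.send_request(request)
    |> GRPC.Stub.end_stream()

  # recv/1 calls :gun.await under the hood; its selective receive finds
  # every frame because this process has no catch-all receive clause
  result = GRPC.Stub.recv(stream)

  # the parent (our GenServer) picks this up in handle_info
  send(parent, {:grpc_result, result})
end)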

Another option could be using the Mint adapter, as Mint provides both passive and active modes, but unfortunately it crashes.

polvalente commented on June 9, 2024

I took a look into the Gun adapter and I can't actually see what would make your GenServer receive the message that should be sent to Gun instead.

Do you have a minimal example of how you're calling gRPC to get the error?

mickel8 commented on June 9, 2024

I can try to explain this in more detail:

  1. We start sending data to the Google Speech-to-Text (STT) service.
  2. While we are still sending data, STT starts sending responses.
  3. Those responses are received by gun. Gun sends them to the process that spawned it. Because our GenServer calls grpc functions like connect, it's our GenServer that (implicitly, under the hood) spawns the gun process.
  4. Our GenServer gets those first responses in handle_info and drops them.
  5. After we finish streaming to STT, we call GRPC.Stub.recv, which calls :gun.await. :gun.await is a plain receive block that goes through our GenServer's mailbox, reads the HTTP/2 responses and returns them. Those responses are then parsed by grpc. The problem is that some of them have already been consumed by handle_info.

In other words, I believe it all comes down to gun working in active mode, i.e. it sends HTTP/2 responses to the calling process, which in the case of grpc is always the process that calls GRPC.Stub.connect.

Does it sound clearer now? I can also try to create a minimal reproducible example in my free time.
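In the meantime, the shape of our code is roughly this (all names are made up; only the message flow matters):

defmodule STTSession do
  use GenServer

  def init(_opts) do
    # gun is spawned (indirectly) by this GenServer, so gun's active-mode
    # messages are delivered to this process
    {:ok, channel} = GRPC.Stub.connect("stt.example.com:443")
    {:ok, %{stream: Speech.Stub.streaming_recognize(channel)}}
  end

  # steps 1-2: we keep pushing audio chunks while STT is already replying
  def handle_cast({:audio, chunk}, %{stream: stream} = state) do
    {:noreply, %{state | stream: GRPC.Stub.send_request(stream, chunk)}}
  end

  # steps 3-4: gun delivers those early replies here, and they are dropped
  def handle_info({:gun_data, _conn, _stream_ref, _is_fin, _data}, state) do
    {:noreply, state}
  end

  # step 5: by now part of the HTTP/2 stream is gone from the mailbox, so
  # recv/1 (i.e. :gun.await) fails with "unexpected when waiting for headers"
  def handle_call(:finish, _from, %{stream: stream} = state) do
    stream = GRPC.Stub.end_stream(stream)
    {:reply, GRPC.Stub.recv(stream), state}
  end
end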

polvalente commented on June 9, 2024

Ah, I think I get it more clearly now. A minimal reproduction would be helpful!

However, I think the issue at hand has more to do with event ordering than anything else. From the workflow described, it seems that your GenServer logic expects to receive data only after you finish sending it, and that doesn't seem to correspond to reality.

A possible (palliative) solution would be to spawn a separate process linked to your GenServer, such that this process only receives messages from gun (or perhaps GenServer calls from its parent). This would let you either re-enqueue those messages or simply never read them (depending on whether you're spawning a plain process or a GenServer of sorts).

It might make sense for this process encapsulation to be absorbed by this library, but I'd need the reproduction to experiment with this.
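Sketching that idea (StreamOwner and the stub call are illustrative names, not part of this library):

defmodule StreamOwner do
  # linked process that owns the connection: gun's messages are delivered
  # here and are never matched by our own receive clauses, so
  # GRPC.Stub.recv (via :gun.await) can still pick them up selectively
  def start_link(parent, address) do
    spawn_link(fn ->
      {:ok, channel} = GRPC.Stub.connect(address)
      loop(parent, MyService.Stub.my_streaming_rpc(channel))    # made-up stub
    end)
  end

  defp loop(parent, stream) do
    receive do
      {:send, chunk} ->
        loop(parent, GRPC.Stub.send_request(stream, chunk))

      :finish ->
        {:ok, replies} = GRPC.Stub.recv(GRPC.Stub.end_stream(stream))
        send(parent, {:grpc_replies, Enum.to_list(replies)})
    end
  end
end

The parent GenServer then only ever sees {:grpc_replies, ...}, which it can match on explicitly.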

beligante commented on June 9, 2024

I can come up with an example for this this week. I've been working on an internal library at my workplace to manage connection pools using elixir-grpc, and I know the best way to show this.

beligante commented on June 9, 2024

@mickel8 Not sure if you found a workaround for this, but here's what I got

In my design, I've wrapped the processing of grpc streams inside a GenServer. What I was able to do was:

  • I wrap the stream processing inside a Task
  • that Task iterates over the Stream struct (which calls :gun.await under the hood)
  • my GenServer receives the HTTP packets and forwards them to the Task pid
  • inside the Task, the calls to :gun.await no longer hang

You're right in your assumption that this is a problem with Gun in active mode. There are also some other underlying issues with the Gun adapter leaking messages to the process that started the connection. Anyway, I hope you find the code below useful.

defmodule Processor do
  use GenServer

  require Logger

  # ... my own code

  defp request(%{stream: nil, processor_meta: processor_meta} = _state, request) do
    # ConnectionPool is the author's internal connection-pool module
    channel = ConnectionPool.channel!(processor_meta[:pool])
    %{grpc_module: grpc_module, rpc_name: rpc_name} = processor_meta[:stream_rpc]
    stub = Module.concat(grpc_module, Stub)

    stream =
      stub
      |> apply(rpc_name, [channel])
      |> GRPC.Stub.send_request(request)

    # Start the gRPC streaming request
    {:ok, ex_stream} = GRPC.Stub.recv(stream, timeout: :infinity)
    my_pid = self()

    stream_processor_sup = processor_meta[:task_supervisor]

    task =
      Task.Supervisor.async(stream_processor_sup, fn ->
        # Wrap the gRPC stream processing inside a task; the task pid is kept
        # in the GenServer state so gun frames can be forwarded to it
        ex_stream
        |> Stream.each(&send(my_pid, {:process_response, &1}))
        |> Stream.run()

        send(my_pid, {:process_response, :done})
      end)

    {stream, task.pid, make_ref()}
  end

  defp do_send_request(%{stream: stream} = state, query_request) do
    stream = GRPC.Stub.send_request(stream, query_request)
    {stream, nil, state.ref}
  end

  @impl GenServer
  def handle_info({:process_response, {:ok, response}}, state) do
    state.processor_meta[:receive_function].(response)
    {:noreply, state}
  end

  def handle_info({:process_response, :done}, state) do
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info({_ref, {:process_response, :done}}, state) do
    {:noreply, state}
  end

  def handle_info({:process_response, {:error, error}}, state) do
    Logger.error("received error from stream: #{inspect(error)}")
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info(msg, state) do
    # async actions
    case msg do
      # gun-specific messages: forward them to the task iterating the stream
      {:gun_data, _pid, _ref, _is_fin, _data} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_error, _pid, _ref, _reason} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_trailers, _pid, _ref, _headers} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      # messages received when the async task finishes
      {_ref, :ok} ->
        :ok

      {:DOWN, _ref, :process, _pid, :normal} ->
        :ok

      _other ->
        Logger.warning("unexpected msg: #{inspect(msg)}")
    end

    # GenServer reply
    case msg do
      {:DOWN, _ref, :process, _pid, :normal} -> {:noreply, state, {:continue, :reconnect}}
      _other -> {:noreply, state}
    end
  end
end
