Comments (9)
Sometimes, instead of trailers, we get %GRPC.RPCError{status: 2, message: "unexpected when waiting for headers: {:data, <<some data>>}"}
from grpc.
I found the problem.
For context, we use gRPC streams.
Gun by default sends its messages to the calling process. This means that if you use grpc in a GenServer and a message arrives before you call GRPC.Stub.recv, it will be dropped by the default implementation of GenServer's handle_info.
That's why we get "unexpected when waiting for headers". Messages are sent from the server correctly (confirmed with Wireshark), and gun receives and forwards them correctly too, but they are dropped by the GenServer's handle_info 🤔
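The dropping behaviour described above can be reproduced without gun at all. The following is a minimal sketch, assuming a hypothetical `DroppingServer` module and a hand-built `:gun_data` tuple standing in for a real gun message; its catch-all `handle_info/2` counts messages instead of logging them, but it consumes them the same way the default implementation does.

```elixir
defmodule DroppingServer do
  use GenServer

  # Hypothetical server used only for this illustration.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)

  # Asks the server to look for gun-shaped messages in its own mailbox,
  # the way a late selective receive would.
  def pending_gun_messages(pid), do: GenServer.call(pid, :pending)

  @impl true
  def init(:ok), do: {:ok, 0}

  @impl true
  def handle_call(:pending, _from, dropped) do
    {:reply, %{dropped: dropped, still_in_mailbox: drain([])}, dropped}
  end

  # Catch-all clause: like the default handle_info/2, it consumes the
  # message. Here it only counts what it swallowed.
  @impl true
  def handle_info(_msg, dropped), do: {:noreply, dropped + 1}

  # Selective receive over whatever gun-shaped data messages are left.
  defp drain(acc) do
    receive do
      {:gun_data, _conn, _ref, _is_fin, _data} = msg -> drain([msg | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

{:ok, pid} = DroppingServer.start_link()

# Stand-in for gun delivering a response chunk to the connection owner.
send(pid, {:gun_data, self(), make_ref(), :nofin, "chunk"})

result = DroppingServer.pending_gun_messages(pid)
IO.inspect(result)
```

Because the chunk reaches the mailbox before the call, `handle_info/2` has already consumed it by the time the selective receive runs, so nothing is left to read.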
from grpc.
This seems like a user-side problem on handling gRPC's messages. I'm closing the issue because of that, but feel free to comment if any improvements on our side could be done!
If you are using it from a multi-purpose GenServer (which I wouldn't advise, because messages are processed serially), you could match on gRPC's messages specifically and handle them differently from the generic ones.
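As a rough sketch of what matching on those messages could look like, the hypothetical `MultiPurposeServer` below gives the per-stream gun tuples their own `handle_info/2` clauses ahead of the generic catch-all. The `handle_gun/1` helper and the counters are placeholders, and the sketch only shows the matching; it does not show how to hand the messages back to the gRPC library.

```elixir
defmodule MultiPurposeServer do
  use GenServer

  # Hypothetical server used only for this illustration.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)
  def counts(pid), do: GenServer.call(pid, :counts)

  @impl true
  def init(:ok), do: {:ok, %{gun: 0, other: 0}}

  @impl true
  def handle_call(:counts, _from, state), do: {:reply, state, state}

  # Per-stream messages gun sends to the connection owner in active mode.
  @impl true
  def handle_info({:gun_response, _conn, _ref, _is_fin, _status, _headers}, state),
    do: {:noreply, handle_gun(state)}

  def handle_info({:gun_data, _conn, _ref, _is_fin, _data}, state),
    do: {:noreply, handle_gun(state)}

  def handle_info({:gun_trailers, _conn, _ref, _headers}, state),
    do: {:noreply, handle_gun(state)}

  def handle_info({:gun_error, _conn, _ref, _reason}, state),
    do: {:noreply, handle_gun(state)}

  # Everything else takes the generic path.
  def handle_info(_other, state),
    do: {:noreply, Map.update!(state, :other, &(&1 + 1))}

  # Placeholder: a real server would route the message to whatever
  # consumes the stream instead of just counting it.
  defp handle_gun(state), do: Map.update!(state, :gun, &(&1 + 1))
end

{:ok, pid} = MultiPurposeServer.start_link()
ref = make_ref()

send(pid, {:gun_data, self(), ref, :nofin, "chunk"})
send(pid, {:gun_trailers, self(), ref, [{"grpc-status", "0"}]})
send(pid, :tick)

result = MultiPurposeServer.counts(pid)
IO.inspect(result)
```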
from grpc.
This seems like a user-side problem on handling gRPC's messages.
Doesn't that mean we basically can't use gRPC streams in a GenServer?
Even if we knew which messages from gun are gRPC related, we can't feed them into the gRPC library. The library assumes we always call GRPC.Stub.recv, which under the hood calls :gun.await. This will fail if we call GRPC.Stub.recv too late, as we will end up in a state where some of the gun messages are received by our GenServer's handle_info and others are received by :gun.await 🤔
To make our implementation work, we had to wrap calls to the grpc library in a plain process with a dedicated receive block that doesn't have a default branch.
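A minimal sketch of that kind of plain process follows. `StreamWorker` and the `:stream_done` message are hypothetical names, and the gun tuples are sent by hand; in a real setup the worker itself would make the grpc calls so that gun addresses its messages to it.

```elixir
defmodule StreamWorker do
  # Hypothetical plain process: it only ever pulls the messages it
  # understands out of its mailbox.
  def start(parent) do
    spawn_link(fn -> loop(parent, []) end)
  end

  defp loop(parent, chunks) do
    receive do
      {:gun_data, _conn, _ref, :nofin, data} ->
        loop(parent, [data | chunks])

      {:gun_data, _conn, _ref, :fin, data} ->
        send(parent, {:stream_done, self(), Enum.reverse([data | chunks])})
        # No catch-all branch: anything else stays in the mailbox untouched.
    end
  end
end

worker = StreamWorker.start(self())
ref = make_ref()

# An unrelated message does not disturb the selective receive.
send(worker, {:unrelated, :message})
send(worker, {:gun_data, self(), ref, :nofin, "a"})
send(worker, {:gun_data, self(), ref, :fin, "b"})

result =
  receive do
    {:stream_done, ^worker, chunks} -> chunks
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

The trade-off is that unmatched messages accumulate in the worker's mailbox, which is acceptable for a short-lived process dedicated to one stream.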
Another option could be the Mint adapter, as Mint provides both passive and active modes, but unfortunately it crashes.
from grpc.
I took a look into the Gun adapter and I can't actually see what would make your GenServer receive the message that should be sent to Gun instead.
Do you have a minimal example of how you're calling gRPC to get the error?
from grpc.
I can try to explain this in more detail:
- We start sending data to Google Speech to Text (STT) service
- While we are still sending data, STT starts sending responses
- Those responses are received by gun. Gun sends them to the process that spawned it. Because our GenServer calls grpc functions like connect, it's our GenServer that (under the hood and implicitly) spawns the gun process.
- Our GenServer gets those first responses in handle_info and drops them.
- After we finish streaming to STT, we call GRPC.Stub.recv, which calls :gun.await. :gun.await is a simple receive do end block that goes through our GenServer's mailbox. It reads HTTP/2 responses and returns them. Those responses are then parsed by grpc. The problem is that some of those responses have already been fetched by handle_info.
In other words, I believe it's all about gun working in active mode, i.e. it sends HTTP/2 responses to the calling process, which in the case of grpc is always the process that calls GRPC.Stub.connect.
Does it sound clearer now? I can also try to create a minimal reproducible example in my free time.
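The failure mode in the steps above can be sketched with a simplified stand-in for an await-style selective receive. `AwaitSketch` is hypothetical and is not gun's or grpc's actual code; it only mirrors the idea that the caller expects headers first and finds a data frame instead because the earlier messages were consumed elsewhere.

```elixir
defmodule AwaitSketch do
  # Simplified stand-in for an await-style selective receive on one stream.
  def await(stream_ref, timeout \\ 100) do
    receive do
      {:gun_response, _conn, ^stream_ref, is_fin, status, headers} ->
        {:response, is_fin, status, headers}

      {:gun_data, _conn, ^stream_ref, is_fin, data} ->
        {:data, is_fin, data}

      {:gun_trailers, _conn, ^stream_ref, trailers} ->
        {:trailers, trailers}
    after
      timeout -> {:error, :timeout}
    end
  end

  # A caller that, like a gRPC client, expects the headers to come first.
  def recv_headers(stream_ref) do
    case await(stream_ref) do
      {:response, _is_fin, _status, headers} ->
        {:ok, headers}

      other ->
        {:error, "unexpected when waiting for headers: #{inspect(other)}"}
    end
  end
end

ref = make_ref()

# Pretend the headers were already consumed elsewhere (for example by
# handle_info); only a later data frame is left in the mailbox.
send(self(), {:gun_data, self(), ref, :nofin, "payload"})

result = AwaitSketch.recv_headers(ref)
IO.inspect(result)
```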
from grpc.
Ah, I think I get it more clearly now. A minimal reproduction would be helpful!
However, I think that the issue at hand has more to do with event ordering than anything. From the workflow described, it seems that your GenServer logic expects to only receive data after you finish sending it, and that doesn't seem to correspond to reality.
A possible (palliative) solution would be to spawn a separate process linked to your GenServer, such that this process only receives messages from gun (or maybe GenServer.calls from its parent). This would enable you to either re-enqueue said messages or never read them (depending on whether you're spawning a plain process or a GenServer of sorts).
It might make sense for this process encapsulation to be absorbed by this library, but I'd need the reproduction to experiment with this.
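One way to picture that encapsulation is a small dedicated GenServer that buffers stream messages until its parent asks for them. This is only a sketch under assumptions: `StreamOwner` and `take_all/1` are hypothetical names, the gun tuples are sent by hand, and buffered messages would still need to be decoded by something other than GRPC.Stub.recv.

```elixir
defmodule StreamOwner do
  use GenServer

  # Hypothetical dedicated owner: in a real setup this process would open
  # the connection itself, so gun would address its messages here.
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, :ok)

  # Called by the parent whenever it is ready to look at responses.
  def take_all(pid), do: GenServer.call(pid, :take_all)

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_call(:take_all, _from, buffered) do
    {:reply, Enum.reverse(buffered), []}
  end

  # Buffer stream messages instead of dropping them.
  @impl true
  def handle_info({:gun_data, _conn, _ref, _is_fin, _data} = msg, buffered),
    do: {:noreply, [msg | buffered]}

  def handle_info({:gun_trailers, _conn, _ref, _headers} = msg, buffered),
    do: {:noreply, [msg | buffered]}

  # Anything unrelated is ignored.
  def handle_info(_other, buffered), do: {:noreply, buffered}
end

{:ok, owner} = StreamOwner.start_link()
ref = make_ref()

send(owner, {:gun_data, self(), ref, :nofin, "chunk"})
send(owner, :noise)
send(owner, {:gun_trailers, self(), ref, [{"grpc-status", "0"}]})

result = StreamOwner.take_all(owner)
IO.inspect(length(result))
```

The parent stays free to handle its own messages, and the responses arrive in order whenever it chooses to call `take_all/1`.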
from grpc.
I can come up with an example for this this week. I've been working on an internal library at my workplace to manage connection pools using elixir-grpc, and I know the best way to show this.
from grpc.
@mickel8 Not sure if you found a workaround for this, but here's what I got.
In my code design, I've wrapped the processing of gRPC streams inside a GenServer. What I was able to do was:
- I wrap the stream processing inside a Task
- That Task iterates over the Stream struct (which calls :gun.await)
- My GenServer receives the HTTP packets and forwards them to the Task pid
- Inside the Task, the calls to :gun.await no longer hang
You're right in your assumption that this is a problem with Gun in active mode. There are some other underlying issues with the Gun adapter leaking messages to the process that starts the connection as well. Anyway, I hope you find the code below useful.
defmodule Processor do
  #.. my own code

  defp request(%{stream: nil, processor_meta: processor_meta} = _state, request) do
    channel = ConnectionPool.channel!(processor_meta[:pool])
    %{grpc_module: grpc_module, rpc_name: rpc_name} = processor_meta[:stream_rpc]
    stub = Module.concat(grpc_module, Stub)

    stream =
      stub
      |> apply(rpc_name, [channel])
      |> GRPC.Stub.send_request(request)

    # Start the gRPC Streaming request
    {:ok, ex_stream} = GRPC.Stub.recv(stream, timeout: :infinity)
    my_pid = self()
    stream_processor_sup = processor_meta[:task_supervisor]

    task =
      Task.Supervisor.async(stream_processor_sup, fn ->
        # Wrap this gRPC Streaming processing inside a task and store the task pid
        ex_stream
        |> Stream.each(&send(my_pid, {:process_response, &1}))
        |> Stream.run()

        send(my_pid, {:process_response, :done})
      end)

    {stream, task.pid, make_ref()}
  end

  defp do_send_request(%{stream: stream} = state, query_request) do
    stream = GRPC.Stub.send_request(stream, query_request)
    {stream, nil, state.ref}
  end

  @impl GenServer
  def handle_info(
        {:process_response, {:ok, response}},
        state
      ) do
    state.processor_meta[:receive_function].(response)
    {:noreply, state}
  end

  def handle_info({:process_response, :done}, state) do
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info({_ref, {:process_response, :done}}, state) do
    {:noreply, state}
  end

  def handle_info({:process_response, {:error, error}}, state) do
    Logger.error("received error from stream: #{inspect(error)}")
    {:noreply, state, {:continue, :reset}}
  end

  def handle_info(msg, state) do
    # async actions
    case msg do
      # gun specific messages
      {:gun_data, _pid, _ref, _is_fin, _data} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_error, _pid, _ref, _reason} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      {:gun_trailers, _pid, _ref, _headers} ->
        if is_pid(state.stream_task), do: send(state.stream_task, msg)

      # messages received when the async task finish
      {_ref, :ok} ->
        :ok

      {:DOWN, _ref, :process, _pid, :normal} ->
        :ok

      _other ->
        Logger.warning("unexpected msg: #{inspect(msg)}")
    end

    # gen server reply
    case msg do
      {:DOWN, _ref, :process, _pid, :normal} -> {:noreply, state, {:continue, :reconnect}}
      _other -> {:noreply, state}
    end
  end
end
from grpc.