nats-io / nats.ex
Elixir client for NATS, the cloud native messaging system. https://nats.io
License: MIT License
@quixoten pointed out in a previous PR that when the cluster topology changes (nodes enter or leave?), NATS sends INFO messages with additional information about where each node lives. This means you could do a rolling set of restarts and give the new nodes entirely different addresses while keeping all clients online.
We should add support for these messages and update our list of available nodes accordingly.
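The INFO payload can carry a `connect_urls` field listing addresses clients may use. A minimal sketch of folding those into a server list, assuming the INFO JSON has already been decoded into a map with string keys; `ServerList` and `merge/2` are hypothetical names, not part of Gnat:

```elixir
defmodule ServerList do
  @moduledoc """
  Sketch: merge the `connect_urls` advertised in a NATS INFO message into the
  client's list of known `{host, port}` pairs. Assumes the INFO JSON has
  already been decoded into a map with string keys.
  """

  @default_port 4222

  def merge(known, %{"connect_urls" => urls}) when is_list(urls) do
    advertised = Enum.map(urls, &parse_url/1)
    # keep existing entries first and drop duplicates
    Enum.uniq(known ++ advertised)
  end

  # INFO messages without connect_urls leave the list unchanged
  def merge(known, _info), do: known

  # "10.0.0.2:4222" -> {"10.0.0.2", 4222}
  defp parse_url(url) do
    %URI{host: host, port: port} = URI.parse("nats://" <> url)
    {host, port || @default_port}
  end
end
```

This only adds servers; forgetting nodes that have left the cluster would need extra bookkeeping.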
Hi,
Currently the connection printout in nats.ex/lib/gnat/connection_supervisor.ex (line 60 in f2accd1) uses Logger.info. This means that if you deploy with authentication enabled, the secrets are printed to stdout unless you silence all info messages.
I have the following proposals:
OR
Thanks for any other suggestions or help :) We're already using this in production for one of our apps.
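One possible approach is to redact secret fields before the settings are logged. A minimal sketch, assuming the settings map uses keys such as `:password`, `:token` and `:nkey_seed` for credentials (that key list is an assumption; adjust it to the settings your Gnat version accepts); `SafeLog.redact/1` is a hypothetical helper:

```elixir
defmodule SafeLog do
  @moduledoc """
  Sketch: replace credential values with a placeholder so connection
  settings can be logged safely. The sensitive key list is an assumption.
  """

  @sensitive [:password, :token, :nkey_seed]

  def redact(settings) when is_map(settings) do
    Map.new(settings, fn
      # hide the value but keep the key, so logs still show which auth is in use
      {key, _value} when key in @sensitive -> {key, "[REDACTED]"}
      pair -> pair
    end)
  end
end
```

The log call would then become something like `Logger.info("connecting with #{inspect(SafeLog.redact(settings))}")`.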
The current CHANGELOG.md file has not been updated since the version 6.0 release. It would help developers to have an overview of the changes in each released version.
NATS 2.2+ introduced the JetStream feature.
It would be nice to have this feature supported in this Elixir client library.
I get this warning every time I call any pub/sub function:
[warn] Using execute/3 with a single event value is deprecated. Use a measurement map instead.
When working with JetStream push consumers, the server publishes a type of header message with headers like this: NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: 2\r\nNats-Last-Stream: 2\r\n. These messages result in {:error, "Could not parse headers"}. I believe this is because this line https://github.com/nats-io/nats.ex/blob/main/lib/gnat/parsec.ex#L116 expects a line break immediately after the NATS/1.0 declaration.
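A parser that tolerates an inline status after `NATS/1.0` could look like the sketch below. It uses plain `String` functions rather than the client's actual parser in `parsec.ex`, and `HeaderParser.parse/1` is a hypothetical name. It returns the status code and description (both `nil` for an ordinary header block) alongside the header pairs:

```elixir
defmodule HeaderParser do
  @moduledoc """
  Sketch: parse a NATS header block whose first line may carry an inline
  status, e.g. "NATS/1.0 100 Idle Heartbeat".
  """

  def parse("NATS/1.0" <> rest) do
    [status_line | header_lines] = String.split(rest, "\r\n")
    {status, description} = parse_status(String.trim(status_line))

    # lines without a ":" (including the blank terminator lines) are skipped
    headers =
      for line <- header_lines,
          [key, value] <- [String.split(line, ":", parts: 2)],
          do: {key, String.trim(value)}

    {:ok, status, description, headers}
  end

  def parse(_other), do: {:error, "Could not parse headers"}

  # "" -> no status; "100 Idle Heartbeat" -> {"100", "Idle Heartbeat"}
  defp parse_status(""), do: {nil, nil}

  defp parse_status(line) do
    case String.split(line, " ", parts: 2) do
      [code] -> {code, nil}
      [code, description] -> {code, description}
    end
  end
end
```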
It would be useful to know which server version a connection is talking to, in order to do feature detection. For example, you can't use Key/Value JetStream features before version 2.6.2. This would help show users useful error messages and conditionally run tests.
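The server's INFO message includes a `version` field, and Elixir's built-in `Version` module can compare it against a requirement. A minimal sketch, assuming the INFO payload is available as a decoded map with string keys; `ServerFeatures.kv_supported?/1` is a hypothetical helper:

```elixir
defmodule ServerFeatures do
  @moduledoc """
  Sketch: feature detection from the `version` field of the server's INFO
  message (assumed to be a decoded map with string keys).
  """

  # Key/Value needs server 2.6.2 or newer
  @kv_requirement ">= 2.6.2"

  def kv_supported?(%{"version" => version}) do
    case Version.parse(version) do
      {:ok, parsed} -> Version.match?(parsed, @kv_requirement)
      # unparseable version strings are treated as "feature not available"
      :error -> false
    end
  end

  def kv_supported?(_info), do: false
end
```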
The current implementations of ConnectionSupervisor and ConsumerSupervisor work, but I'm not sure they are idiomatic or ideal.
I had a conversation on the Elixir forum where I got some feedback about other possible approaches to this problem.
While experimenting with observer, my friend @newellista killed the process that the ConsumerSupervisor had started, and it didn't get restarted 😱
Instead, the consumer supervisor just logged a message:
15:43:58.924 [error] Elixir.Gnat.ConsumerSupervisor received unexpected message {:EXIT, #PID<0.361.0>, :killed}
So we need to make sure we catch those exit messages in the ConsumerSupervisor and start the process back up.
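A minimal sketch of that behaviour: the supervising process traps exits and, when it receives `{:EXIT, pid, reason}` for its worker, starts a replacement instead of only logging. `RestartingSupervisor` and its `start_fun` argument are hypothetical; handing the worker to a standard `Supervisor` would give the same restart behaviour without custom code.

```elixir
defmodule RestartingSupervisor do
  @moduledoc """
  Sketch: a process that traps exits and restarts its linked worker when the
  worker dies, instead of only logging the :EXIT message.
  """
  use GenServer
  require Logger

  # start_fun must spawn the worker linked to the caller, e.g. via spawn_link/1
  def start_link(start_fun), do: GenServer.start_link(__MODULE__, start_fun)

  def worker(server), do: GenServer.call(server, :worker)

  @impl true
  def init(start_fun) do
    # turn exit signals from linked processes into {:EXIT, pid, reason} messages
    Process.flag(:trap_exit, true)
    {:ok, %{start_fun: start_fun, worker: start_fun.()}}
  end

  @impl true
  def handle_call(:worker, _from, state), do: {:reply, state.worker, state}

  @impl true
  def handle_info({:EXIT, pid, reason}, %{worker: pid} = state) do
    Logger.warning("worker #{inspect(pid)} exited (#{inspect(reason)}), restarting it")
    {:noreply, %{state | worker: state.start_fun.()}}
  end

  def handle_info(msg, state) do
    Logger.error("#{inspect(__MODULE__)} received unexpected message #{inspect(msg)}")
    {:noreply, state}
  end
end
```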
Would it be nice to have a supervisor module built into gnat that takes a list of NATS addresses, supervises a named Gnat process, and reconnects if the Gnat process dies?
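Part of such a module would be choosing which address to connect to. A minimal sketch of trying each connection spec in order until one succeeds; `connect_fun` stands in for something like `Gnat.start_link/1`, and `FailoverConnect` is a hypothetical name:

```elixir
defmodule FailoverConnect do
  @moduledoc """
  Sketch: try each connection spec in order and return the first that
  connects, along with the spec that worked.
  """

  def connect_first(specs, connect_fun) do
    Enum.reduce_while(specs, {:error, :no_servers_available}, fn spec, acc ->
      case connect_fun.(spec) do
        # stop at the first server that accepts the connection
        {:ok, conn} -> {:halt, {:ok, conn, spec}}
        # otherwise move on to the next spec
        {:error, _reason} -> {:cont, acc}
      end
    end)
  end
end
```

A supervisor could call this from its child's start function so each restart walks the list again.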
When a client calls sub, we record a mapping entry from their sid to the receiver's address (i.e., where we will deliver the messages). When a client calls unsub, we should remove this mapping. Also, if a client calls unsub with max_messages: 1, we should automatically clean up the sid => pid mapping once that message has been delivered.
In the request/response flow we generate a new subscription for every request, so failing to clean up these subscriptions will result in a memory leak.
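A minimal sketch of that bookkeeping as a pure data structure, illustrative rather than Gnat's internal state. As a simplification, `max_messages` here counts deliveries from the moment of unsub, which matches the request/response case where unsub is sent before any reply arrives; `SubscriptionTable` is a hypothetical name.

```elixir
defmodule SubscriptionTable do
  @moduledoc """
  Sketch: sid => {receiver_pid, remaining} bookkeeping, where remaining is
  :infinity or the number of deliveries left before the entry is dropped.
  """

  def subscribe(table, sid, pid), do: Map.put(table, sid, {pid, :infinity})

  # unsub without a limit removes the mapping immediately
  def unsub(table, sid, []), do: Map.delete(table, sid)

  # unsub with max_messages keeps the mapping until that many deliveries
  def unsub(table, sid, [max_messages: limit]) when is_integer(limit) and limit > 0 do
    case Map.fetch(table, sid) do
      {:ok, {pid, _}} -> Map.put(table, sid, {pid, limit})
      :error -> table
    end
  end

  # returns {receiver_pid or nil, updated_table}
  def deliver(table, sid) do
    case Map.fetch(table, sid) do
      {:ok, {pid, :infinity}} -> {pid, table}
      {:ok, {pid, 1}} -> {pid, Map.delete(table, sid)}
      {:ok, {pid, n}} -> {pid, Map.put(table, sid, {pid, n - 1})}
      :error -> {nil, table}
    end
  end
end
```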
I've been using a fork of your library on Fly.io for logs, with a few changes.
My proposal is to negotiate auth whenever the client has credentials in its config, without waiting for the server to ask for them. I've implemented an example here: 9ddb11a
This feels like a breaking change for some, so I'd love input on this!
From the spec: https://docs.nats.io/reference/reference-protocols/nats-protocol#protocol-messages
I had to do a quick hack to support logs from Fly.io, so maybe this would be a good thing to add here too?
I'd be glad to follow up with a PR.
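A minimal sketch of the idea (not the code in 9ddb11a): build the CONNECT options from the client's settings alone, so credentials are sent even when INFO does not set `auth_required`. The CONNECT field names (`auth_token`, `user`, `pass`, `verbose`, `pedantic`, `lang`) come from the NATS protocol; the settings keys `:token`, `:username` and `:password` are assumptions about the client config.

```elixir
defmodule ConnectOptions do
  @moduledoc """
  Sketch: CONNECT options that include any configured credentials,
  regardless of the server's auth_required flag.
  """

  def build(settings) do
    %{verbose: false, pedantic: false, lang: "elixir"}
    |> maybe_put(:auth_token, Map.get(settings, :token))
    |> maybe_put(:user, Map.get(settings, :username))
    |> maybe_put(:pass, Map.get(settings, :password))
  end

  # only include fields the user actually configured
  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)
end
```

Because `build/1` never looks at the server's INFO, this is exactly the behavioural change that might surprise existing users.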
Error messages from gen_tcp are not very helpful. Should we consider validating some connection arguments before calling the low-level libraries?
I paired with the head of devops at my current job because we were seeing a confusing error message in our staging environment.
iex([email protected])5> cfg = v(4)
%{
host: "nats.raw.svc.cluster.local",
port: "4222",
tls: true,
token: "somerandomtoken"
}
iex([email protected])6> Gnat.init(cfg)
** (exit) an exception was raised:
** (FunctionClauseError) no function clause matching in :inet_tcp.getserv/1
(kernel 7.3) inet_tcp.erl:55: :inet_tcp.getserv("4222")
(kernel 7.3) gen_tcp.erl:181: :gen_tcp.connect1/4
(kernel 7.3) gen_tcp.erl:165: :gen_tcp.connect/4
(gnat 1.2.0) lib/gnat/handshake.ex:9: Gnat.Handshake.connect/1
(gnat 1.2.0) lib/gnat.ex:201: Gnat.init/1
(stdlib 3.14.1) erl_eval.erl:680: :erl_eval.do_apply/6
(elixir 1.11.4) src/elixir.erl:280: :elixir.recur_eval/3
(elixir 1.11.4) src/elixir.erl:265: :elixir.eval_forms/3
(iex 1.11.4) lib/iex/evaluator.ex:261: IEx.Evaluator.handle_eval/5
(iex 1.11.4) lib/iex/evaluator.ex:242: IEx.Evaluator.do_eval/3
(iex 1.11.4) lib/iex/evaluator.ex:220: IEx.Evaluator.eval/3
(iex 1.11.4) lib/iex/evaluator.ex:102: IEx.Evaluator.loop/1
(iex 1.11.4) lib/iex/evaluator.ex:32: IEx.Evaluator.init/4
(stdlib 3.14.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
(kernel 7.3) gen_tcp.erl:170: :gen_tcp.connect/4
(gnat 1.2.0) lib/gnat/handshake.ex:9: Gnat.Handshake.connect/1
(gnat 1.2.0) lib/gnat.ex:201: Gnat.init/1
The port was configured as a string (read from an environment variable), but it took some poking around to realize what the problem was.
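A minimal sketch of validating the port before any `:gen_tcp` call, so a string port yields a readable error instead of a `FunctionClauseError` inside `:inet_tcp`. `ConnectionSettings.validate/1` is a hypothetical helper, and only the port is checked here:

```elixir
defmodule ConnectionSettings do
  @moduledoc """
  Sketch: check connection settings before they reach :gen_tcp.
  """

  def validate(%{port: port} = settings) when is_integer(port) and port in 1..65_535,
    do: {:ok, settings}

  def validate(%{port: port}),
    do: {:error, "expected :port to be an integer between 1 and 65535, got: #{inspect(port)}"}

  # no explicit port: the client's default applies
  def validate(settings), do: {:ok, settings}
end
```

Converting the environment value with `String.to_integer/1` when building the settings avoids the problem at its source.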
Ability to pass in a list of connection specs would be an extra bonus
According to the NATS protocol documentation, protocol operation names are case insensitive, so SUB foo 1\r\n and sub foo 1\r\n are equivalent.
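A minimal sketch of case-insensitive op handling: only the operation name is upcased before dispatch, while subjects and other arguments keep their case. `ProtocolOp.parse/1` is a hypothetical, string-based simplification of what the real binary parser would do:

```elixir
defmodule ProtocolOp do
  @moduledoc """
  Sketch: normalize the operation name so "SUB", "sub" and "Sub" dispatch
  the same way. Arguments such as subjects stay case sensitive.
  """

  def parse(line) do
    [op | args] =
      line
      |> String.trim_trailing("\r\n")
      |> String.split(" ", trim: true)

    {String.upcase(op), args}
  end
end
```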
This includes cleaning up our record of the subscription
I'm in a situation where I need IPv6 to reach a NATS cluster and cannot use IPv4 to reach it.
The client currently connects only over IPv4 and, as far as I can see, doesn't have any flag to enable IPv6.
Any chance you could add a config option in start_link and the Connection Settings config block for passing an IPv6 flag down to gen_tcp:connect?
Here's an example of making an IPv6 connection in Erlang; it just needs that extra socket option passed to gen_tcp:
{ok, Conn} = gen_tcp:connect("ipv6.google.com", 80, [inet6]).
Thanks for considering,
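A minimal sketch of the requested option, assuming a hypothetical `inet6: true` connection setting that adds `:inet6` to the options passed to `:gen_tcp.connect/4`. The base options shown are placeholders, not the client's actual socket options:

```elixir
defmodule TcpOptions do
  @moduledoc """
  Sketch: build :gen_tcp options, adding :inet6 when the (hypothetical)
  `inet6: true` setting is present.
  """

  def build(settings) do
    # placeholder base options; the real client may use different ones
    base = [:binary, active: false]
    if Map.get(settings, :inet6, false), do: [:inet6 | base], else: base
  end
end
```

The result would be passed along as in `:gen_tcp.connect(String.to_charlist(host), port, TcpOptions.build(settings), timeout)`.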
Reconnect is currently listed as an experimental feature. Users have expressed interest in promoting this as a fully supported feature. We need to evaluate where reconnect functionality is at and how it can be supported as a first class feature of this client.
@mmmries , wdyt?
Hi there!
I've been using a fork of your library on Fly.io for logs for a while, but I had to make a few changes.
My proposal here would be to make inbox optional, defaulting to true (the current behavior). I've already implemented this here: 162cd94
If it's something you'd like, I could make a pull request and add tests if needed.
Should they default to on or off? How do I turn them on/off manually?
NATS 2.2+ added a new protocol for publishing/subscribing to messages with HTTP-like headers.
It would be nice for this client library to support this new protocol.
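The header-carrying publish is `HPUB <subject> [reply-to] <#header bytes> <#total bytes>`, followed by the header block, a blank line, and the payload. The sizes are the fiddly part, so here is a minimal sketch of building a frame without a reply subject; `Hpub.frame/3` is a hypothetical helper. With a single `Bar: Baz` header and the payload `Hello NATS!`, the header block is 22 bytes and the total is 33.

```elixir
defmodule Hpub do
  @moduledoc """
  Sketch: build an HPUB frame (no reply-to). The header size counts
  everything from "NATS/1.0" through the blank line that ends the headers;
  the total size adds the payload bytes.
  """

  def frame(subject, headers, payload) do
    header_block =
      IO.iodata_to_binary([
        "NATS/1.0\r\n",
        Enum.map(headers, fn {key, value} -> [key, ": ", value, "\r\n"] end),
        # blank line terminating the header section
        "\r\n"
      ])

    header_size = byte_size(header_block)
    total_size = header_size + byte_size(payload)

    IO.iodata_to_binary([
      "HPUB ", subject, " ",
      Integer.to_string(header_size), " ", Integer.to_string(total_size), "\r\n",
      header_block, payload, "\r\n"
    ])
  end
end
```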
Do we need to provide test helpers or a stub of some kind?
Similar to #32, if a client subscribes to something and then dies, we won't be able to deliver any more messages to it. How will we detect this? How can we clean up the subscription in this case?
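One way to detect this is `Process.monitor/1`: the connection monitors each subscriber and receives a `:DOWN` message when it exits. A minimal sketch of that idea; `SubscriberMonitor` is a hypothetical module, and a real client would also send UNSUB to the server for each removed sid:

```elixir
defmodule SubscriberMonitor do
  @moduledoc """
  Sketch: monitor each subscriber and forget its subscriptions when it dies.
  """
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)
  def subscribe(server, sid, pid), do: GenServer.call(server, {:subscribe, sid, pid})
  def sids(server), do: GenServer.call(server, :sids)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, sid, pid}, _from, subs) do
    # we get a :DOWN message when pid exits, for any reason
    Process.monitor(pid)
    {:reply, :ok, Map.put(subs, sid, pid)}
  end

  def handle_call(:sids, _from, subs), do: {:reply, Map.keys(subs), subs}

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, subs) do
    # drop every sid that was delivering to the dead process
    remaining = subs |> Enum.reject(fn {_sid, sub_pid} -> sub_pid == pid end) |> Map.new()
    {:noreply, remaining}
  end

  def handle_info(_other, subs), do: {:noreply, subs}
end
```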
The other NATS clients all support the ability to make a request and then gather a list of replies within a given timeout period. While this is possible to do manually using the existing functions, it's awkward and error prone and we should have an officially supported way of doing this. The scatter-gather pattern is pretty commonplace when using NATS.
Rust reference: https://docs.rs/nats/0.16.0/nats/struct.Connection.html#method.request_multi
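The gathering half of scatter-gather can be a receive loop with an overall deadline. A minimal sketch, assuming the caller has already subscribed to `inbox`, published the request with that inbox as its reply subject, and receives replies as `{:msg, %{topic: topic, body: body}}` messages (the message shape is an assumption here); `ScatterGather.collect/3` is a hypothetical name:

```elixir
defmodule ScatterGather do
  @moduledoc """
  Sketch: collect replies addressed to `inbox` until `timeout_ms` elapses or
  `limit` replies have arrived.
  """

  def collect(inbox, timeout_ms, limit \\ :infinity) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    do_collect(inbox, deadline, limit, [])
  end

  # stop early once we have as many replies as requested
  defp do_collect(_inbox, _deadline, limit, acc) when is_integer(limit) and length(acc) >= limit,
    do: Enum.reverse(acc)

  defp do_collect(inbox, deadline, limit, acc) do
    # wait only for whatever is left of the overall timeout
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {:msg, %{topic: ^inbox, body: body}} ->
        do_collect(inbox, deadline, limit, [body | acc])
    after
      remaining -> Enum.reverse(acc)
    end
  end
end
```

Using a single deadline, rather than restarting the timeout after each reply, keeps the total wait bounded no matter how many responders answer.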
I have this library running in a managed environment that is not fully provisioned yet and got an error report for a badmatch of
{:error, {:tls_alert, 'record overflow'}}
I'm guessing that this gnatsd instance wasn't fully configured yet, so it didn't have TLS enabled. But maybe we should handle this case?
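Handling it could mean matching on the connect/upgrade result and returning a readable error instead of crashing with a badmatch. A minimal sketch of the message mapping; `ConnectErrors.describe/1` is hypothetical, and the two `:tls_alert` shapes reflect how older OTP releases (a bare charlist) and newer ones (an `{alert, description}` tuple) report alerts, to the best of my knowledge:

```elixir
defmodule ConnectErrors do
  @moduledoc """
  Sketch: map low-level connect/TLS errors to readable messages so callers
  can log or return them instead of crashing with a MatchError.
  """

  # older OTP: {:tls_alert, 'record overflow'}; newer: {:tls_alert, {alert, description}}
  def describe({:error, {:tls_alert, {_alert, detail}}}), do: tls_message(detail)
  def describe({:error, {:tls_alert, detail}}), do: tls_message(detail)

  def describe({:error, :econnrefused}),
    do: "connection refused; is a NATS server listening on that host and port?"

  def describe({:error, reason}), do: "could not connect to NATS: #{inspect(reason)}"

  # detail is a charlist; string interpolation converts it to a string
  defp tls_message(detail),
    do: "TLS handshake failed (#{detail}); check that TLS is enabled on the server"
end
```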
I got a rather unhelpful error report today from my system and as I tracked it down the logs around the time of the error look like this:
21:23:18.427 [info] [83, 83, 76, 58, 32, 83, 111, 99, 107, 101, 116, 32, 101, 114, 114, 111, 114, 58, 32, 'etimedout', 32, '\n']
21:23:18.428 [error] connection failed {{:badmatch, {:error, :closed}}, [{Gnat, :handle_call, 3, [file: 'lib/gnat.ex', line: 218]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}
21:23:18.461 [error] GenServer :rpc_gnat terminating
** (MatchError) no match of right hand side value: {:error, :closed}
lib/gnat.ex:218: Gnat.handle_call/3
(stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
(stdlib) gen_server.erl:647: :gen_server.handle_msg/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:unsub, 421, [max_messages: 2]}
The failure caused my connection to restart, and it re-established the connection just fine, but it would be nice to have a better error message, something like "tcp connection to nats failed with error {:error, :closed}, crashing the process so we can start fresh".
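A minimal sketch of replacing `:ok = :gen_tcp.send(...)` inside `handle_call` with an explicit check that logs the suggested message and stops with a descriptive reason. `SafeSend.send_or_stop/5` is hypothetical, and `send_fun` stands in for `:gen_tcp.send/2` or `:ssl.send/2`:

```elixir
defmodule SafeSend do
  @moduledoc """
  Sketch: check the result of a socket write inside a GenServer handle_call
  and stop with a descriptive reason instead of crashing on `:ok = ...`.
  """
  require Logger

  def send_or_stop(send_fun, socket, iodata, reply, state) do
    case send_fun.(socket, iodata) do
      :ok ->
        {:reply, reply, state}

      {:error, reason} ->
        Logger.error(
          "tcp connection to nats failed with error #{inspect({:error, reason})}, " <>
            "crashing the process so we can start fresh"
        )

        # {:stop, reason, reply, state}: reply to the caller, then terminate
        {:stop, {:shutdown, {:socket_send_failed, reason}}, {:error, reason}, state}
    end
  end
end
```

Inside a callback this would be used as `SafeSend.send_or_stop(&:gen_tcp.send/2, state.socket, frame, :ok, state)`.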
I was recently working on implementing an RPC server that sets up long-lived subscriptions, and I kept getting an error that crashed the Gnat connection process.
[error] connection failed {{:badmatch, {:error, :einval}}, [{Gnat, :handle_call, 3, [file: 'lib/gnat.ex', line: 195]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 615]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 647]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}
After a bunch of digging I found out that it was because I was calling Gnat.sub with queue_group: :dispatch_ex. The TCP library wouldn't serialize that atom in an iolist; it needed to be specified as a string or a charlist. This was pretty confusing to track down, and a clear error message would have been very helpful.
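A minimal sketch of checking sub options up front, so an atom queue group is rejected with a clear message before anything reaches the socket. `SubOptions.validate/1` is hypothetical; an alternative design would be to convert atoms with `Atom.to_string/1` instead of rejecting them.

```elixir
defmodule SubOptions do
  @moduledoc """
  Sketch: validate sub options before building the SUB frame. :gen_tcp only
  accepts iodata, so an atom queue group would otherwise fail later with
  {:error, :einval}.
  """

  def validate(opts) do
    case Keyword.get(opts, :queue_group) do
      nil -> :ok
      # strings and charlists are valid iodata
      group when is_binary(group) or is_list(group) -> :ok
      other -> {:error, "queue_group must be a string or charlist, got: #{inspect(other)}"}
    end
  end
end
```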
This is a bit of a broad issue, but I wanted to make the data available. I locally merged #25 and #26 and set up a benchmark. You can see the details of the benchmark on my branch, but in essence it is a server node with 4 connections open to NATS and 4 subscriptions on the same queue group. The client spins up between 1 and 32 connections to NATS and serially sends requests and waits for responses. NATS, the client and the server all run on a laptop with 8 cores. Here are the current numbers:
num clients | min | max | median | 90 % | average | throughput |
---|---|---|---|---|---|---|
1 | 186 | 14726 | 230 | 263 | 234 | 4086.60329707154 |
2 | 158 | 23510 | 204 | 285 | 223 | 8582.497541114455 |
4 | 162 | 26558 | 311 | 415 | 328 | 10841.850736446264 |
8 | 244 | 19285 | 565 | 744 | 584 | 12802.59329329749 |
16 | 177 | 26069 | 777 | 1183 | 844 | 17392.752483522 |
32 | 181 | 20510 | 1468 | 2126 | 1509 | 18008.1774008066 |
TL;DR: 18 thousand requests/sec with pure Elixir
Something like how pure-ruby accepts an SSL context, so you can specify your own CA stores, etc.
This should have the simplest example of how to use neato to send/receive messages.
The NATS protocol's INFO message has an auth_required option indicating that the client should try to authenticate on CONNECT.
Are there any plans to support streaming? I saw a closed #90 and it mentions a separate package. But I could not find anything related. Would be nice to have a streaming client.
We should do this with some good unit tests.
I think it would be nice to have a quick way to detect when the network connection disappears unexpectedly. I'm thinking some kind of inactivity timeout that is renewed every time we receive data from the server. When the inactivity timeout is exceeded, a ping should be sent. If the ping times out the GenServer should be stopped.
Not sure how best to implement. Could it be another process that has something like:
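# Some other module (hypothetical ActivityMonitor process)
def activity_monitor(gnat) do
  receive do
    # any traffic from the server resets the timer
    :refresh -> activity_monitor(gnat)
  after
    # no data for 5 seconds: ping, and stop the connection if the ping times out
    5_000 -> Gnat.ping_or_stop_async(gnat)  # hypothetical function
  end
end

# Gnat module
def handle_info({:tcp, socket, data}, %{socket: socket, parser: parser} = state) do
  # ... existing parsing of `data` with `parser` ...
  # send/2 rather than a call: the monitor is a plain receive loop, and the
  # connection should never block waiting on it
  send(state.activity_monitor, :refresh)
  # ... return {:noreply, new_state} ...
end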
I know that is not a fully fleshed out idea, but I think it's enough to understand how I'm thinking about implementing it.
Please add support for an Erlang client.
Need it to be "~> 1.0" because of a bunch of other deps. Thanks!