
nats-pure.rb's Introduction

NATS - Pure Ruby Client

A thread safe Ruby client for the NATS messaging system written in pure Ruby.


Getting Started

gem install nats-pure

Basic Usage

require 'nats/client'

nats = NATS.connect("demo.nats.io")
puts "Connected to #{nats.connected_server}"

# Simple subscriber
nats.subscribe("foo.>") { |msg, reply, subject| puts "Received on '#{subject}': '#{msg}'" }

# Simple Publisher
nats.publish('foo.bar.baz', 'Hello World!')

# Unsubscribing
sub = nats.subscribe('bar') { |msg| puts "Received: '#{msg.data}'" }
sub.unsubscribe

# Requests with a block handle replies asynchronously
nats.request('help', 'please', max: 5) { |response| puts "Got a response: '#{response}'" }

# Replies
sub = nats.subscribe('help') do |msg|
  puts "Received on '#{msg.subject}': '#{msg.data}' with headers: #{msg.header}"
  msg.respond("I'll help!")
end

# Request without a block waits for response or timeout
begin
  msg = nats.request('help', 'please', timeout: 0.5)
  puts "Received on '#{msg.subject}': #{msg.data}"
rescue NATS::Timeout
  puts "nats: request timed out"
end

# Request using a message with headers
begin
  msg = NATS::Msg.new(subject: "help", header: {foo: 'bar'})
  resp = nats.request_msg(msg)
  puts "Received on '#{resp.subject}': #{resp.data}"
rescue NATS::Timeout => e
  puts "nats: request timed out: #{e}"
end

# Server roundtrip which fails if it does not happen within 500ms
begin
  nats.flush(0.5)
rescue NATS::Timeout
  puts "nats: flush timeout"
end

# Closes connection to NATS
nats.close

JetStream Usage

Since the v2.0.0 series, the client can also publish messages to and receive messages from JetStream.

require 'nats/client'

nc = NATS.connect("nats://demo.nats.io:4222")
js = nc.jetstream

js.add_stream(name: "mystream", subjects: ["foo"])

Thread.new do
  loop do
    # Periodically publish messages
    js.publish("foo", "Hello JetStream!")
    sleep 0.1
  end
end

psub = js.pull_subscribe("foo", "bar")

loop do
  begin
    msgs = psub.fetch(5)
    msgs.each do |msg|
      msg.ack
    end
  rescue NATS::IO::Timeout
    puts "Retry later..."
  end
end

Clustered Usage

require 'nats/client'

cluster_opts = {
  servers: ["nats://127.0.0.1:4222", "nats://127.0.0.1:4223"],
  dont_randomize_servers: true,
  reconnect_time_wait: 0.5,
  max_reconnect_attempts: 2
}

nats = NATS.connect(cluster_opts)
puts "Connected to #{nats.connected_server}"


nats.on_error do |e|
  puts "Error: #{e}"
end

nats.on_reconnect do
  puts "Reconnected to server at #{nats.connected_server}"
end

nats.on_disconnect do
  puts "Disconnected!"
end

nats.on_close do
  puts "Connection to NATS closed"
end

nats.subscribe("hello") do |msg|
  puts "#{Time.now} - Received: #{msg.data}"
end

n = 0
loop do
  n += 1
  nats.publish("hello", "world.#{n}")
  sleep 0.1
end

TLS

It is possible to set up a custom TLS connection to NATS by passing an OpenSSL context to the client, which it then uses on connect:

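require 'openssl'
require 'nats/client'

tls_context = OpenSSL::SSL::SSLContext.new
# Require TLS 1.2 or newer (ssl_version= is deprecated in recent
# versions of the openssl gem and would also rule out TLS 1.3)
tls_context.min_version = OpenSSL::SSL::TLS1_2_VERSION

NATS.connect({
  servers: ['tls://127.0.0.1:4444'],
  reconnect: false,
  tls: {
    context: tls_context
  }
})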
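If you also want the client to verify the server's certificate, a context can be prepared along these lines. This is a minimal sketch that uses only Ruby's openssl standard library; `build_tls_context` and its `ca_path` argument are hypothetical names, and how the client performs hostname checks is not covered here.

```ruby
require 'openssl'

# Hypothetical helper: a TLS context that verifies the server certificate
# against the system CA store (plus an optional private CA bundle) and
# refuses protocol versions older than TLS 1.2.
def build_tls_context(ca_path: nil)
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.min_version = OpenSSL::SSL::TLS1_2_VERSION
  ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER

  store = OpenSSL::X509::Store.new
  store.set_default_paths            # trust the OS CA bundle
  store.add_file(ca_path) if ca_path # optionally trust a private CA too
  ctx.cert_store = store
  ctx
end

tls_context = build_tls_context
```

The resulting context would be passed as `tls: { context: tls_context }` in the `NATS.connect` options, exactly like the context in the example above.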

WebSocket

Since NATS Server v2.2 it is possible to connect to a NATS server using WebSocket.

  1. Add the websocket gem to your Gemfile:

    # Gemfile
    gem 'websocket'
  2. Connect to a WebSocket-enabled NATS server using the ws or wss scheme in URLs (for plain and secure connections, respectively):

    nats = NATS.connect("wss://demo.nats.io:8443")
  3. Use NATS as usual.

NKEYS and JWT User Credentials

This requires a server with version >= 2.0.0.

Starting from the v0.6.0 release of the client, you can also optionally install NKEYS in order to use the NATS v2.0 auth features:

gem install nkeys

NATS v2 servers have a new security and authentication mechanism based on user credentials and NKEYS. A single file containing the JWT and NKEYS needed to authenticate against a NATS v2 server can be set with the user_credentials option:

NATS.connect("tls://connect.ngs.global", user_credentials: "/path/to/creds")

This creates two callback handlers: one presents the user JWT, and the other signs the nonce challenge from the server. The core client library never has direct access to your private key; it simply performs the callback to sign the server challenge. On each connect or reconnect, the library loads the objects it needs and then wipes and clears them.

Bare NKEYS are also supported. The nkey seed should be in a read-only file, e.g. seed.txt.

> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

Then in the client specify the path to the seed using the nkeys_seed option:

NATS.connect("tls://connect.ngs.global", nkeys_seed: "path/to/seed.txt")
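One way to create such a file from Ruby is sketched below, assuming a POSIX filesystem. `write_seed_file` is a hypothetical helper, and mode 0400 (owner read-only) is one reasonable reading of "read only", not a requirement stated by the client.

```ruby
require 'tmpdir'

# Hypothetical helper: write an nkey seed to `dir/seed.txt` and make the
# file readable by its owner only (mode 0400: no write, no group/other access).
def write_seed_file(dir, seed)
  path = File.join(dir, 'seed.txt')
  File.write(path, "# This is my seed nkey!\n#{seed}\n")
  File.chmod(0o400, path)
  path
end

Dir.mktmpdir do |dir|
  path = write_seed_file(dir, 'SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM')
  # The path would then be passed as the nkeys_seed option
  puts path
end
```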

Cluster Server Discovery

By default, when you connect to a NATS server that is part of a cluster, the client learns about servers it does not yet know about from the information the server sends. This can be disabled at connection time:

NATS.connect(servers: ['nats://127.0.0.1:4444'], ignore_discovered_urls: true)

Ractor Usage

Using NATS within a Ractor requires URI 0.11.0 or greater to be installed.

require 'nats/client'

Ractor.new do
  ractor_nats = NATS.connect('demo.nats.io')

  # A single block argument receives the full NATS::Msg
  ractor_nats.subscribe('foo') do |msg|
    puts "Received on '#{msg.subject}': '#{msg.data}' with headers: #{msg.header}"
    msg.respond('baz')
  end

  sleep
end

# Give the Ractor time to connect and subscribe before sending the request
sleep 1

nats = NATS.connect('demo.nats.io')
response = nats.request('foo', 'bar', timeout: 0.5)
puts response.data

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

nats-pure.rb's People

Contributors

adamrainsby, anykeyh, bruth, calavera, capps, cavalle, dependabot[bot], envek, gcolliso, jphenow, mason-stewart, palkan, petergoldstein, quixoten, remicaumette, ripienaar, rodrigc, wallyqs, zaben903


nats-pure.rb's Issues

Multi-threaded use problems

We use nats-pure.rb in a Rails environment, spawning multiple threads (because of Puma, for example), and it seems to be wreaking havoc in certain scenarios.

(We've already contacted a maintainer of this project, this issue merely serves as a more public channel of communication in the spirit that others facing the same issues can find solutions)

Even with 0.7.2, we've been having messages go to the wrong callbacks. Essentially, connecting to a NATS cluster may trigger its callback with the wrong response (for example: expecting a PONG, but receiving a message originally destined as a response to a request sent from another thread). This causes errors like:

NATS::IO::ConnectError
expected PONG, got MSG _INBOX.lN1A3W2inyFq5CiG7u1sbR.lN1A3W2inyFq5CiG7u1sob 1 30

or

NATS::IO::ServerError
Unknown protocol: <message body destined as a response to a request>

Our temporary solution is to use thread-local variables for memoizing NATS connections like:

Thread.current[:nats] ||= NATS::IO::Client.new
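The thread-local workaround can be sketched without a server. In this sketch `FakeClientFactory` is a hypothetical stand-in for creating and connecting a `NATS::IO::Client`; the point is only that each thread memoizes, and then reuses, its own instance.

```ruby
# Hypothetical stand-in for NATS::IO::Client.new followed by connect
class FakeClientFactory
  attr_reader :built

  def initialize
    @built = 0
    @lock = Mutex.new
  end

  def build
    @lock.synchronize { @built += 1 }
    Object.new # placeholder for a connected client
  end
end

FACTORY = FakeClientFactory.new

# Each thread lazily creates its own client on first use, then reuses it
def thread_client
  Thread.current[:nats] ||= FACTORY.build
end

# Four threads, each asking for its client twice
pairs = Array.new(4) { Thread.new { [thread_client, thread_client] } }.map(&:value)
```

The trade-off is one connection per thread, so a Puma process running many threads opens that many NATS connections.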

Unrelated to connection issues, it seems like nats-pure.rb can sometimes stall the application. I've encountered this during an ActiveRecord transaction. I can't be sure what's happening, but none of the DB queries coming from a nats.subscribe block would get through. Listing the various current threads showed something like:

---- thread 0: #<Thread:0x000000015405fac8 sleep>
/Users/jerome/.rubies/ruby-2.7.2/lib/ruby/2.7.0/monitor.rb:108:in `sleep'
/Users/jerome/.rubies/ruby-2.7.2/lib/ruby/2.7.0/monitor.rb:108:in `wait'
/Users/jerome/.rubies/ruby-2.7.2/lib/ruby/2.7.0/monitor.rb:108:in `wait_for_cond'
/Users/jerome/.rubies/ruby-2.7.2/lib/ruby/2.7.0/monitor.rb:108:in `wait'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:484:in `block (2 levels) in request'
---- thread 1: #<Thread:0x00000001350d9ec8 /Users/jerome/.gem/ruby/2.7.2/gems/activerecord-6.1.4/lib/active_record/connection_adapters/abstract/connection_pool.rb:323 sleep>
/Users/jerome/.gem/ruby/2.7.2/gems/activerecord-6.1.4/lib/active_record/connection_adapters/abstract/connection_pool.rb:329:in `sleep'
/Users/jerome/.gem/ruby/2.7.2/gems/activerecord-6.1.4/lib/active_record/connection_adapters/abstract/connection_pool.rb:329:in `block in spawn_thread'
---- thread 2: #<Thread:0x0000000124f47cf0 /Users/jerome/.gem/ruby/2.7.2/gems/prometheus_exporter-0.7.0/lib/prometheus_exporter/server/web_server.rb:107 sleep>
/Users/jerome/.gem/ruby/2.7.2/gems/webrick-1.7.0/lib/webrick/server.rb:173:in `select'
/Users/jerome/.gem/ruby/2.7.2/gems/webrick-1.7.0/lib/webrick/server.rb:173:in `block in start'
/Users/jerome/.gem/ruby/2.7.2/gems/webrick-1.7.0/lib/webrick/server.rb:32:in `start'
/Users/jerome/.gem/ruby/2.7.2/gems/webrick-1.7.0/lib/webrick/server.rb:160:in `start'
/Users/jerome/.gem/ruby/2.7.2/gems/prometheus_exporter-0.7.0/lib/prometheus_exporter/server/web_server.rb:109:in `block in start'
---- thread 3: #<Thread:0x000000013702c550 /Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:28 sleep_forever>
/Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:30:in `pop'
/Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:30:in `block (2 levels) in start'
---- thread 4: #<Thread:0x000000013702c438 /Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:28 sleep_forever>
/Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:30:in `pop'
/Users/jerome/.gem/ruby/2.7.2/gems/minitest-5.14.4/lib/minitest/parallel.rb:30:in `block (2 levels) in start'
---- thread 5: #<Thread:0x00000001371a0378 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1368 sleep>
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1591:in `select'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1591:in `rescue in read'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1588:in `read'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1059:in `block in read_loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1048:in `loop'
---- thread 6: #<Thread:0x00000001371a0288 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1372 sleep_forever>
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1077:in `pop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1077:in `block in flusher_loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1075:in `loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1075:in `flusher_loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1372:in `block in start_threads!'
---- thread 7: #<Thread:0x00000001371a01c0 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1376 sleep>
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1112:in `sleep'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1112:in `block in ping_interval_loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1111:in `loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1111:in `ping_interval_loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1376:in `block in start_threads!'
---- thread 8: #<Thread:0x000000013532fc18 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:419 run>
/Users/jerome/src/github.com/superfly/web/test/graphql/mutations/launch_machine_test.rb:39:in `backtrace'
/Users/jerome/src/github.com/superfly/web/test/graphql/mutations/launch_machine_test.rb:39:in `block (3 levels) in <class:LaunchMachineTest>'
/Users/jerome/src/github.com/superfly/web/test/graphql/mutations/launch_machine_test.rb:37:in `each'
/Users/jerome/src/github.com/superfly/web/test/graphql/mutations/launch_machine_test.rb:37:in `each_with_index'
/Users/jerome/src/github.com/superfly/web/test/graphql/mutations/launch_machine_test.rb:37:in `block (2 levels) in <class:LaunchMachineTest>'
---- thread 9: #<Thread:0x000000013532c428 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:419 sleep_forever>
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:421:in `pop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:421:in `block (2 levels) in subscribe'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:420:in `loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:420:in `block in subscribe'
---- thread 10: #<Thread:0x00000001261c4b80 /Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1394 sleep_forever>
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1396:in `pop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1396:in `block (2 levels) in start_resp_mux_sub!'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1395:in `loop'
/Users/jerome/.gem/ruby/2.7.2/bundler/gems/nats-pure.rb-f04628e4d536/lib/nats/io/client.rb:1395:in `block in start_resp_mux_sub!'

I'm not savvy enough about nats-pure.rb to do much about this, unfortunately.

Jetstream example isn't working

I'm copy-pasting the JetStream example onto a fresh installation of NATS on macOS (Intel).

Publishing seems to work; I get:
=> #<struct NATS::JetStream::PubAck stream="mystream", seq=7, duplicate=nil, domain=nil>

Fetching however returns:
nats-pure-2.2.0/lib/nats/io/client.rb:369:in `publish': NATS::IO::BadSubject (NATS::IO::BadSubject)

If I try to subscribe to another subject I created myself, I get:
nats-pure-2.2.0/lib/nats/io/js.rb:479:in `api_request': invalid JSON (status_code=400, err_code=10025) (NATS::JetStream::Error::BadRequest)

I'm also getting timeouts if I try to push from a Rails controller method. Nothing seems to be working, or I'm doing something really wrong.

Long running jobs get killed when draining

When draining the NATS client, the last or currently running job is killed immediately instead of being allowed to finish.
It appears that in client.rb, once pending_queue.size == 0 (line 1098), the wait_for_msgs_t thread exits and the running handler is not allowed to finish (line 1108).
It would be nice to give a long-running job time to finish before it is killed.

Example:

require 'nats/io/client'
nats = NATS.connect("nats://localhost:4222")
puts "Connected to #{nats.connected_server}"

# Slow running job
sub = nats.subscribe("foo.>") do |msg, reply, subject| 
  puts "queue size: #{sub.pending_queue.size}"
  puts "*Received #{msg}" 
  sleep 5
  puts "*Done with #{msg}"
end

nats.on_disconnect do
  puts "Disconnected!"
end

nats.on_close do
  puts "Connection to NATS closed"
end

# Send three messages, I would expect all of them to get processed
(1..3).each do |cnt|
  nats.publish('foo.bar.baz', "Message Number #{cnt}")
  puts "send message #{cnt}"
end

# Stop/drain the connection, simulate a kill on the pod
# expect no more messages to be received
# and all jobs to finish
puts "going to drain"
nats.drain
sleep 1

# These jobs should not be processed, and aren't as expected since we drained
(4..6).each do |cnt|
  nats.publish('foo.bar.baz', "Message Number #{cnt}")
  puts "send message #{cnt}"
end

sleep 30

Note we never get:
*Done with Message Number 3
as it is killed during its sleep 5
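The requested behavior (stop accepting new messages, but let queued and in-flight jobs finish) can be sketched in plain Ruby with a `Queue` and a sentinel value. `GracefulWorker` is a hypothetical class that illustrates the idea; it is not nats-pure's drain implementation.

```ruby
# Hypothetical worker: on shutdown it stops taking new jobs, but finishes
# the job it is running and everything already queued before exiting.
class GracefulWorker
  STOP = Object.new # sentinel pushed behind the remaining jobs

  def initialize(&handler)
    @queue = Queue.new
    @handler = handler
    @stopping = false # not synchronized: assumes push/shutdown share one thread
    @thread = Thread.new { run }
  end

  # Returns false once shutdown has started ("no new messages after drain")
  def push(job)
    return false if @stopping
    @queue << job
    true
  end

  # Waits up to `timeout` seconds; returns true if every job completed
  def shutdown(timeout)
    @stopping = true
    @queue << STOP
    !@thread.join(timeout).nil?
  end

  private

  def run
    loop do
      job = @queue.pop
      break if job.equal?(STOP)
      @handler.call(job)
    end
  end
end

done = []
worker = GracefulWorker.new { |job| sleep 0.05; done << job }
(1..3).each { |n| worker.push("Message Number #{n}") }
finished = worker.shutdown(5)              # true once all three jobs have run
accepted = worker.push('Message Number 4') # false: shutdown already started
```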

nats: timeout on client.flush

We observed a timeout happening on flush with the following stacktrace:

nats: timeout
/var/vcap/data/packages/director/f0cf3673bd2c2187b33530f649f1d27a03f4187a/gem_home/ruby/2.6.0/gems/nats-pure-0.6.2/lib/nats/io/client.rb:842:in `with_nats_timeout'
/var/vcap/data/packages/director/f0cf3673bd2c2187b33530f649f1d27a03f4187a/gem_home/ruby/2.6.0/gems/nats-pure-0.6.2/lib/nats/io/client.rb:544:in `block in flush'
/var/vcap/data/packages/ruby-2.6.3-r0.17.0/95f13ead566b2d36b0efb94d0377994d7ff5cf3d/lib/ruby/2.6.0/monitor.rb:230:in `mon_synchronize'
/var/vcap/data/packages/director/f0cf3673bd2c2187b33530f649f1d27a03f4187a/gem_home/ruby/2.6.0/gems/nats-pure-0.6.2/lib/nats/io/client.rb:538:in `flush'

What could be the issue here? Could load on the system interfere with the ping-pong protocol when flush is used?

The problem is hard to reproduce; it happens only from time to time, and we observed it when the BOSH director was busy updating a lot of deployments. We assume the BOSH VM was under stress during this time. Could the ping-pong thread be affected, or could the PONG be misplaced or not recognised (a synchronisation issue?), so that we run into this timeout?

What workarounds or fixes could we put in place to mitigate such an issue?
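One possible mitigation, assuming the timeouts are transient spikes under load, is to retry the flush with backoff (and/or pass a larger timeout to `flush`). The helper below is a hypothetical sketch: the exception class is a parameter so that it runs without the gem, and in practice it would be `NATS::IO::Timeout`. Retrying only papers over the symptom; it does not explain why the PONG arrived late.

```ruby
# Hypothetical helper: retry a round trip a few times with exponential
# backoff before re-raising the last error.
def with_retries(error_class, attempts: 3, base_delay: 0.5)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue error_class
    raise if tries >= attempts
    sleep(base_delay * (2**(tries - 1))) # 0.5 s, 1 s, 2 s, ...
    retry
  end
end

# Usage, assuming a connected client `nats`:
#   with_retries(NATS::IO::Timeout) { nats.flush(2) }
```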

JSON parse error on processing INFO if we have whitespace in NATS server version

We face a JSON parse error when we are using BOSH forked NATS 1.3.0 server from:
https://github.com/bosh-dep-forks/gnatsd

Our build script
https://github.com/bosh-dep-forks/gnatsd/blob/d6d6c1935a05a913fa03f6d7bc63db8371b2588d/ci/tasks/build.sh#L19-L23

sets the server version to "1.3.0 <time_stamp> <git_rev>",
which breaks the INFO parsing in:

_, info_json = line.split(' ')
process_info(info_json)

because the server version inside the JSON contains whitespace, so info_json only receives the text up to the first space in the version string.
We solved it with something like this:
videlov@651ee18

Would you please fix this for our custom NATS server version?

Thanks in advance.
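A whitespace-tolerant parse only needs to split off the `INFO` operation name and keep the rest of the line intact. The sketch below is an illustration, not necessarily what the referenced commit does; `parse_info` is a hypothetical name and the version string in the sample line is made up.

```ruby
require 'json'

# Split off the protocol operation ("INFO") at the first run of whitespace
# only, so spaces inside the JSON payload (e.g. in "version") are preserved.
def parse_info(line)
  _op, info_json = line.split(' ', 2)
  JSON.parse(info_json)
end

# Illustrative line; the timestamp and git revision are made-up values.
line = 'INFO {"server_id":"abc","version":"1.3.0 2018-09-01 d6d6c19","max_payload":1048576}'
info = parse_info(line)
info['version'] # => "1.3.0 2018-09-01 d6d6c19"
```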

Memory leak with request/reply

I'm seeing ever-increasing memory usage in my Rails app when using NATS request/reply. I was able to reproduce it with this code (the reply is generated in another process to keep this reproduction minimal and avoid ambiguity in where the leak resides):

require 'nats/io/client'

nats = NATS::IO::Client.new.tap(&:connect)
1_000_000.times { nats.request("my.subject", "") }

I ran it against Ruby 2.6.5, 2.7.2, and 3.0.0. All exhibit the memory leak.

It looks like the client's @resp_map ivar is not letting go of the responses after they've been returned or yielded to the request block.
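The fix the report points to is to remove each entry as soon as its waiter is done, whether a reply arrived or not. A minimal sketch of such a map, assuming one waiter per token; `ResponseMap` and its methods are hypothetical and do not mirror the client's internal `@resp_map`.

```ruby
# Hypothetical request/response map that never keeps entries after the
# waiting caller has returned (reply received or timed out).
class ResponseMap
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @responses = {} # token => response (nil until a reply arrives)
  end

  def size
    @lock.synchronize { @responses.size }
  end

  # Registers `token`, yields so the caller can publish, then waits up to
  # `timeout` seconds. Returns the response, or nil on timeout.
  def request(token, timeout)
    @lock.synchronize { @responses[token] = nil }
    yield
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @responses[token].nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @responses.delete(token) # always release the entry: this avoids the leak
    end
  end

  # Called by the reader thread when a reply for `token` arrives
  def deliver(token, response)
    @lock.synchronize do
      next unless @responses.key?(token) # drop late replies for expired tokens
      @responses[token] = response
      @cond.broadcast
    end
  end
end

map = ResponseMap.new
reply = map.request('inbox.1', 1) { map.deliver('inbox.1', 'I can help!') }
missed = map.request('inbox.2', 0.05) {} # nobody answers: nil after ~50 ms
```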

reconnects not working

Lately I've observed client connections dropping and not reconnecting, so I added some debugging:

      def ping_interval_loop
        loop do
          File.open("/tmp/nats.log", "a") {|f| f.puts("Sleeping %s seconds in ping_interval_loop" % @options[:ping_interval])}
          sleep @options[:ping_interval]
          if @pings_outstanding > @options[:max_outstanding_pings]
            # FIXME: Check if we have to dispatch callbacks.
            File.open("/tmp/nats.log", "a") {|f| f.puts("Closing connection due to outstanding pings")}
            close
          end
          @pings_outstanding += 1

          send_command(PING_REQUEST)
          @flush_queue << :ping
        end
      rescue => e
        File.open("/tmp/nats.log", "a") {|f| f.puts("Ping interval loop exiting: %s: %s: %s" % [$!.class, $!.to_s, caller[0]])}
        process_op_error(e)
      end

I let it run for a while and then firewall it with a DROP rule to simulate the upstream connection going away, so no TCP resets are sent; this closely models what happens when, say, my home fibre drops:

iptables -I INPUT 1 -p tcp -m tcp --sport 4222 -j DROP

In the log I have:

Sleeping 10 seconds in ping_interval_loop
Sleeping 10 seconds in ping_interval_loop
Sleeping 10 seconds in ping_interval_loop
Sleeping 10 seconds in ping_interval_loop 
# here I firewall it
Sleeping 10 seconds in ping_interval_loop
Sleeping 10 seconds in ping_interval_loop
Closing connection due to outstanding pings

So the logic works: missing pings are detected and the connection is closed. However, after that it's dead and does not reconnect; strace on the ruby binary shows nothing but:

futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7419, {1050227, 643453722}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7421, {1050227, 744178975}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7423, {1050227, 844864410}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7425, {1050227, 945528955}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7427, {1050228, 46186705}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7429, {1050228, 146958817}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7431, {1050228, 247299938}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7433, {1050228, 347603578}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7435, {1050228, 448375716}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7437, {1050228, 549081022}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
futex(0x23406d0, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x2340654, FUTEX_WAIT_BITSET_PRIVATE, 7439, {1050228, 649710273}, ffffffff) = -1 ETIMEDOUT (Connection timed out)

Now that I can easily reproduce this, I'm happy to help answer any questions you might have.

support for unsubscribe unclear

Based on the usage examples in the README and examples/basic-usage.rb, it seems that unsubscribe is intended to be part of the nats-pure API. However, the unsubscribe method is private:

nats = NATS::IO::Client.new
nats.connect(:servers => ["nats://127.0.0.1:4222"])
sid = nats.subscribe('bar') { |msg| puts "Received : '#{msg}'" }
nats.unsubscribe(sid)

=> NoMethodError (private method `unsubscribe' called for #<NATS::Client:0x00007fa4b10dfce8 @mon_data=#<Monitor:0x00007fa4b10dfbd0>

Spec failure: `should be able to receive response to requests`

On Ruby 2.2.7 build:

  1) Client - Specification should be able to receive response to requests
     Failure/Error:
       expect do
         3.times do
           responses << nc.request("help", "please", timeout: 1)
         end
       end.to_not raise_error
     
       expected no Exception, got #<NATS::IO::Timeout: nats: timeout> with backtrace:
         # ./lib/nats/io/client.rb:646:in `with_nats_timeout'
         # ./lib/nats/io/client.rb:336:in `block in request'
         # ./lib/nats/io/client.rb:332:in `request'
         # ./spec/client_spec.rb:199:in `block (4 levels) in <top (required)>'
         # ./spec/client_spec.rb:198:in `times'
         # ./spec/client_spec.rb:198:in `block (3 levels) in <top (required)>'
         # ./spec/client_spec.rb:197:in `block (2 levels) in <top (required)>'
     # ./spec/client_spec.rb:197:in `block (2 levels) in <top (required)>'

/home/travis/.rvm/rubies/ruby-2.2.7/bin/ruby -I/home/travis/build/nats-io/pure-ruby-nats/vendor/bundle/ruby/2.2.0/gems/rspec-core-3.5.1/lib:/home/travis/build/nats-io/pure-ruby-nats/vendor/bundle/ruby/2.2.0/gems/rspec-support-3.5.0/lib /home/travis/build/nats-io/pure-ruby-nats/vendor/bundle/ruby/2.2.0/gems/rspec-core-3.5.1/exe/rspec spec/auth_spec.rb spec/client_cluster_reconnect_spec.rb spec/client_errors_spec.rb spec/client_reconnect_spec.rb spec/client_spec.rb spec/client_threadsafe_spec.rb spec/client_tls_spec.rb --format documentation --colour failed

Token not completely supported

Right now the only way to use token is by putting them into the URI string:

nats.connect("nats://secret@nats:4222")

But if the token contains special characters, this fails because of URI-encoding issues. Ideally, it should be possible to set the token with an option:

nats.connect("nats://secret@nats:4222", token: 'my/token')
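Until such an option exists, one workaround is to percent-encode the token before putting it in the URL, since a raw `/` ends the authority part of a URL. This sketch uses only the standard library; whether the client decodes the userinfo back to the original token before sending it to the server is an assumption that should be verified.

```ruby
require 'erb'
require 'uri'

token   = 'my/token'
encoded = ERB::Util.url_encode(token) # "/" becomes "%2F"
url     = "nats://#{encoded}@nats:4222"
uri     = URI.parse(url)

# Assumption: the client percent-decodes the userinfo like this; if it
# does not, the server would receive the encoded form instead.
decoded = URI.decode_www_form_component(uri.user)
```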

Threads not being cleaned up when no reply.

Hi, thanks for the great gem.
In the scenario where my clients are down or not responding, the threads that get created to listen for the request's response seem to live forever.

This is my code:

nats.request('foo', 'test', max: 1, timeout: 1) do |response|
  puts 'Got reply'
end

It seems like the timeout never fires, and if I run this code many times without the clients running, my Thread.list count grows linearly. Once I bring my clients back online, the old requests obviously aren't replied to, and the threads remain in the sleep_forever state.

Any ideas what the best way to manage this is?

Thanks
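One way to bound the lifetime of a callback-style waiter is to give its thread a deadline. The sketch below assumes replies arrive on a `Queue`; `on_reply_or_timeout` is a hypothetical helper, not part of the nats-pure API. It polls every 10 ms for portability; on Ruby 3.2+, `Queue#pop(timeout:)` could replace the loop.

```ruby
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: runs on_reply with the first reply, or on_timeout
# once `timeout` seconds pass, and then lets the thread exit either way.
def on_reply_or_timeout(replies, timeout, on_reply:, on_timeout:)
  Thread.new do
    deadline = monotonic_now + timeout
    reply = nil
    loop do
      reply = begin
        replies.pop(true) # non-blocking: raises ThreadError when empty
      rescue ThreadError
        nil
      end
      break if reply || monotonic_now >= deadline
      sleep 0.01
    end
    reply ? on_reply.call(reply) : on_timeout.call
  end
end
```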

updating the server list

With long-running NATS clients, the environment may change underneath you; for example, the client may be fed with data from SRV records or some other non-DNS service discovery method like Consul, etcd, or whatever.

The retry logic of this client is great: I can tell it to retry infinitely and it deals with complexities such as resubscribing on reconnect, so I like infinite retries.

Problem is once I kick the client off with such an infinite retry setting I never have an option to update the server list it should use on subsequent retries.

Now, I could adjust Client#server_pool from my side, but at present there are no mutexes or anything like that around that array, so it would be a terrible idea.

The NATS server's feature of publishing cluster URLs is nice on paper, but in reality I find it not that useful: it publishes private IPs that aren't reachable from outside NAT, etc., so there are still some issues, and anyway, I've found that sysadmins like to manage their connection pools more explicitly.

Do you have thoughts on how we might handle this situation? A way to update the Client#server_pool that's safe while having infinite retries set on the client gem would be great but open to other approaches you can think of.
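One building block for this is a server list guarded by a mutex that hands out frozen snapshots, so a discovery loop can swap the list while a reconnect loop iterates a consistent copy. `ServerList` is a hypothetical class; wiring it into the client's reconnect logic is not shown.

```ruby
# Hypothetical thread-safe server list for use with service discovery
class ServerList
  def initialize(urls)
    @lock = Mutex.new
    @urls = urls.dup.freeze
  end

  # Atomically swap in a new list (e.g. after re-resolving SRV records)
  def replace(urls)
    @lock.synchronize { @urls = urls.dup.freeze }
  end

  # Frozen snapshot: callers can iterate it without holding the lock
  def snapshot
    @lock.synchronize { @urls }
  end
end

list = ServerList.new(['nats://a:4222'])
before = list.snapshot
list.replace(['nats://b:4222', 'nats://c:4222'])
```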

LoadError when requiring nats/io/client

On version 2.0.0.pre.rc1, ruby version 2.7.4 via rbenv, and bundler version 2.1.4.

Trying require 'nats' and require 'nats/client' also result in the same issue, which makes sense given that the first thing both of them do is require 'nats/io/client'.

The direct cause is that the lib/nats/io/kv.rb file is missing from my local installation. I checked, and it does exist in both the tagged branch and in the zipped source. So, I'm not sure what's going on. I was able to patch the issue locally by manually copy-pasting the file from github, but that's not an option in production.

irb(main):001:0> require 'nats/io/client'
Traceback (most recent call last):
       16: from ... /.rbenv/versions/2.7.4/lib/ruby/2.7.0/bundler/vendor/thor/lib/thor/command.rb:27:in `run'
       15: from ... /.rbenv/versions/2.7.4/lib/ruby/2.7.0/bundler/cli.rb:476:in `exec'
       14: from ... /.rbenv/versions/2.7.4/lib/ruby/2.7.0/bundler/cli/exec.rb:28:in `run'
       13: from ... ./rbenv/versions/2.7.4/lib/ruby/2.7.0/bundler/cli/exec.rb:63:in `kernel_load'
       12: from ... /.rbenv/versions/2.7.4/lib/ruby/2.7.0/bundler/cli/exec.rb:63:in `load'
       11: from ... /.rbenv/versions/2.7.4/bin/irb:23:in `<top (required)>'
       10: from ... /.rbenv/versions/2.7.4/bin/irb:23:in `load'
        9: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/irb-1.2.6/exe/irb:11:in `<top (required)>'
        8: from (irb):1
        7: from (irb):1:in `require'
        6: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/client.rb:18:in `<top (required)>'
        5: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/client.rb:18:in `require_relative'
        4: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/msg.rb:14:in `<top (required)>'
        3: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/msg.rb:14:in `require_relative'
        2: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/js.rb:17:in `<top (required)>'
        1: from ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/js.rb:17:in `require_relative'
LoadError (cannot load such file -- ... /.rbenv/versions/2.7.4/lib/ruby/gems/2.7.0/gems/nats-pure-2.0.0.pre.rc1/lib/nats/io/kv)

Creating consumer does not work with ActiveSupport gem activated

I ran into a pretty nasty problem that makes it impossible to use nats-pure together with the ActiveSupport gem.

Here goes the problem:

        # nats/io/js.rb:344
        req = {
          stream_name: stream,
          config: config
        }

        result = api_request(req_subject, req.to_json, params)

It expects to call the method NATS::JetStream::API::ConsumerConfig#to_json defined in the same file.

But thanks (or no thanks...) to monkey patching from ActiveSupport, Hash#to_json instead calls this method:

  def to_json(options = nil)
    if options.is_a?(::JSON::State)
      # Called from JSON.{generate,dump}, forward it to JSON gem's to_json
      super(options)
    else
      # to_json is being invoked directly, use ActiveSupport's encoder
      ActiveSupport::JSON.encode(self, options)
    end
  end

which in turn calls this method:

  # in Hash object
  def as_json(options = nil) #:nodoc:
    if respond_to?(:to_hash)
      # struct respond_to to_hash
      to_hash.as_json(options)
    else
      instance_values.as_json(options)
    end
  end

This causes the to_json(*args) defined in the struct to be ignored completely, so the configuration hash is never compacted before being sent to the API. This doesn't play well with the NATS server, causing this error:

invalid JSON (status_code=400, err_code=10025) (NATS::JetStream::Error::BadRequest)

The obvious workaround would be not to use ActiveSupport, but since it's a very popular gem, I would recommend changing:

        req = {
          stream_name: stream,
          config: config
        }

        result = api_request(req_subject, req.to_json, params)

to:

        req = {
          stream_name: stream,
          config: config.to_h.compact
        }

        result = api_request(req_subject, req.to_json, params)

and removing the custom to_json method in the struct, which only exists to compact the hash anyway.

I could create a PR if you feel nats-pure should work with the ActiveSupport core extensions.
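The proposed change can be checked in plain Ruby: once the struct is converted with `to_h.compact`, the payload no longer depends on which `to_json` gets dispatched. `ConsumerConfig` here is a hypothetical three-field stand-in for `NATS::JetStream::API::ConsumerConfig`.

```ruby
require 'json'

# Hypothetical stand-in for NATS::JetStream::API::ConsumerConfig
ConsumerConfig = Struct.new(:durable_name, :ack_policy, :max_deliver, keyword_init: true)

config = ConsumerConfig.new(durable_name: 'bar', ack_policy: 'explicit')

# Convert to a plain Hash and drop nil fields before serializing, so the
# result no longer relies on a custom Struct#to_json being called.
req = {
  stream_name: 'mystream',
  config: config.to_h.compact
}

payload = req.to_json
# => {"stream_name":"mystream","config":{"durable_name":"bar","ack_policy":"explicit"}}
```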

Message is not published in multiprocess environment

Hello,
I have a problem sending messages to my NATS server from my Rails application when I run Puma/Passenger in multiprocess mode (workers are spawned).

A new connection is created for each process, using something like this:

require 'forwardable'
require 'nats/io/client'
require 'concurrent'

class Client
  extend Forwardable

  def_delegators :@nats, :connected?, :connecting?, :publish, :flush, :subscribe, :close

  # One connection per process, keyed by PID
  def self.current
    @current ||= Concurrent::Map.new
    @current[Process.pid] ||= new.connect
  end

  def self.connection_pool
    @current
  end

  def self.current=(connection)
    (@current ||= Concurrent::Map.new)[Process.pid] = connection
  end

  def self.establish_connection
    current&.close
    # Call the setter explicitly; a bare `current = ...` only assigns a local variable
    self.current = new.connect
  end

  def connect
    @nats = NATS::IO::Client.new
    @nats.on_error      { |e| puts "NATS Error: #{e}" }
    @nats.on_reconnect  { puts "NATS reconnect" }
    @nats.on_disconnect { puts "NATS disconnect" }
    @nats.connect('localhost')
    @nats
  end
end

There is no issue when Puma runs in threaded mode. To publish a message I call:

Client.current.publish('subject', 'Hello World')

In the multiprocess case, nothing is sent to the NATS server (checked by running the server with -DV).
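One fork-safe pattern is to check the owning PID on every access and build a fresh connection whenever it changes. A minimal sketch with no NATS dependency, so it runs on its own. `PerProcessConnection` and its factory block are hypothetical names, and `NATS.connect` in the trailing comment is taken from the README example. It rests on one stated assumption: Ruby's `fork` only carries over the calling thread, so a client created in the master before workers are spawned has lost its background threads in each worker.

```ruby
# Hypothetical holder that never hands a connection created in one
# process to another process. Only the thread that calls fork exists in
# the child, so any background threads a client started earlier are gone.
class PerProcessConnection
  def initialize(&factory)
    @factory = factory
    @owner_pid = nil
    @conn = nil
    @lock = Mutex.new
  end

  # Returns this process's connection, creating a new one after a fork.
  def get
    @lock.synchronize do
      unless @owner_pid == Process.pid
        @conn = @factory.call
        @owner_pid = Process.pid
      end
      @conn
    end
  end
end

# Usage with nats-pure (requires a running server):
#   NATS_CONN = PerProcessConnection.new { NATS.connect("nats://localhost:4222") }
#   NATS_CONN.get.publish("subject", "Hello World")
```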

nats: fetch timeout (NATS::IO::Timeout) when connecting JetStream

I tried nats-pure with JetStream on an M1 Mac, using:

  • Docker nats:latest image
  • nats-pure 2.0.0 on Ruby 3.1.1

and copied the code block from the example:

gem 'nats-pure', '=2.0.0'
require 'nats/client'

nc = NATS.connect
js = nc.jetstream

js.add_stream(name: "mystream", subjects: ["foo"])

js.publish("foo", "Hello JetStream!")

psub = js.pull_subscribe("foo", "bar")

loop do
  msgs = psub.fetch(5)
  msgs.each do |msg|
    msg.ack
  end 
end

and a timeout occurs:

/Users/testuser/.rvm/gems/ruby-3.1.1/gems/nats-pure-2.0.0/lib/nats/io/js.rb:629:in `block in fetch': nats: fetch timeout (NATS::IO::Timeout)
	from /Users/testuser/.rvm/rubies/ruby-3.1.1/lib/ruby/3.1.0/monitor.rb:202:in `synchronize'
	from /Users/testuser/.rvm/rubies/ruby-3.1.1/lib/ruby/3.1.0/monitor.rb:202:in `mon_synchronize'
	from /Users/testuser/.rvm/gems/ruby-3.1.1/gems/nats-pure-2.0.0/lib/nats/io/js.rb:620:in `fetch'
	from test.rb:14:in `block in <main>'
	from test.rb:13:in `loop'
	from test.rb:13:in `<main>'

The NATS server log, with debug output enabled, shows:

service_1  | [1] 2022/03/24 07:40:18.592069 [DBG] Enabled JetStream for account "$G"
service_1  | [1] 2022/03/24 07:40:18.592071 [DBG]   Max Memory:      -1 B
service_1  | [1] 2022/03/24 07:40:18.592072 [DBG]   Max Storage:     -1 B
service_1  | [1] 2022/03/24 07:40:18.592397 [DBG] JetStream state for account "$G" recovered
service_1  | [1] 2022/03/24 07:40:18.592803 [INF] Listening for client connections on 0.0.0.0:4222
service_1  | [1] 2022/03/24 07:40:18.592817 [DBG] Get non local IPs for "0.0.0.0"
service_1  | [1] 2022/03/24 07:40:18.593015 [DBG]   ip=172.19.0.2
service_1  | [1] 2022/03/24 07:40:18.593024 [INF] Server is ready
service_1  | [1] 2022/03/24 07:40:45.777369 [DBG] 172.19.0.1:56152 - cid:4 - Client connection created
service_1  | [1] 2022/03/24 07:40:48.072274 [DBG] 172.19.0.1:56152 - cid:4 - "v2.0.0:ruby3.1.1" - Client Ping Timer
service_1  | [1] 2022/03/24 07:40:55.828695 [DBG] 172.19.0.1:56152 - cid:4 - "v2.0.0:ruby3.1.1" - Client connection closed: Client Closed
service_1  | [1] 2022/03/24 09:53:44.633139 [DBG] 172.19.0.1:56154 - cid:11 - Client connection created
service_1  | [1] 2022/03/24 09:53:46.700322 [DBG] 172.19.0.1:56154 - cid:11 - "v2.0.0:ruby3.1.1" - Client Ping Timer
service_1  | [1] 2022/03/24 09:53:54.672813 [DBG] 172.19.0.1:56154 - cid:11 - "v2.0.0:ruby3.1.1" - Client connection closed: Client Closed

How can I investigate this timeout?
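Going by the later report on this page, where `fetch` raises `NATS::IO::Timeout` whenever the stream holds no messages, the trace above may simply come from a loop iteration that ran after the single published message had already been consumed. A minimal sketch of a loop that treats the timeout as "nothing to fetch yet". `poll_pull_subscription` is a hypothetical helper, and the timeout class is passed in so the sketch runs without a server (with nats-pure it would be `NATS::IO::Timeout`).

```ruby
# Hypothetical polling helper: fetches batches from a pull subscription,
# acks every message, and treats a fetch timeout as "no messages yet"
# instead of letting it escape the loop.
def poll_pull_subscription(psub, batch:, polls:, timeout_error:)
  acked = 0
  polls.times do
    begin
      psub.fetch(batch).each do |msg|
        msg.ack
        acked += 1
      end
    rescue timeout_error
      # Nothing arrived within the fetch timeout; poll again.
    end
  end
  acked
end

# With nats-pure, roughly:
#   psub = js.pull_subscribe("foo", "bar")
#   poll_pull_subscription(psub, batch: 5, polls: 10, timeout_error: NATS::IO::Timeout)
```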

Add auth token support?

Passing a token in the server URL results in a CONNECT message to the server with user and pass rather than auth_token. I thought there might be an alternate interface, but auth_token doesn't show up anywhere in the client.rb code, and the auth spec doesn't currently seem to test for it.

My gnatsd server is logging an Authorization Error, so I'm guessing a user with a null password isn't treated the same as an auth_token.
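For reference, the NATS protocol's CONNECT message carries a token in its own `auth_token` field, separate from `user` and `pass`. A minimal sketch of the mapping being asked for, assuming that a URL with a userinfo part but no password (`nats://TOKEN@host:4222`) should be read as a token. `connect_auth_fields` is a hypothetical helper, not part of nats-pure.

```ruby
require 'json'
require 'uri'

# Hypothetical helper: derive CONNECT auth fields from a server URL.
# A userinfo part without a password is treated as a token.
def connect_auth_fields(url)
  uri = URI.parse(url)
  return {} if uri.user.nil?

  if uri.password.nil?
    { auth_token: uri.user }
  else
    { user: uri.user, pass: uri.password }
  end
end

puts "CONNECT #{connect_auth_fields('nats://s3cr3t@127.0.0.1:4222').to_json}"
# CONNECT {"auth_token":"s3cr3t"}
```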

Additional stats

Hello,

I am facing a problem where reconnects are not happening at all, or perhaps the library does not detect the failure.

To get to the bottom of it, and because it would be useful in general, could the existing stats method also track:

  • Time last ping sent
  • Time last ping received
  • Time last pong sent
  • Time last pong received
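A minimal sketch of how the four timestamps could be kept alongside the existing counters. `PingPongTimes` and its `record`/`to_h` methods are hypothetical names; where the client would call `record` (when writing or parsing PING/PONG) is left open, and the clock is injectable only so the sketch can be tested.

```ruby
# Hypothetical tracker for the four requested timestamps. A client would
# call #record where it writes or parses PING/PONG, and merge #to_h into
# the hash returned by its stats method.
class PingPongTimes
  EVENTS = %i[ping_sent ping_received pong_sent pong_received].freeze

  def initialize(clock: -> { Time.now })
    @clock = clock
    @times = {}
    @lock = Mutex.new
  end

  def record(event)
    raise ArgumentError, "unknown event #{event}" unless EVENTS.include?(event)

    @lock.synchronize { @times[:"last_#{event}_at"] = @clock.call }
  end

  # Snapshot for a stats hash; nil means the event has not happened yet.
  def to_h
    @lock.synchronize do
      EVENTS.to_h { |e| [:"last_#{e}_at", @times[:"last_#{e}_at"]] }
    end
  end
end
```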

Slow consumer detection

We're interested in slow consumer detection as described in this NATS doc page.

The doc page indicates that slow consumer detection is not available in the Ruby client. Is that correct? If so, what other options are there for detecting a slow consumer?
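Until the client reports it, one application-level option is to hand messages from the subscription callback to a worker through a bounded queue and count what does not fit, which is roughly the idea behind the pending limits other NATS clients apply per subscription. A minimal sketch: `BoundedHandoff` and `process` in the usage comment are hypothetical names, and dropping messages on overflow is a policy choice, not something nats-pure does.

```ruby
# Hypothetical application-level guard: messages go from the NATS callback
# to a worker through a bounded queue; when the worker falls behind and the
# queue is full, the message is dropped and counted as a slow-consumer event.
class BoundedHandoff
  attr_reader :dropped

  def initialize(limit)
    @queue = SizedQueue.new(limit)
    @dropped = 0
  end

  # Non-blocking push, so the subscription thread is never stalled.
  # Intended to be called from a single subscription callback thread.
  def offer(msg)
    @queue.push(msg, true)
    true
  rescue ThreadError
    @dropped += 1
    false
  end

  # Blocks until a message is available.
  def take
    @queue.pop
  end
end

# Usage with nats-pure (requires a running server):
#   handoff = BoundedHandoff.new(1_000)
#   nats.subscribe("foo") { |msg| handoff.offer(msg) }
#   Thread.new { loop { process(handoff.take) } }
```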

JetStream pull subscription fetch returning empty msg

I'm seeing the following:

Steps:

  1. create a stream and consumer
  2. create a pull subscription for the consumer
    (No messages exist in stream.)
  3. perform fetch with the pull subscription
    -> a timeout exception is raised (expected, since there are no messages in the stream)
  4. perform another fetch
    -> returns a single empty msg (not expected)

This pattern repeats indefinitely (fetch => timeout, fetch => empty NATS::Msg, ...).

sub = nats_client.jetstream.pull_subscribe(subcription_subject, consumer_name)

sub.fetch
*** NATS::IO::Timeout Exception: nats: fetch timeout

sub.fetch
=> [#<NATS::Msg(subject: "_INBOX.NFp40anEUTX0yZqmnIdIyB", reply: "", data: "", header={"Nats-Pending-Messages"=>"1", "Nats-Pending-Bytes"=>"0"})>]

More Context:
Stream Info:

Configuration:

             Subjects: STREAM.SUBJECT
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 1

Consumer Info:

Configuration:

        Durable Name: CONSUMER_NAME
           Pull Mode: true
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 1,000
     Redelivered Messages: 0
     Unprocessed Messages: 0
            Waiting Pulls: 0 of maximum 512
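The returned message has an empty body, an empty reply subject and only `Nats-Pending-*` headers, so it looks like a server control message rather than a stream message. Until `fetch` filters these out itself, a caller-side workaround could skip messages of that shape. A minimal sketch: the predicate is a heuristic based only on the output above, and `FetchedMsg` is a hypothetical stand-in for `NATS::Msg` so the sketch runs without a server.

```ruby
# Hypothetical stand-in for NATS::Msg with the fields shown above.
FetchedMsg = Struct.new(:subject, :reply, :data, :header, keyword_init: true)

# Heuristic: no payload, no reply subject to ack on, and headers that
# carry only pending-count information.
def control_message?(msg)
  headers = msg.header || {}
  msg.data.to_s.empty? &&
    msg.reply.to_s.empty? &&
    !headers.empty? &&
    headers.keys.all? { |k| k.start_with?("Nats-Pending-") }
end

# Keep only messages that look like real stream messages.
def real_messages(msgs)
  msgs.reject { |m| control_message?(m) }
end

# Usage with nats-pure, roughly:
#   real_messages(sub.fetch).each(&:ack)
```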

Fallback to IPv4 when IPv6 connection fails is broken

gem version: nats-pure-0.2.4

Context

A dual-stacked machine I manage lost its IPv6 connectivity, and the nats client running on it was unable to recover by connecting to the dual-stacked natsd server over IPv4.

The client still has an IPv6 address, IPv6 routes and all (the network that connects it to the internet over IPv6 is broken, but IPv6 itself is working correctly for local stuff), so getaddrinfo returns both an IPv6 and an IPv4 address for the remote server.

Problem description

When nats-pure detects that the connection over IPv6 (the first address) fails, it does not try the IPv4 address (the second one), but retries connecting over IPv6 over and over.

Expected behavior

When a connection fails or gets broken, nats-pure should attempt to connect using all remaining IP addresses before looping and retrying to connect using the first one.

Work-around

While the public network is broken, I could successfully connect by changing AF_UNSPEC to PF_INET in
https://github.com/nats-io/pure-ruby-nats/blob/master/lib/nats/io/client.rb#L1044.
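The expected behavior can be sketched with Ruby's standard `Addrinfo` API: resolve the host, then try each returned address (IPv6 and IPv4) in order before giving up. This is an illustration, not the client's code; `connect_any` is a hypothetical name.

```ruby
require 'socket'

# Sketch: try every address the resolver returns (IPv6 and IPv4) in order,
# and only give up once all of them have failed.
def connect_any(host, port, timeout: 2)
  errors = []
  Addrinfo.getaddrinfo(host, port, nil, :STREAM).each do |addrinfo|
    begin
      return addrinfo.connect(timeout: timeout)
    rescue SystemCallError => e
      # e.g. Errno::ENETUNREACH on a broken IPv6 route: move on to the next address
      errors << "#{addrinfo.inspect_sockaddr} (#{e.class})"
    end
  end
  raise SocketError, "could not connect to #{host}:#{port}: #{errors.join(', ')}"
end
```

For plain TCP sockets, the standard library's `Socket.tcp(host, port, connect_timeout: 2)` also walks the resolved address list rather than stopping at the first entry.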

require 'nats/client' not working

Hi! When you add nats-pure to the Gemfile, it installs version 0.7.2 by default.
Requiring 'nats/client' as the README shows then fails because the file cannot be found. Requiring 'nats/io/client' works instead.

multi home network handling not falling back to v4

Great work on this client; I'm very happy with how it's going.

The old EM client had some IPv6 issues I didn't mention since they were really EM issues, but since you now 'own' the whole space, it's worth mentioning :)

Consider a setup where I connect to a host by name and the name resolves to both a v4 and a v6 address. The client tries the v6 address first, which is fine.

But if a node is single-homed or has some networking issue, it fails here: https://github.com/nats-io/pure-ruby-nats/blob/60a958b896a219cf0e5dbf9cd7132d4b1959e50a/lib/nats/io/client.rb#L1064

#<Errno::ENETUNREACH: Network is unreachable - connect(2) for [x:x::x:x:x:5cac]:4222>

This is particularly bad on v4-only hosts where the name I am connecting to has both v6 and v4 addresses: on those nodes the client only ever tries the v6 address and fails forever, never trying the v4 addresses where it would work.

Here you'd need to somehow handle that and try the v4 address instead :(

To be correct, the client should try the v6 address and, when that fails, try the v4 address of the same hostname; if that fails too, move on to the next server and do the same again. That way, if there is a connectivity issue on, say, the v6 stack while the v4 stack still works, it will be able to connect rather than bail on the first error.

I also noticed that this particular failure happens early enough to break the retry sleep behaviour. The failure is here: https://github.com/nats-io/pure-ruby-nats/blob/60a958b896a219cf0e5dbf9cd7132d4b1959e50a/lib/nats/io/client.rb#L181. Notice that :was_connected is not yet set at that point, which means should_delay_connect? returns false and https://github.com/nats-io/pure-ruby-nats/blob/60a958b896a219cf0e5dbf9cd7132d4b1959e50a/lib/nats/io/client.rb#L564 never sleeps. So with early network errors like this one, you get a tight, never-ending loop that never sleeps.
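A minimal sketch of the suggested behaviour: back off after every failed attempt, whether or not the client was ever connected, so early errors cannot spin. `connect_with_retry` and its `max_attempts`/`wait` parameters are hypothetical; the sleeper is injectable only so the behaviour can be checked without actually waiting.

```ruby
# Hypothetical retry loop: sleeps between failed attempts regardless of
# whether a connection was ever established, then gives up after
# max_attempts by re-raising the last error.
def connect_with_retry(max_attempts:, wait:, sleeper: Kernel.method(:sleep))
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue SystemCallError
    raise if attempt >= max_attempts

    # Always back off before the next attempt, even on the very first failure.
    sleeper.call(wait)
    retry
  end
end
```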

Furthermore, none of the on_* callbacks fire anywhere along this failure path, so the failure is completely opaque to the outside world. Perhaps https://github.com/nats-io/pure-ruby-nats/blob/60a958b896a219cf0e5dbf9cd7132d4b1959e50a/lib/nats/io/client.rb#L199 should call on_error there?

Let me know if you'd rather I pull these into separate issues.

callback logging

The library has a lot of logic in it, but it is all fairly opaque, which makes debugging issues quite hard: is it actively sending pings and processing pongs, did a thread perhaps die, are reconnects happening?

Two ways to solve this would be to accept an instance of Logger (or something that quacks like a Logger) or to add a logging callback.

I like the callback approach since it lets me format messages however I like. The stomp gem implements something like this (see https://github.com/stompgem/stomp/blob/dev/examples/lflogger.rb), and it worked well for me in the past.

Of course, you already have a few callbacks, so perhaps the answer is just to add many more? I'm not sure, but more visibility would be great.
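The two options are not exclusive; a client could accept both. A minimal sketch, assuming the client would call a single `emit` method at points such as sending a PING or starting a reconnect. `EventLog`, `emit` and the event names are hypothetical.

```ruby
require 'logger'

# Hypothetical event sink supporting both approaches from the request:
# a Logger-compatible object and/or a formatting callback.
class EventLog
  def initialize(logger: nil, &callback)
    @logger = logger
    @callback = callback
  end

  # Called by the client at interesting points, e.g. emit(:ping_sent).
  def emit(event, **details)
    @callback&.call(event, details)
    @logger&.debug { "nats: #{event} #{details}" }
  end
end

# Usage:
#   log = EventLog.new(logger: Logger.new($stdout)) { |event, details| puts "[#{event}] #{details}" }
#   log.emit(:reconnect_attempt, server: "nats://localhost:4222")
```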
