nats.go's Introduction

NATS - Go Client

A Go client for the NATS messaging system.

Check out NATS by example - An evolving collection of runnable, cross-client reference examples for NATS.

Installation

# Go client
go get github.com/nats-io/nats.go/

# Server
go get github.com/nats-io/nats-server

When using or transitioning to Go modules support:

# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.36.0

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server

Basic Usage

import "github.com/nats-io/nats.go"

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Responding to a request message
nc.Subscribe("request", func(m *nats.Msg) {
    m.Respond([]byte("answer is 42"))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Drain connection (Preferred for responders)
// Close() not needed if this is called.
nc.Drain()

// Close connection
nc.Close()

JetStream

JetStream is the built-in NATS persistence system. nats.go provides a built-in API for both managing JetStream assets and publishing/consuming persistent messages.

Basic usage

// connect to nats server
nc, _ := nats.Connect(nats.DefaultURL)

// create jetstream context from nats connection
js, _ := jetstream.New(nc)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// get existing stream handle
stream, _ := js.Stream(ctx, "foo")

// retrieve consumer handle from a stream
cons, _ := stream.Consumer(ctx, "cons")

// consume messages from the consumer in callback
cc, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Println("Received jetstream message: ", string(msg.Data()))
    msg.Ack()
})
defer cc.Stop()

To find more information on nats.go JetStream API, visit jetstream/README.md

The current JetStream API replaces the legacy JetStream API

Service API

The service API (micro) allows you to easily build NATS services. The services API is currently in beta release.

Encoded Connections

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

// Close connection
c.Close()

New Authentication (Nkeys and User Credentials)

This requires a server with version >= 2.0.0.

NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys. The simplest form is to use the helper method UserCredentials(credsFilepath).

nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key and simply performs the callback for signing the server challenge. The helper will load the credentials and then wipe and erase the memory it uses for each connect or reconnect.

The helper also can take two entries, one for the JWT and one for the NKey seed file.

nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))

You can also set the callback handlers directly and manage challenge signing directly.

nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))

Bare Nkeys are also supported. The nkey seed should be in a read only file, e.g. seed.txt

> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

This is a helper function that loads and decodes the seed and does the proper signing for the server nonce. It clears memory between invocations. You can instead use the low-level option and provide the public key and a signature callback on your own.

opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)

// Direct
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))

TLS

// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    ServerName: 	opts.Host,
    Certificates: 	[]tls.Certificate{cert},
    RootCAs:    	pool,
    MinVersion: 	tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
	t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Wildcard Subscriptions

// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))
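The wildcard rules described in the comments above can be sketched as a small standalone matcher. This is only an illustration of the semantics, not the matcher used by nats.go or the server, and the function name subjectMatches is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern, where "*" matches
// exactly one token and ">" matches one or more trailing tokens.
// Hypothetical helper for illustration only.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least
			// one remaining subject token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	// Without a trailing ">", both sides must have the same token count.
	return len(p) == len(s)
}

func main() {
	for _, pattern := range []string{"foo.*.baz", "foo.bar.*", "foo.>"} {
		fmt.Println(pattern, subjectMatches(pattern, "foo.bar.baz"))
	}
}
```

All three patterns match "foo.bar.baz", which is why the single Publish call above reaches every subscription.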

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
    received++
})

Advanced Usage

// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
    nats.RetryOnFailedConnect(true),
    nats.MaxReconnects(10),
    nats.ReconnectWait(time.Second),
    nats.ReconnectHandler(func(_ *nats.Conn) {
        // Note that this will be invoked for the first asynchronous connect.
    }))
if err != nil {
    // Should not return an error even if it can't connect, but you still
    // need to check in case there are some configuration errors.
}

// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err == nil {
    fmt.Println("All clear!")
} else {
    fmt.Println("Flush timed out!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1, _ := nats.Connect("nats://host1:4222")
nc2, _ := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"))

Clustered Usage

var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// You can also add some jitter for the reconnection.
// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections.
// If not specified, the library defaults to 100 milliseconds and 1 second, respectively.
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))

// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried
// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter.
// The library will pass the number of times it went through the whole list.
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
    return someBackoffFunction(attempts)
}))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
		fmt.Printf("Got disconnected! Reason: %q\n", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	}),
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of a user name and password.
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),
    nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence over the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))

Context support (+Go 1.7)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
	Message string `json:"message"`
}
type response struct {
	Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)

Backwards compatibility

In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard Go compatibility guidelines. However, it's important to clarify our stance on certain types of changes:

  • Expanding structures: Adding new fields to structs is not considered a breaking change.

  • Adding methods to exported interfaces: Extending public interfaces with new methods is also not viewed as a breaking change within the context of this project. It is important to note that no unexported methods will be added to interfaces allowing users to implement them.

Additionally, this library always supports at least the two latest minor Go versions. For example, if the latest Go version is 1.22, the library will support Go 1.21 and 1.22.

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

nats.go's People

Contributors

0xflotus, aricart, bruth, codegangsta, colinsullivan1, derekcollison, gcolliso, ido-gold-apolicy, jarema, jduhamel, jnmoyne, josephwoodward, kozlovic, krobertson, matthewdevenny, matthiashanel, mkorolyov, msoap, neilalexander, nsurfer, nussjustin, piotrpio, pires, ripienaar, scottf, tbeets, teh-cmc, tylertreat, variadico, wallyqs

nats.go's Issues

Will not reconnect on "connection reset by peer" errors

We got an error message like

write tcp x.y.z.w:4222: connection reset by peer
nats: Connection Closed

and reconnectCallBack never gets called.

The related traceback is:

github.com/apcera/nats.(*Conn).close(0xc208084000, 0x2, 0xc20802b001)
/gopath/src/github.com/apcera/nats/nats.go:1620 +0x34e
github.com/apcera/nats.(*Conn).Close(0xc208084000)
/gopath/src/github.com/apcera/nats/nats.go:1630 +0x36
github.com/apcera/nats.(*Conn).processErr(0xc208084000, 0xc208087d20, 0x12)
/gopath/src/github.com/apcera/nats/nats.go:1129 +0x1dd
github.com/apcera/nats.(*Conn).parse(0xc208084000, 0xc2080ca000, 0x19, 0x8000, 0x0, 0x0)
/gopath/src/github.com/apcera/nats/parser.go:222 +0xb9a
github.com/apcera/nats.(*Conn).readLoop(0xc208084000)
/gopath/src/github.com/apcera/nats/nats.go:942 +0x371
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/gopath/src/github.com/apcera/nats/nats.go:461 +0x6a

It would be good if processErr() also checked for a "connection reset by peer" message.

check if key exists

Can I check whether some data exists in the system?

example: "func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (m *Msg, err error)"
Request("subject", string("data"),...) check if "data" exists.

No token based auth support

The gnatsd server supports token based auth via a token_auth json field, however the client does not support this. PR incoming...

Reconnect has potential race conditions

When a Reconnect occurs, relying on state works for the one routine managing the state, but there are other active goroutines which may not see the state changes and will continue to operate under the wrong conditions.

doReconnect() calls createConn() which will establish a new connection (nc.conn) and buffered writer (nc.bw). readLoop could be at line 868 waiting for the lock and then grab the new connection rather than always using the old connection. It would make more sense to move line 873 out of the for loop and never obtain the conn again.
The flusher has a similar issue.

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x663cf]

goroutine 118 [running]:
runtime.panic(0x3adb80, 0x5fb184)
/usr/local/Cellar/go/1.3.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
github.com/apcera/nats.(*Conn).parse(0xc208072400, 0xc2080a4000, 0x6, 0x8000, 0x0, 0x0)
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/parser.go:70 +0x12f
github.com/apcera/nats.(*Conn).readLoop(0xc208072400)
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/nats.go:905 +0x454
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/Users/fraenkel/workspace/diego-release/src/github.com/apcera/nats/nats.go:456 +0x157

Subject of empty string will cause the nats connection closed

Hello,
I found that if I pass an empty string as the subject parameter of the "Publish", "Request" or "Subscribe" functions, the function returns no error.
However, operations like these cause an ERR in gnatsd, and gnatsd closes the connection. After that, calling any function on the connection returns the error "nats: Connection Closed", and the client makes no reconnection attempt.
The following test code reproduces this condition.

// testNats project main.go
package main

import (
    "fmt"
    "time"

    "github.com/nats-io/nats"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        panic(err)
    }
    defer nc.Close()
    //subscribe 'foo'
    handle := func(m *nats.Msg) {
        fmt.Println(string(m.Data))
    }
    if _, err := nc.Subscribe("foo", handle); err != nil {
        fmt.Println(err)
    }

    //these are invalid operation
    if _, err := nc.Subscribe("", handle); err != nil {
        fmt.Println(err)
    }
    if err := nc.Publish("", nil); err != nil {
        fmt.Println(err)
    }

    //wait for client to send invalid operation to gnatsd
    time.Sleep(time.Second)

    //publish something
    if err := nc.Publish("foo", []byte("bar")); err != nil {
        fmt.Println(err)
    }
    time.Sleep(time.Second)
}

Result:

C:/myGO/src/testNats/testNats.exe  [C:/myGO/src/testNats]
nats: Connection Closed
Success: process exited with code 0.

I think that the private functions "subscribe" and "publish" should return an error if we pass an empty string as the subject, but I found no check on the subject parameter in these functions.
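A minimal sketch of the kind of client-side check proposed here is shown below. The names errEmptySubject, publishFunc and checkedPublish are hypothetical and not part of the library; the wrapper accepts any function with the same shape as Publish(subject, data), so a stub stands in for a real connection.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptySubject is a hypothetical sentinel error used only in this sketch.
var errEmptySubject = errors.New("empty subject")

// publishFunc has the same shape as a Publish(subject, data) method.
type publishFunc func(subj string, data []byte) error

// checkedPublish rejects an empty subject locally instead of sending it to
// the server, where it would trigger an ERR and a closed connection.
func checkedPublish(publish publishFunc, subj string, data []byte) error {
	if subj == "" {
		return errEmptySubject
	}
	return publish(subj, data)
}

func main() {
	sent := 0
	// Stub publisher that only counts calls; a real connection's Publish
	// method could be passed here instead.
	stub := func(subj string, data []byte) error {
		sent++
		return nil
	}

	fmt.Println(checkedPublish(stub, "", []byte("I can help!")))
	fmt.Println(checkedPublish(stub, "foo", []byte("bar")))
	fmt.Println("published:", sent)
}
```

The same guard is useful inside a reply handler, where the reply subject is empty whenever the incoming message was sent with Publish rather than Request.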

This dangerous condition can easily arise in the following way:

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

//invalid Request
//nc.Publish("help", []byte("help me"))

If other programs use 'Publish' instead of 'Request' to publish something to the subject 'help', then m.Reply == "".

Sorry for my poor English. ^_^

Possible problem with flusher() not exiting

I saw an issue where flusher() does not seem to exit. A test case running with "-race" also often shows this:

Ivans-MacBook-Pro:nats ivan$ go test -race -v -run AuthServers

=== RUN TestAuthServers

WARNING: DATA RACE
Write by goroutine 6:
github.com/nats-io/nats.(*Conn).processConnectInit()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:517 +0x1ad
github.com/nats-io/nats.(*Conn).connect()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:546 +0x23e
github.com/nats-io/nats.Options.Connect()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:263 +0x1e5
github.com/nats-io/nats.TestAuthServers()
/Users/ivan/dev/go/src/github.com/nats-io/nats/cluster_test.go:113 +0x265
testing.tRunner()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:456 +0xdc

Previous read by goroutine 11:
github.com/nats-io/nats.(*Conn).flusher()
/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:1066 +0x7a

Goroutine 6 (running) created at:
testing.RunTests()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:561 +0xaa3
testing.(*M).Run()
/usr/local/Cellar/go/1.5/libexec/src/testing/testing.go:494 +0xe4
main.main()
github.com/nats-io/nats/_test/_testmain.go:268 +0x20f

Goroutine 11 (running) created at:
github.com/nats-io/nats.(*Conn).spinUpSocketWatchers()

/Users/ivan/dev/go/src/github.com/nats-io/nats/nats.go:466 +0xa0

--- FAIL: TestAuthServers (2.02s)
cluster_test.go:120: Wrong error, wanted Auth failure, got 'nats: Timeout'
FAIL
exit status 1
FAIL github.com/nats-io/nats 2.030s

Unsubscribe does not allow consumption of already fetched messages

Perhaps there is a good reason for this, but it appears that Unsubscribe sets its message channel to nil, which is potentially a destructive action.

Unless there is another way to consume messages that have already been fetched by the client, I would expect the channel to be closed to prevent future writes but to remain available for reads.

s.unsubscribe()
...consume remaining messages in channel

I am working on a PR that addresses this. I am curious though if this was intended behavior and if I am misusing it.

Should we have a CONNECTING status?

When attempting to connect, the CONNECTED status is set early on. If for any reason an error is triggered during the connect (by processOpErr()), this may trigger a reconnect attempt while we are still in the middle of a connect. Maybe if we were using a CONNECTING status, we could use this in processOpErr() to simply return and not try to reconnect, since an error during connect() would correctly return the wrong status (and move on to the next server in the pool).

Better capture pending writes when getting disconnected

Imagine doing nc.Publish() in a loop. The server is stopped/restarted and the library tries to reconnect. The pending buffer is created only in doReconnect(). That leaves a window where nc.bw.write() calls may accumulate to the point of causing a flush (socket write), which will likely result in an error returned from the nc.Publish() call.
Also, the use of a buffered writer may cause situations where the buffer is flushed and some left over is then appended to the now empty buffer. If a reconnect occurs at the "right" time, this data will then be sent to the server, which may cause a parse error (if what was in the buffer was a partial), which would result in another reconnect.

Subscribe("foo", nil) inconsistency between Conn and EncodedConn

EncodedConn.Subscribe("foo", nil) throws a nil pointer exception instead of deleting the subscription as documented in the README.

Conn.Subscribe("foo", nil) returns normally but does not delete the subscription.

Since there is a Subscription.Unsubscribe() then this 'nil' behavior should be avoided and either return an error in the error field or throw an exception. Looking into the Subscribe code there is some code that looks for a nil callback and some that throws an exception when reflecting.

All async subscribers share Go routine

All async subscribers share a single Go routine for callbacks, making it fairly easy to starve the msg delivery if you are inside a callback waiting for another callback. They should probably be bound to a Subscription.
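The suggestion of binding delivery to each Subscription can be illustrated with a small standalone sketch. The subscription type and its methods below are hypothetical stand-ins, not nats.go types. Each subscription owns a goroutine that drains its own channel, so a callback that waits on another callback does not stall delivery, and the goroutine exits once the channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for a client subscription that
// owns its delivery goroutine.
type subscription struct {
	mch  chan string   // pending messages for this subscription only
	done chan struct{} // closed when the delivery goroutine has exited
}

// newSubscription starts one goroutine that invokes cb for every message
// until mch is closed.
func newSubscription(cb func(msg string)) *subscription {
	s := &subscription{mch: make(chan string, 64), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for m := range s.mch {
			cb(m)
		}
	}()
	return s
}

func (s *subscription) deliver(m string) { s.mch <- m }

// close stops delivery and waits for the goroutine to finish, so no
// goroutine is leaked.
func (s *subscription) close() {
	close(s.mch)
	<-s.done
}

// demo runs a callback that blocks until a second callback has fired.
// With a single shared delivery goroutine this would never complete.
func demo() string {
	replied := make(chan string, 1)
	var result string
	var wg sync.WaitGroup
	wg.Add(1)

	a := newSubscription(func(msg string) {
		defer wg.Done()
		result = <-replied // waits for the other subscription's callback
	})
	b := newSubscription(func(msg string) { replied <- "pong:" + msg })

	a.deliver("ping")
	b.deliver("ping")
	wg.Wait()
	a.close()
	b.close()
	return result
}

func main() {
	fmt.Println(demo())
}
```

The trade-off in this design is one goroutine per subscription, in exchange for isolating slow or blocking callbacks from each other.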

make nats.Conn as an interface

[ actually it is not an issue, more like a new feature ]

The current design is very clean and easy to trace.
However, I have a requirement to use my own nats.Conn in the EncodedConn,
and the current implementation prevents me from doing this.
Specifically, nats.Conn is implemented as a concrete structure.

// A Conn represents a bare connection to a nats-server. It will send and receive
// []byte payloads.
type Conn struct {
...
}

and an EncodedConn is based on that Conn and provides an encoding support like JSON.

// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
// a nats server and have an extendable encoder system that will encode and decode messages
// from raw Go types.
type EncodedConn struct {
Conn *Conn
Enc Encoder
}

Since EncodedConn directly uses the nats.Conn type, I cannot wrap it this way.

Therefore, my idea is to make this change and send you a PR, if you also have such a requirement.
Any suggestions?

Rename to nats-client-go

Just a thought: if the repo only has a client in it, perhaps include that in the project name. If it is a go client, perhaps include that in the project name.

Or are you also rewriting the nats server in Go? Going to make it distributed?

Payload size

The gnatsd enforces a maximum payload size. In looking at the gnatsd debugging output, I can see that a message I am publishing exceeds this limit. But the call to PublishMsg I'm making does not return an error.

Am I correct that nats doesn't detect when the server rejects a message (at least based on size)? If so, is there some reason that the client doesn't/can't check for this?

ReadLine() does not guarantee that the full line is read, which may cause parser to error out

When receiving the INFO protocol message, the call to ReadLine() assumes that the line will be received entirely, or that the "pre" boolean will be true should the buffer not be big enough.
Actually, ReadLine() may return with a partial read, which, if it contains at least "INFO", would make processExpectedInfo() return ok, but would then make the readLoop fail when processing the remainder of the protocol.
This can be demonstrated by having gnatsd's sendInfo() send part of the buffer, pause for a second and then send the rest of the buffer.

Expose ability to reconnect

I'd like to be able to get the client to reconnect to the server. Our use case is to rebalance connections to a cluster that's behind HAProxy.

Reconnect logic flawed when servers in the list require authentication

As I was trying to change the C client to connect asynchronously, it occurred to me that there is a problem due to the asynchronous nature of the reconnect process in the Go client.
If the client is connected to a server, and one in the list requires authentication while the client does not have it (or it does not match), then when the client is disconnected and tries to connect to that server, as long as the TCP connection succeeds and the expected INFO is ok, it will assume that it is connected. In reality, when the readLoop is started, it will get a -ERR with Authorization Violation, which will cause the connection to be closed. If we were doing the (re)connect process "synchronously", we would handle this situation better.

TestAuthFailToReconnect.txt

Need to seed random to create unique INBOXes

Go by default seeds random as Seed(1). Need to seed it using unique seed to use rand() when creating INBOXes.

Additionally, it may be a good idea to embed IP and PID into INBOX to ensure uniqueness and not rely on randoms only.
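One way to avoid depending on the default seed is to draw the random part of the inbox name from crypto/rand, which needs no seeding. The sketch below assumes an "_INBOX." prefix and a 16-byte hex suffix; the function name newInbox and both of those choices are illustrative, not the library's actual inbox format.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// inboxPrefix is an assumed prefix for this sketch.
const inboxPrefix = "_INBOX."

// newInbox returns a subject with 128 bits of randomness taken from the
// operating system's CSPRNG, so no explicit seeding is required.
func newInbox() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return inboxPrefix + hex.EncodeToString(b[:]), nil
}

func main() {
	inbox, err := newInbox()
	if err != nil {
		fmt.Println("could not create inbox:", err)
		return
	}
	fmt.Println(inbox)
}
```

Embedding the IP and PID, as suggested above, could be layered on top of this, but with 128 random bits a collision is already extremely unlikely.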

Callbacks are triggered when satisfied but not guaranteed to still be true

Both the reconnected and disconnected callbacks are invoked outside the Connection lock. By doing so, the state of the connection may be different when actually invoked. In most cases, the connection has been closed which leads to no harm. However, in the case of the reconnect, another reconnect can occur since the socketwatchers have been spun up already.

Panic on 386 systems

2015/09/14 15:19:26 http: panic serving 127.0.0.1:34640: runtime error: invalid memory address or nil pointer dereference
goroutine 19 [running]:
net/http.(*conn).serve.func1(0x18902060, 0xf76aa860, 0x188fc040)
/opt/go/src/net/http/server.go:1287 +0xa2
sync/atomic.AddUint64(0x1887b964, 0x1, 0x0, 0x188fe240, 0x4)
/opt/go/src/sync/atomic/asm_386.s:112 +0xc
github.com/nats-io/nats.(*Conn).subscribe(0x1887b880, 0x1892c0f0, 0x21, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0)
/home/philipp/projects/go/src/github.com/nats-io/nats/nats.go:1298 +0x23d
github.com/nats-io/nats.(*Conn).Request(0x1887b880, 0x188ce620, 0x19, 0x188f6320, 0x12, 0x20, 0x540be400, 0x2, 0x0, 0x0, ...)
/home/philipp/projects/go/src/github.com/nats-io/nats/nats.go:1254 +0x6f

It's a known issue on 386 systems when handling 64-bit numbers atomically: golang/go#6404

TestRecvChanLeakGoRoutines racy with checking goroutines

Periodic failure seems to happen with 1 goroutine going away that it didn't expect.

=== RUN TestRecvChanLeakGoRoutines
--- FAIL: TestRecvChanLeakGoRoutines (0.11 seconds)
    netchan_test.go:226: Leaked Go routine(s) : -1, closing channel should have closed them

the subscribe cannot receive more than one message

Note that this issue is related to #22

The commit (c0bed13) does solve the goroutine leak problem, but we now have another problem:
we cannot receive more than one message using "BindRecvChan".
I also found that in c0bed13#diff-386dae9aaed25b55f29b68c9c0015aa6R79
you do the following:

defer func() {
  m.Sub.Unsubscribe()
  recover()
}()

So as far as I can understand, you unsubscribe first and then call recover() (this recover is supposed to recover from the panic caused by sending a message to a closed channel).

How about this:

defer func(){
    if r := recover(); r != nil {
      m.Sub.Unsubscribe()
    }
 }()

That way, the unsubscribe happens only when the channel has been closed.
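The difference between the two defer variants can be seen in a standalone sketch that does not depend on NATS. The helper trySend and its onClosed callback are hypothetical names; onClosed plays the role of m.Sub.Unsubscribe() and runs only when the send panics.

```go
package main

import "fmt"

// trySend sends v on ch and reports whether the send succeeded. Sending on
// a closed channel panics; the deferred function recovers from that panic
// and only then invokes onClosed. Note that recover() catches any panic
// raised during the send, not only the closed-channel case.
func trySend(ch chan<- string, v string, onClosed func()) (ok bool) {
	defer func() {
		if r := recover(); r != nil {
			onClosed()
			ok = false
		}
	}()
	ch <- v
	return true
}

func main() {
	ch := make(chan string, 2)
	unsubscribed := false
	unsub := func() { unsubscribed = true }

	// Normal sends succeed and never trigger the unsubscribe.
	fmt.Println(trySend(ch, "first", unsub), unsubscribed)
	fmt.Println(trySend(ch, "second", unsub), unsubscribed)

	// After the receiver closes the channel, the next send is recovered
	// and the unsubscribe callback runs.
	close(ch)
	fmt.Println(trySend(ch, "third", unsub), unsubscribed)
}
```

With the unconditional variant, the callback would run after the very first delivery, which matches the symptom of receiving only one message.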

Close on Stale Connection

Currently all errors are treated as hard closes for the client. I can see where it makes sense for most of the failures, e.g., Authn/Authz. However, for something like Stale Connection it seems a bit extreme. I was wondering if it would be ok to reconnect on a Stale Connection.

ACK subscription hitting consumer too slow in pub-req

Hello,

This is more of an FYI and follow up question, but I expect others will run into this as well.

The docs show an example like this:

// Requests
var response string
err := nc.Request("help", "help me", &response, 10*time.Millisecond)

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

The request and reply use the same connection, which generally works fine. But in my stress tests the consumer wasn't firing fast enough, and the publisher couldn't send any more messages once "Slow Consumer Detected" was reported.

The solution was to have two separate connections to the NATS server, one for the producers and one for the consumers. This solved my problem, but it makes me wonder whether the NATS client should do connection pooling.

goroutine leak when calling BindRecvChan

In netchan.go, the caller can pass a channel that receives data via a Subscribe on a topic.
https://github.com/apcera/nats/blob/master/netchan.go#L54

func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) error {

In the underlying implementation, nats defines a callback (i.e. cb) and then calls subscribe on the Conn:
https://github.com/apcera/nats/blob/master/netchan.go#L81

_, err := c.Conn.subscribe(subject, queue, cb)

In conn.subscribe, nats spawns a goroutine to deliver the messages:
https://github.com/apcera/nats/blob/master/nats.go#L1198

go nc.deliverMsgs(sub.mch)

That goroutine terminates only if sub.mch is closed, which happens via conn.Unsubscribe().
https://github.com/apcera/nats/blob/master/nats.go#L1244

func (nc *Conn) unsubscribe(sub *Subscription, max int) error {

The bug shows up when we look at BindRecvChan again: even if the caller has closed the channel, the goroutine that runs deliverMsgs is never destroyed. We end up with one additional zombie goroutine every time we call BindRecvChan.
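The lifetime rule described above (the delivery goroutine ends only when the internal channel is closed) can be illustrated with a small standard-library sketch. The `subscription` type, its fields, and the `subscribe`/`unsubscribe` functions below are hypothetical stand-ins, not the client's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for the client's subscription.
type subscription struct {
	mch  chan string   // internal message channel
	done chan struct{} // closed when the delivery goroutine exits
	once sync.Once
}

// subscribe starts a delivery goroutine that calls cb for each message.
func subscribe(cb func(string)) *subscription {
	s := &subscription{mch: make(chan string, 8), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for m := range s.mch { // the loop ends only when mch is closed
			cb(m)
		}
	}()
	return s
}

// unsubscribe closes the internal channel so the goroutine can exit.
// Without this call the goroutine stays parked on mch forever.
func (s *subscription) unsubscribe() {
	s.once.Do(func() { close(s.mch) })
}

func main() {
	s := subscribe(func(m string) { fmt.Println("got", m) })
	s.mch <- "hello"
	s.unsubscribe()
	<-s.done // returns only because unsubscribe closed mch
	fmt.Println("delivery goroutine exited")
}
```

Closing the caller's own channel has no effect on `mch` in this sketch, which mirrors why the goroutine in the report lingers until Unsubscribe is called.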

Reconnect problem and fix

First of all, sorry for my English; I am not a native English speaker.

This bug happens when a client tries to reconnect to a new server. I have 20 clients connected to 3 gnatsd servers. When I cycle the gnatsd service on the 3 servers (killing and restarting the server the clients are connected to), the number of client connections keeps shrinking.

On reconnect, the client does the following:

  1. Runs readLoop; this function checks the health of the connection between the client and the cluster.
  2. Checks whether nc is closed or reconnecting and whether nc.conn == nil. Sometimes, because of latency, creating the connection takes a bit longer, and then this code:

    if sb || conn == nil {
        break
    }

    makes readLoop exit before the reconnect succeeds.

I changed "break" to "continue". When I then cycled the service between the 3 gnatsd servers, the reconnect succeeded 100% of the time.

Can't customize TLS connection configuration

In makeTLSConn() it hard codes the following for the TLS Client configuration:

&tls.Config{InsecureSkipVerify: true}

If there were an instance of tls.Config in the Opts structure, makeTLSConn() could use it (with the default as above). The user could then override the TLS configuration to allow for custom certificates, specific TLS versions, cipher suites, etc.

Anonymous sync.Mutex in Conn structure

The Conn structure embeds an anonymous sync.Mutex, so Conn.Lock() is exposed, as can be seen in godoc. It should not be exposed if it is meant to be used only by the internal implementation and not by the user application.
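The effect of embedding can be shown in a few lines. In this sketch, `exposedConn` and `conn` are hypothetical types: the first embeds the mutex, so `Lock` and `Unlock` are promoted into its method set, while the second keeps the mutex in a named, unexported field that only the type's own methods can reach.

```go
package main

import (
	"fmt"
	"sync"
)

// exposedConn embeds the mutex: Lock and Unlock are promoted and
// become callable by any user of the type.
type exposedConn struct {
	sync.Mutex
	subs int
}

// conn keeps the mutex in an unexported field, so locking stays an
// implementation detail.
type conn struct {
	mu   sync.Mutex
	subs int
}

// addSub increments the subscription count under the private lock.
func (c *conn) addSub() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.subs++
	return c.subs
}

func main() {
	var e exposedConn
	e.Lock() // compiles: the embedded mutex leaks into the API
	e.subs++
	e.Unlock()

	c := &conn{}
	fmt.Println("subscriptions:", c.addSub())
}
```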

Error in Publish after JSON unmarshal error in subscribed message

When using EncodedConn with JSON encoding, a decoding error will set Conn.err: https://github.com/nats-io/nats/blob/ef5165913ac88c53c2e2be39f73f8314490ea5dd/enc.go#L196

After err is set, publishing messages will not succeed because publish() will abort if a global error exists:
https://github.com/nats-io/nats/blob/ef5165913ac88c53c2e2be39f73f8314490ea5dd/nats.go#L1216

Use this test code to reproduce:

package main

import (
    "fmt"
    "github.com/nats-io/nats"
    "time"
)

func main() {
    nc, _ := nats.Connect("nats://localhost:4222")
    c, _ := nats.NewEncodedConn(nc, "json")

    c.Subscribe("test", func(msg interface{}) {
        fmt.Println("Received", msg)
    })
    time.Sleep(500 * time.Millisecond)

    toPublish := [][]byte{
        []byte(`{"msg": 1}`),
        []byte(`{"msg": 2}`),
        []byte(`foo`),
        []byte(`{"msg": 4}`),
        []byte(`{"msg": 5}`),
    }

    for i, data := range toPublish {
        err := nc.PublishMsg(&nats.Msg{
            Subject: "test",
            Data:    data,
        })
        fmt.Printf("Published #%d %s, error: %v\n", i, data, err)
        time.Sleep(500 * time.Millisecond)

    }

}

Also, I wonder how exactly the error handling is meant to work. If I call Publish() (or any other method), I'd expect to receive only errors regarding the current operation, not any global errors. On the other hand, when the connection to gnatsd goes away, a Publish() while the client is disconnected succeeds without returning an error, although the message was not sent to gnatsd.

TLS/SSL Support

Client should support secure communications to the server.

Readme.md code has a bug.

While I know the Readme sample code is just a sample, it is incorrect. The line that reads
err := nc.Request("help", "help me", &response, 10*time.Millisecond)

should be
err := c.Request("help", "help me", &response, 10*time.Millisecond)

Also (a very minor nit), err is defined but not used. ;)

Proper Locking

Current locking scheme protects writes to the socket and buffer but does not protect subscription and other state in case they are changed in multiple Go routines (subscribe, flush, etc).

AutoUnsubscribe() leaves deliverMsgs() go routine running

Request() uses AutoUnsubscribe(1) and after the reply is received (or has timed-out), the subscription is closed: s.Unsubscribe(). This will close the channel and the go routine will fall out.

However, if a user uses AutoUnsubscribe() because they want to receive at most 10 messages, and does not keep track of how many messages have been received, then the goroutine will stay around even though it is not doing anything.

I wonder whether deliverMsgs() should exit when (delivered > max).
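A rough sketch of that suggestion, using a plain channel instead of the client's internals. The `deliver` function is a hypothetical stand-in for deliverMsgs(); it stops as soon as the limit is reached rather than waiting for the channel to be closed. The exact comparison (here `delivered >= max`, checked after the callback) is an assumption about where the check would sit.

```go
package main

import "fmt"

// deliver calls cb for each message on mch and returns the number of
// messages delivered. If max > 0, it returns once max messages have
// been delivered, even if mch is never closed.
func deliver(mch <-chan string, max int, cb func(string)) int {
	delivered := 0
	for m := range mch {
		delivered++
		cb(m)
		if max > 0 && delivered >= max {
			break // limit reached: let the goroutine exit
		}
	}
	return delivered
}

func main() {
	mch := make(chan string, 5)
	for i := 0; i < 5; i++ {
		mch <- fmt.Sprintf("msg-%d", i)
	}
	// mch is deliberately left open, as with an auto-unsubscribed
	// subscription whose owner never calls Unsubscribe.
	n := deliver(mch, 3, func(m string) { fmt.Println("got", m) })
	fmt.Println("delivered", n)
}
```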

Order or lack thereof of callbacks

It seems a bit weird that in most cases I will see a ClosedCB and maybe a ReconnectedCB before the DisconnectedCB. Because DCB is async, there is no order guarantee and it makes it difficult to actually do anything when you do see it since you cannot easily determine a reliable sequencing between the various callbacks.
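One common way to get a reliable sequence is to funnel all callbacks through a single queue drained by one goroutine. The sketch below is only an illustration of that general technique, not a description of how the client dispatches its callbacks; the `dispatcher` type and its methods are hypothetical.

```go
package main

import "fmt"

// dispatcher runs posted callbacks one at a time, in the order posted.
type dispatcher struct {
	q    chan func()
	done chan struct{}
}

func newDispatcher() *dispatcher {
	d := &dispatcher{q: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(d.done)
		for f := range d.q { // single consumer: FIFO order is preserved
			f()
		}
	}()
	return d
}

// post queues a callback without running it on the caller's goroutine.
func (d *dispatcher) post(f func()) { d.q <- f }

// close stops accepting callbacks and waits for queued ones to finish.
func (d *dispatcher) close() {
	close(d.q)
	<-d.done
}

func main() {
	d := newDispatcher()
	d.post(func() { fmt.Println("disconnected") })
	d.post(func() { fmt.Println("reconnected") })
	d.post(func() { fmt.Println("closed") })
	d.close()
}
```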

Client is stuck in the readLoop

Looks like we might need to set a read deadline in the client. All we know is that this client is dead to NATS but it's not budging off the read.

goroutine 55 [IO wait, 317 minutes]:
net.runtime_pollWait(0x7f4827bc75f8, 0x72, 0x0)
/usr/local/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc208085100, 0x72, 0x0, 0x0)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc208085100, 0x0, 0x0)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc2080850a0, 0xc208122000, 0x8000, 0x8000, 0x0, 0x7f4827bc6418, 0xb)
/usr/local/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20802c120, 0xc208122000, 0x8000, 0x8000, 0x0, 0x0, 0x0)
/usr/local/go/src/pkg/net/net.go:122 +0xe7
github.com/apcera/nats.(*Conn).readLoop(0xc20802ec00)
/var/vcap/packages/stager/src/github.com/apcera/nats/nats.go:865 +0x24c
created by github.com/apcera/nats.(*Conn).spinUpSocketWatchers
/var/vcap/packages/stager/src/github.com/apcera/nats/nats.go:428 +0x9f

"go test" for nats: 5 test failures against gnatsd

5e5b726 : on OSX 10.10.2/amd64. go1.5.1.

Five nats tests fail out of the box against gnatsd. Details below.

NB, I didn't want to mess with ruby and the ruby nats server, so I changed conn_test to use gnatsd. Shouldn't this be the default since the ruby version is deprecated?

failed tests:

--- FAIL: TestServerSecureConnections (10.04s)
    conn_test.go:47: Timed out trying to connect to gnatsd
...
--- FAIL: TestServerAutoUnsub (0.00s)
    sub_test.go:29: Received 0 msgs, wanted only 10
...
--- FAIL: TestClientASyncAutoUnsub (0.00s)
    sub_test.go:75: Received 0 msgs, wanted only 10
...
--- FAIL: TestAsyncErrHandler (2.54s)
    sub_test.go:197: Failed to call async err handler
...
--- FAIL: TestAsyncSubscribersOnClose (0.01s)
    sub_test.go:259: Expected only one callback, received 0 callbacks
...

my modification to use gnatsd:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ git diff
WARNING: terminal is not fully functional
-  (press RETURN) 
diff --git a/conn_test.go b/conn_test.go
index 883ff76..619c461 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -10,7 +10,7 @@ import (
        "time"
 )

-const natsServer = "nats-server"
+const natsServer = "gnatsd"

 type server struct {
        args []string
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $

the full test run:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ go test -v
=== RUN   TestAuthServerStart
--- PASS: TestAuthServerStart (0.06s)
=== RUN   TestAuthConnectionFail
--- PASS: TestAuthConnectionFail (0.00s)
=== RUN   TestAuthConnectionSuccess
--- PASS: TestAuthConnectionSuccess (0.00s)
=== RUN   TestAuthServerStop
--- PASS: TestAuthServerStop (0.00s)
=== RUN   TestServersRandomize
--- PASS: TestServersRandomize (0.00s)
=== RUN   TestServersOption
--- PASS: TestServersOption (0.13s)
=== RUN   TestAuthServers
--- PASS: TestAuthServers (0.12s)
=== RUN   TestSelectNextServer
--- PASS: TestSelectNextServer (0.00s)
=== RUN   TestBasicClusterReconnect
--- PASS: TestBasicClusterReconnect (0.12s)
=== RUN   TestHotSpotReconnect
--- PASS: TestHotSpotReconnect (5.35s)
=== RUN   TestProperReconnectDelay
--- PASS: TestProperReconnectDelay (1.07s)
=== RUN   TestProperFalloutAfterMaxAttempts
--- PASS: TestProperFalloutAfterMaxAttempts (0.82s)
=== RUN   TestTimeoutOnNoServers
--- PASS: TestTimeoutOnNoServers (0.17s)
=== RUN   TestPingReconnect
--- PASS: TestPingReconnect (1.08s)
=== RUN   TestDefaultConnection
--- PASS: TestDefaultConnection (0.06s)
=== RUN   TestConnectionStatus
--- PASS: TestConnectionStatus (0.00s)
=== RUN   TestConnClosedCB
--- PASS: TestConnClosedCB (0.00s)
=== RUN   TestCloseDisconnectedCB
--- PASS: TestCloseDisconnectedCB (0.00s)
=== RUN   TestServerStopDisconnectedCB
--- PASS: TestServerStopDisconnectedCB (0.00s)
=== RUN   TestRestartServer
--- PASS: TestRestartServer (0.06s)
=== RUN   TestServerSecureConnections
--- FAIL: TestServerSecureConnections (10.04s)
    conn_test.go:47: Timed out trying to connect to gnatsd
=== RUN   TestClosedConnections
--- PASS: TestClosedConnections (0.00s)
=== RUN   TestErrOnConnectAndDeadlock
--- PASS: TestErrOnConnectAndDeadlock (0.00s)
=== RUN   TestErrOnMaxPayloadLimit
--- PASS: TestErrOnMaxPayloadLimit (0.00s)
=== RUN   TestConstructorErrs
--- PASS: TestConstructorErrs (0.00s)
=== RUN   TestMarshalString
--- PASS: TestMarshalString (0.00s)
=== RUN   TestMarshalBytes
--- PASS: TestMarshalBytes (0.00s)
=== RUN   TestMarshalInt
--- PASS: TestMarshalInt (0.00s)
=== RUN   TestMarshalInt32
--- PASS: TestMarshalInt32 (0.00s)
=== RUN   TestMarshalInt64
--- PASS: TestMarshalInt64 (0.00s)
=== RUN   TestMarshalFloat32
--- PASS: TestMarshalFloat32 (0.00s)
=== RUN   TestMarshalFloat64
--- PASS: TestMarshalFloat64 (0.00s)
=== RUN   TestMarshalBool
--- PASS: TestMarshalBool (0.00s)
=== RUN   TestExtendedSubscribeCB
--- PASS: TestExtendedSubscribeCB (0.00s)
=== RUN   TestExtendedSubscribeCB2
--- PASS: TestExtendedSubscribeCB2 (0.00s)
=== RUN   TestRawMsgSubscribeCB
--- PASS: TestRawMsgSubscribeCB (0.00s)
=== RUN   TestEncRequest
--- PASS: TestEncRequest (0.00s)
=== RUN   TestEncRequestReceivesMsg
--- PASS: TestEncRequestReceivesMsg (0.00s)
=== RUN   TestAsyncMarshalErr
--- PASS: TestAsyncMarshalErr (0.00s)
=== RUN   TestEncodeNil
--- PASS: TestEncodeNil (0.00s)
=== RUN   TestDecodeDefault
--- PASS: TestDecodeDefault (0.00s)
=== RUN   TestGobMarshalString
--- PASS: TestGobMarshalString (0.00s)
=== RUN   TestGobMarshalInt
--- PASS: TestGobMarshalInt (0.00s)
=== RUN   TestGobMarshalStruct
--- PASS: TestGobMarshalStruct (0.00s)
=== RUN   TestJsonMarshalString
--- PASS: TestJsonMarshalString (0.00s)
=== RUN   TestJsonMarshalInt
--- PASS: TestJsonMarshalInt (0.00s)
=== RUN   TestJsonMarshalStruct
--- PASS: TestJsonMarshalStruct (0.00s)
=== RUN   TestNotMarshableToJson
--- PASS: TestNotMarshableToJson (0.00s)
=== RUN   TestFailedEncodedPublish
--- PASS: TestFailedEncodedPublish (0.00s)
=== RUN   TestDecodeConditionals
--- PASS: TestDecodeConditionals (0.00s)
=== RUN   TestCloseLeakingGoRoutines
--- PASS: TestCloseLeakingGoRoutines (0.02s)
=== RUN   TestConnectedServer
--- PASS: TestConnectedServer (0.00s)
=== RUN   TestMultipleClose
--- PASS: TestMultipleClose (0.00s)
=== RUN   TestBadOptionTimeoutConnect
--- PASS: TestBadOptionTimeoutConnect (0.00s)
=== RUN   TestSimplePublish
--- PASS: TestSimplePublish (0.00s)
=== RUN   TestSimplePublishNoData
--- PASS: TestSimplePublishNoData (0.00s)
=== RUN   TestAsyncSubscribe
--- PASS: TestAsyncSubscribe (0.00s)
=== RUN   TestSyncSubscribe
--- PASS: TestSyncSubscribe (0.00s)
=== RUN   TestPubSubWithReply
--- PASS: TestPubSubWithReply (0.00s)
=== RUN   TestFlush
--- PASS: TestFlush (0.00s)
=== RUN   TestQueueSubscriber
--- PASS: TestQueueSubscriber (0.00s)
=== RUN   TestReplyArg
--- PASS: TestReplyArg (0.00s)
=== RUN   TestSyncReplyArg
--- PASS: TestSyncReplyArg (0.00s)
=== RUN   TestUnsubscribe
--- PASS: TestUnsubscribe (0.00s)
=== RUN   TestDoubleUnsubscribe
--- PASS: TestDoubleUnsubscribe (0.00s)
=== RUN   TestRequestTimeout
--- PASS: TestRequestTimeout (0.01s)
=== RUN   TestRequest
--- PASS: TestRequest (0.00s)
=== RUN   TestRequestNoBody
--- PASS: TestRequestNoBody (0.00s)
=== RUN   TestFlushInCB
--- PASS: TestFlushInCB (0.00s)
=== RUN   TestReleaseFlush
--- PASS: TestReleaseFlush (0.00s)
=== RUN   TestInbox
--- PASS: TestInbox (0.00s)
=== RUN   TestStats
--- PASS: TestStats (0.00s)
=== RUN   TestRaceSafeStats
--- PASS: TestRaceSafeStats (0.20s)
=== RUN   TestBadChan
--- PASS: TestBadChan (0.02s)
=== RUN   TestSimpleSendChan
--- PASS: TestSimpleSendChan (0.00s)
=== RUN   TestFailedChannelSend
--- PASS: TestFailedChannelSend (0.00s)
=== RUN   TestSimpleRecvChan
--- PASS: TestSimpleRecvChan (0.00s)
=== RUN   TestQueueRecvChan
--- PASS: TestQueueRecvChan (0.00s)
=== RUN   TestDecoderErrRecvChan
--- PASS: TestDecoderErrRecvChan (0.00s)
=== RUN   TestRecvChanPanicOnClosedChan
--- PASS: TestRecvChanPanicOnClosedChan (0.00s)
=== RUN   TestRecvChanAsyncLeakGoRoutines
--- PASS: TestRecvChanAsyncLeakGoRoutines (0.05s)
=== RUN   TestRecvChanLeakGoRoutines
--- PASS: TestRecvChanLeakGoRoutines (0.10s)
=== RUN   TestRecvChanMultipleMessages
--- PASS: TestRecvChanMultipleMessages (0.01s)
=== RUN   TestReconnectTotalTime
--- PASS: TestReconnectTotalTime (0.00s)
=== RUN   TestReconnectDisallowedFlags
--- PASS: TestReconnectDisallowedFlags (0.06s)
=== RUN   TestReconnectAllowedFlags
--- PASS: TestReconnectAllowedFlags (0.56s)
=== RUN   TestBasicReconnectFunctionality
--- PASS: TestBasicReconnectFunctionality (0.68s)
=== RUN   TestExtendedReconnectFunctionality
--- PASS: TestExtendedReconnectFunctionality (0.17s)
=== RUN   TestParseStateReconnectFunctionality
--- PASS: TestParseStateReconnectFunctionality (0.68s)
=== RUN   TestQueueSubsOnReconnect
--- PASS: TestQueueSubsOnReconnect (0.23s)
=== RUN   TestIsClosed
--- PASS: TestIsClosed (0.12s)
=== RUN   TestIsReconnectingAndStatus
--- PASS: TestIsReconnectingAndStatus (0.16s)
=== RUN   TestServerAutoUnsub
--- FAIL: TestServerAutoUnsub (0.00s)
    sub_test.go:29: Received 0 msgs, wanted only 10
=== RUN   TestClientSyncAutoUnsub
--- PASS: TestClientSyncAutoUnsub (0.00s)
=== RUN   TestClientASyncAutoUnsub
--- FAIL: TestClientASyncAutoUnsub (0.00s)
    sub_test.go:75: Received 0 msgs, wanted only 10
=== RUN   TestCloseSubRelease
--- PASS: TestCloseSubRelease (0.01s)
=== RUN   TestIsValidSubscriber
--- PASS: TestIsValidSubscriber (0.00s)
=== RUN   TestSlowSubscriber
--- PASS: TestSlowSubscriber (0.06s)
=== RUN   TestSlowAsyncSubscriber
--- PASS: TestSlowAsyncSubscriber (0.05s)
=== RUN   TestAsyncErrHandler
--- FAIL: TestAsyncErrHandler (2.54s)
    sub_test.go:197: Failed to call async err handler
=== RUN   TestAsyncSubscriberStarvation
--- PASS: TestAsyncSubscriberStarvation (0.00s)
=== RUN   TestAsyncSubscribersOnClose
--- FAIL: TestAsyncSubscribersOnClose (0.01s)
    sub_test.go:259: Expected only one callback, received 0 callbacks
=== RUN   TestNextMsgCallOnAsyncSub
--- PASS: TestNextMsgCallOnAsyncSub (0.00s)
=== RUN   TestStopServer
--- PASS: TestStopServer (0.00s)
FAIL
exit status 1
FAIL    github.com/nats-io/nats 25.020s
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ 

versions:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ uname -a
Darwin jasonsmac.local 14.1.1 Darwin Kernel Version 14.1.1: Thu Feb 26 22:41:49 PST 2015; root:xnu-2782.15.5~1/RELEASE_X86_64 x86_64
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ go version
go version go1.5.1 darwin/amd64
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ git log|head
commit 5e5b726926cb1640630fb80c3420187be0b45da8
Merge: f815164 5ad2583
Author: Derek Collison <[email protected]>
Date:   Sat Sep 5 10:27:09 2015 -0500

    Merge pull request #65 from wallyqs/delivered-compare-data-race
    ...

show that gnatsd is working:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ which gnatsd
/Users/jason/bin/gnatsd
jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ gnatsd
[47224] 2015/09/12 13:00:45.594616 [INF] Starting gnatsd version 0.6.6
[47224] 2015/09/12 13:00:45.594703 [INF] Listening for client connections on 0.0.0.0:4222
[47224] 2015/09/12 13:00:45.594804 [INF] gnatsd is ready [ctrl-c]
  C-c C-c[47224] 2015/09/12 13:00:46.348888 [INF] Server Exiting..

show gnatsd version:

jason@jasonsmac ~/src/github.com/nats-io/nats (master) $ cd ../gnatsd/
jason@jasonsmac ~/src/github.com/nats-io/gnatsd (master) $ git log
WARNING: terminal is not fully functional
-  (press RETURN)
commit 7620f4acc7eb832eac01fb7f0857a0ebaf0bcafb
Merge: 139ce42 093548c
Author: Derek Collison <[email protected]>
Date:   Thu Sep 10 11:18:46 2015 -0400

    Merge pull request #115 from wallyqs/update-max-payload-test
...

Reconnect Logic

The client should be able to auto-reconnect similar to the ruby client with options that can suppress the behavior and dictate how long between attempts and how many attempts total before giving up.
