
jsm.go's Introduction

Overview

This is a Go based library to manage and interact with JetStream.

This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs. It is essentially a direct wrapping of the JetStream API with few user-friendly features, and it requires deep technical knowledge of the JetStream internals.

For typical end users we suggest the nats.go package.

Initialization

This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for all interaction with JetStream.

Multiple Managers can be used in your application, each with its own timeouts and connection.

mgr, _ := jsm.New(nc, jsm.WithTimeout(10*time.Second))

This creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a manager was created as above.

Schema Registry

All the JetStream API messages and some events and advisories produced by the NATS Server have JSON Schemas associated with them. The api package has a Schema Registry and helpers to discover and interact with these.

The Schema Registry can be accessed on the cli in the nats schemas command where you can list, search and view schemas and validate data based on schemas.

Example Message

To retrieve the Stream State for a specific Stream one accesses the $JS.API.STREAM.INFO.<stream> API, which responds with data like this:

{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "TESTING",
    "subjects": [
      "js.in.testing"
    ],
    "retention": "limits",
    "max_consumers": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "discard": "old",
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "num_replicas": 1,
    "duplicate_window": 120000000000
  },
  "created": "2020-10-09T12:40:07.648216464Z",
  "state": {
    "messages": 1,
    "bytes": 81,
    "first_seq": 1017,
    "first_ts": "2020-10-09T19:43:40.867729419Z",
    "last_seq": 1017,
    "last_ts": "2020-10-09T19:43:40.867729419Z",
    "consumer_count": 1
  }
}

Here the type of the message is io.nats.jetstream.api.v1.stream_info_response. The api package can help parse this into the correct format.
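As a rough illustration of what the type field carries, the sketch below uses only encoding/json to read the type, the stream name and the duplicate_window from a trimmed copy of the response above. The streamInfoEnvelope struct and the describe helper are hypothetical names for this example and are not part of the api package. It also assumes that durations are encoded as nanoseconds, as the value 120000000000 suggests.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// streamInfoEnvelope is a hypothetical struct for this sketch; it maps only
// the fields of the response that the example reads.
type streamInfoEnvelope struct {
	Type   string `json:"type"`
	Config struct {
		Name            string        `json:"name"`
		DuplicateWindow time.Duration `json:"duplicate_window"`
	} `json:"config"`
}

// describe decodes the envelope and returns the message type, the stream
// name and the duplicate window as a human readable duration.
func describe(data []byte) (kind, name, window string, err error) {
	var env streamInfoEnvelope
	if err = json.Unmarshal(data, &env); err != nil {
		return "", "", "", err
	}
	return env.Type, env.Config.Name, env.Config.DuplicateWindow.String(), nil
}

func main() {
	data := []byte(`{"type":"io.nats.jetstream.api.v1.stream_info_response","config":{"name":"TESTING","duplicate_window":120000000000}}`)
	kind, name, window, err := describe(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(kind, name, window)
}
```

In real code the api package helpers shown in the next section are the better choice, since they know every registered message type.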

Message Schemas

Given a message kind one can retrieve the full JSON Schema as bytes:

schema, _ := api.Schema("io.nats.jetstream.api.v1.stream_info_response")

One can also retrieve it based on a specific message content:

schemaType, _ := api.SchemaTypeForMessage(m.Data)
schema, _ := api.Schema(schemaType)

Several other Schema related helpers exist to search Schemas, find URLs and more. See the api Reference.

Parsing Message Content

JetStream produces metrics and advisories about message acknowledgments, API audits and more. Here we subscribe to the advisory subject and handle one specific received message type.

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, event, _ := api.ParseMessage(m.Data)
    log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit

    switch e := event.(type){
    case advisory.JetStreamAPIAuditV1:
        fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)
    }
})

Above we gain full access to all contents of the message in its native format, but we need to know in advance what we will get. We can also render the messages as text in a generic way:

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, event, _ := api.ParseMessage(m.Data)

    if kind == "io.nats.unknown_message" {
        return // a message without metadata or of an unknown format was received
    }

    ne, ok := event.(api.Event)
    if !ok {
        // the handler has no return value, so log the problem and stop
        log.Printf("event %q does not implement the Event interface", kind)
        return
    }

    err := api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
    if err != nil {
        log.Printf("display failed: %s", err)
    }
})

This will produce output like:

11:25:49 [JS API] $JS.API.STREAM.INFO.TESTING $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:53 [JS API] $JS.API.STREAM.INFO.TESTING $G

api.TextCompactFormat is one of several formats we support. The others are api.TextExtendedFormat for a full multi-line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.

API Validation

The data structures sent to JetStream can be validated before submission to NATS which can speed up user feedback and provide better errors.

type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data any, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{fmt.Sprintf("unknown schema type %s", schemaType)}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	errors := make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errors[i] = verr.String()
	}

	return false, errors
}

This is an api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.

This can be used by the Manager to validate all API access.

mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))

jsm.go's People

Contributors

1995parham, atombender, boris-ilijic, bruth, codegangsta, coffeeri, davedotdev, gcolliso, jarema, jnmoyne, kozlovic, kuklyy, mashinamashina, masudur-rahman, matthewdevenny, matthiashanel, neilalexander, philpennock, piotrpio, ramonberrutti, ripienaar, scottf, shurya-kumar

jsm.go's Issues

Missing DeliverNew, DeliverLast, DeliverNext methods

DeliverAllAvailable() exists for the creation of a consumer. There are no functions for DeliverPolicy of new, last or next.

func DeliverAllAvailable() ConsumerOption {
	return func(o *ConsumerCfg) error {
		resetDeliverPolicy(o)
		o.ConsumerConfig.DeliverPolicy = api.DeliverAll
		return nil
	}
}

Recommended:

func DeliverNextAvailable() ConsumerOption {
	return func(o *ConsumerCfg) error {
		resetDeliverPolicy(o)
		o.ConsumerConfig.DeliverPolicy = api.DeliverNext
		return nil
	}
}
func DeliverNew() ConsumerOption {
	return func(o *ConsumerCfg) error {
		resetDeliverPolicy(o)
		o.ConsumerConfig.DeliverPolicy = api.DeliverNew
		return nil
	}
}
func DeliverLast() ConsumerOption {
	return func(o *ConsumerCfg) error {
		resetDeliverPolicy(o)
		o.ConsumerConfig.DeliverPolicy = api.DeliverLast
		return nil
	}
}

stream configuration message related variables

In nats-server, stream.go there is this:

type StreamConfig struct {
        ...
	MaxMsgs      int64           `json:"max_msgs"`
	MaxBytes     int64           `json:"max_bytes"`
        ...
	MaxMsgsPer   int64           `json:"max_msgs_per_subject"`

In jsm.go, the schema definition for stream_configuration defines the same fields as

"$ref": "#/definitions/golang_uint64"

Which one needs to change?
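To illustrate why the mismatch matters, here is a small sketch using only encoding/json. It assumes, as the Stream State example earlier on this page shows, that -1 is a value these fields can carry. The two struct types are hypothetical stand-ins, not the real server or jsm.go types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// signedLimits mirrors the int64 fields quoted from nats-server.
type signedLimits struct {
	MaxMsgs  int64 `json:"max_msgs"`
	MaxBytes int64 `json:"max_bytes"`
}

// unsignedLimits is what a uint64 based schema would imply.
type unsignedLimits struct {
	MaxMsgs  uint64 `json:"max_msgs"`
	MaxBytes uint64 `json:"max_bytes"`
}

// decodes reports whether data can be decoded into v.
func decodes(data []byte, v any) bool {
	return json.Unmarshal(data, v) == nil
}

func main() {
	data := []byte(`{"max_msgs":-1,"max_bytes":-1}`)
	fmt.Println("int64 accepts -1: ", decodes(data, &signedLimits{}))
	fmt.Println("uint64 accepts -1:", decodes(data, &unsignedLimits{}))
}
```

A configuration containing -1 decodes into the signed struct but is rejected by the unsigned one, which is the kind of disagreement a validator built on the schema would run into.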

support new jetstream api structure

nats-io/nats-server#1369

This includes some changes to existing endpoint inputs and outputs

Best way to read all the messages of a pull stream from the first to the last

Hi,

Could somebody please advise on the best way to read all the messages from a pull stream with (or, if it is not supported, without) this library?

I have a stream with a single hello message.

$ nats req "FOO.BAR" hello

I tried the following code:

package main

import (
  "fmt"
  "github.com/nats-io/nats.go"
  "github.com/nats-io/jsm.go"
)

func main() {
  nc, _ := nats.Connect(nats.DefaultURL)

  stream, err := jsm.LoadOrNewStreamFromDefault(
    "MYSTREAM",
    jsm.DefaultStreamConfiguration,
    jsm.Subjects("FOO.*"),
    jsm.StreamConnection(jsm.WithConnection(nc)),
    jsm.FileStorage(),
    jsm.DiscardNew(),
  )

  if err != nil {
    panic(err)
  }

  consumer, err := stream.LoadOrNewConsumerFromDefault(
    "FOO",
    jsm.SampledDefaultConsumer,
    jsm.DurableName("MYSTREAM"),
  )

  if err != nil {
    panic(err)
  }


  // Load all the events ...
  for {
    msg, err := consumer.NextMsg()

    if err != nil {
      fmt.Println("Error while reading a message:", err)
      continue
    }
    fmt.Println(".MESSAGE:", msg)
    meta, err := jsm.ParseJSMsgMetadata(msg)
    if err != nil {
      fmt.Println(err)
      continue
    }
    fmt.Println(meta)
  }
}

The above code is an infinite loop which most of the time just times out, but after a random number of timeouts it reads the first message from the stream.

How can one accomplish my task of just iterating over all of the messages in the stream and after the last message just stop the iteration?
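One possible approach, sketched here without the library so that it runs on its own, is to record the stream's last sequence before reading and to stop once a message with that sequence has been seen. The fetcher interface, the message type and the in-memory fakeStream are hypothetical stand-ins for a pull consumer. With a real consumer the last sequence would come from the stream state (the last_seq field shown in the Stream State example).

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a received JetStream message.
type message struct {
	StreamSeq uint64
	Data      string
}

// fetcher is a hypothetical abstraction of a pull consumer.
type fetcher interface {
	Next() (message, error)
}

var errTimeout = errors.New("timeout")

// drain reads messages until one with sequence lastSeq is seen, or until
// maxTimeouts consecutive timeouts occur.
func drain(f fetcher, lastSeq uint64, maxTimeouts int) ([]message, error) {
	var out []message
	if lastSeq == 0 {
		return out, nil // empty stream, nothing to read
	}
	timeouts := 0
	for {
		m, err := f.Next()
		if errors.Is(err, errTimeout) {
			timeouts++
			if timeouts >= maxTimeouts {
				return out, fmt.Errorf("gave up after %d timeouts", timeouts)
			}
			continue
		}
		if err != nil {
			return out, err
		}
		timeouts = 0
		out = append(out, m)
		if m.StreamSeq >= lastSeq {
			return out, nil
		}
	}
}

// fakeStream serves a fixed list of messages, timing out on the first call.
type fakeStream struct {
	msgs  []message
	calls int
}

func (s *fakeStream) Next() (message, error) {
	s.calls++
	if s.calls == 1 || len(s.msgs) == 0 {
		return message{}, errTimeout
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

func main() {
	s := &fakeStream{msgs: []message{{1, "hello"}, {2, "world"}}}
	msgs, err := drain(s, 2, 3)
	fmt.Println(len(msgs), err)
}
```

Messages published after the last sequence was recorded are deliberately not read, so the loop has a well defined end even while producers keep writing.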

I think we need more timeout options

A single timeout value for things as different as IsJetStreamEnabled() and consumer.NextMsg() makes it hard to use pull mode to block the client until some Msg is available.

This could be the ability to pass a timeout to NextMsg (and to all the other functions that return something related to next messages), but it may be better to have NextMsgWithContext(). That way the timeout would be the caller's responsibility.

What should I use as a jetstream go client

Hi,

This is more of a documentation request. As a Go developer interacting with a JetStream NATS server, should I use:

The whole model and logic seem to differ between pull and push consumers / subscriptions. For instance, using this library, it feels like pull-based consumers are kind of "deprecated".

In short, if you can help me find the current reference implementation of a JetStream Go client, that would be great.

Best,

Nats server cpu spike usage 100%

To reproduce this issue you need to run this program twice. You can terminate the first run after it publishes 2 msgs; on the second run you have to wait ~1-2 min for the CPU to spike, and the load comes from the NATS server.
NATS server and jsm.go are both at the latest version, pulled today.

As you can see in the program, I'm not acking (m.Respond(nil)); if I do, everything works.

package main

import (
	"context"
	"fmt"
	"log"

	"time"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)

func main() {

	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	jsm.SetConnection(nc)

	stream, err := jsm.NewStreamFromDefault(
		"ORDERS",
		jsm.DefaultStream,
		jsm.StreamConnection(jsm.WithConnection(nc)),
		jsm.FileStorage(),
		jsm.MaxAge(24*time.Hour),
		jsm.Subjects("ORDERS.*"),
	)
	if err != nil {
		log.Fatal(err)
	}

	defer stream.Delete()

	nc.Publish("ORDERS.test", []byte("something"))
	nc.Publish("ORDERS.test", []byte("something"))

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour*1)
	defer cancel()

	c, err := jsm.NewConsumer("ORDERS",
		jsm.DurableName("PULL"),
		jsm.FilterStreamBySubject("ORDERS.test"),
		jsm.StartAtSequence(1),
	)
	if err != nil {
		log.Fatal(err)
	}

	readPullMsg(c, ctx)
	if err := c.Delete(); err != nil {
		log.Fatal(err)
	}
}

func readPullMsg(c *jsm.Consumer, ctx context.Context) {
	state, err := c.DeliveredState()
	if err != nil {
		fmt.Println(err)
		return
	}
	pendingMsgs, err := c.PendingMessageCount()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("durableName:", c.DurableName(), "consumerSeq:", state.ConsumerSeq, "streamSeq:", state.StreamSeq, "startSequence:", c.StartSequence(), "pendingMsgs:", pendingMsgs)
	for {
		m, err := c.NextMsg(jsm.WithContext(ctx))
		if err != nil {
			log.Println(err)
			break
		}

		fmt.Printf("msg data: %s\n", m.Data)
		//m.Respond(nil)
	}
}

jsm library sends invalid non configured consumer

MSG_PAYLOAD: ["{"stream_name":"faas-cluster","config":{"delivery_subject":"","durable_name":"faas","start_time":"0001-01-01T00:00:00Z","deliver_all":true,"deliver_last":true,"ack_policy":"explicit","ack_wait":30000000000,"filter_subject":"faas-cluster.faas-request","replay_policy":"instant"}}"]

I would fix this by modifying the consumer format to be similar to ack_policy:
"delivery_policy":"all" or "delivery_policy":"last"

not have two mutually exclusive fields.

$ ./nats -s 0.0.0.0:4222 consumer info faas-cluster TEST -j
{
  "stream_name": "faas-cluster",
  "name": "TEST",
  "config": {
    "delivery_subject": "",
    "durable_name": "TEST",
    "start_time": "0001-01-01T00:00:00Z",
    "deliver_last": true,
    "ack_policy": "explicit",
    "ack_wait": 30000000000,
    "max_deliver": -1,
    "filter_subject": "faas-cluster.faas-reques",
    "replay_policy": "instant"
  },
  "state": {
    "delivered": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "ack_floor": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "pending": null,
    "redelivered": null
  }
}
$ ./nats -s 0.0.0.0:4222 consumer info faas-cluster TEST2 -j
{
  "stream_name": "faas-cluster",
  "name": "TEST2",
  "config": {
    "delivery_subject": "",
    "durable_name": "TEST2",
    "start_time": "0001-01-01T00:00:00Z",
    "deliver_all": true,
    "ack_policy": "explicit",
    "ack_wait": 30000000000,
    "max_deliver": -1,
    "filter_subject": "faas-cluster.faas-reques",
    "replay_policy": "instant"
  },
  "state": {
    "delivered": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "ack_floor": {
      "consumer_seq": 0,
      "stream_seq": 0
    },
    "pending": null,
    "redelivered": null
  }
}

Why is a consumer's stream_paper created automatically?

In my case, when I check the consumers in a stream, I see that many consumers' stream_paper are created in pull mode. This leaves many messages that cannot be acked in these consumers' stream_paper. How can I prevent stream_paper from being created automatically?

LoadOrNewConsumer is not working well

So I have this code

orderConsumer, err = mgr.LoadOrNewConsumer("ORDERS",
	"CONSUMER_NAME",
	jsm.DurableName("CONSUMER_DURABLE"),
	jsm.AcknowledgeExplicit(),
	jsm.ReplayInstantly(),
	jsm.SamplePercent(20),
	jsm.StartAtTimeDelta(3*time.Hour))
if err != nil {
	log.Fatalf("Failed to create a jetstream consumer! Reason: %q\n", err)
}

When the consumer stops and restarts, it gets a "consumer already exists" error.
This also happens when you have multiple consumers with the same name and durable name (supposedly they can all connect just fine, similar to nats-streaming's queue subscribe; correct me if I'm wrong).

I checked the code and found that when the consumer is not found, a new consumer is created using the durable_name instead of the name. When loading a consumer, however, it searches by the name, not the durable_name.

So far, I can do something like this to handle the error. And it works.

	orderConsumer, err := mgr.LoadConsumer("ORDERS",
		"CONSUMER_DURABLE")
	if err != nil {
		log.Printf("Failed to create a jetstream consumer! Reason: %q\n", err)
	}

	if orderConsumer == nil {
		orderConsumer, err = mgr.NewConsumer("ORDERS",
			jsm.DurableName("CONSUMER_DURABLE"),
			jsm.AcknowledgeExplicit(),
			jsm.ReplayInstantly(),
			jsm.SamplePercent(20),
			jsm.StartAtTimeDelta(3*time.Hour))
		if err != nil {
			log.Fatalf("Failed to create a jetstream consumer! Reason: %q\n", err)
		}
	}

or

orderConsumer, err = mgr.LoadOrNewConsumer("ORDERS",
	"CONSUMER_DURABLE",
	jsm.DurableName("CONSUMER_DURABLE"),
	jsm.AcknowledgeExplicit(),
	jsm.ReplayInstantly(),
	jsm.SamplePercent(20),
	jsm.StartAtTimeDelta(3*time.Hour))
if err != nil {
	log.Fatalf("Failed to create a jetstream consumer! Reason: %q\n", err)
}

Is there anything I'm missing? Or should the consumer name and durable name be the same?

Thanks in advance.
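The behaviour described in this issue can be reproduced without a server using the in-memory sketch below. The registry type and its methods are hypothetical and only model the reported behaviour (lookup by name, creation keyed by durable name); they are not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errNotFound = errors.New("consumer not found")
	errExists   = errors.New("consumer already exists")
)

// registry is a hypothetical in-memory stand-in for a stream's consumers,
// keyed by durable name.
type registry map[string]bool

// load looks a consumer up by the given name.
func (r registry) load(name string) error {
	if !r[name] {
		return errNotFound
	}
	return nil
}

// create registers a consumer under its durable name.
func (r registry) create(durable string) error {
	if r[durable] {
		return errExists
	}
	r[durable] = true
	return nil
}

// loadOrNew models the reported behaviour: it loads by name and, when that
// fails, creates by durable name.
func (r registry) loadOrNew(name, durable string) error {
	if err := r.load(name); err == nil {
		return nil
	}
	return r.create(durable)
}

func main() {
	r := registry{}
	fmt.Println(r.loadOrNew("CONSUMER_NAME", "CONSUMER_DURABLE"))    // first start
	fmt.Println(r.loadOrNew("CONSUMER_NAME", "CONSUMER_DURABLE"))    // restart with mismatched names
	fmt.Println(r.loadOrNew("CONSUMER_DURABLE", "CONSUMER_DURABLE")) // matching names
}
```

In this model the restart only succeeds when the name passed for loading matches the durable name, which is consistent with the workaround of using the same value for both.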

Feature request: API for server/cluster info

I would find it useful to have an API to learn about the cluster state, not specific to any streams or consumers.

JSM seems like a good place for it.

Looking at the natscli, I see that the implementation of server list is doing this via PING (source).

I could duplicate this code, but that seems like a fragile approach.

Could the natscli implementation get pushed down to JSM, where it can be reused?

Consumer Delete returns NotFoundError

I made a small util to tail messages in a stream, just to check that the messages in the stream are the expected ones.
The util creates a new consumer and deletes it on exit.

Most (not all) calls to consumer delete return a "consumer not found" error, even though the consumer existed before the delete call and is gone after it.
It's not a big deal; I just skip that type of error:

if err := cs.Delete(); err != nil {
	if jserr, ok := err.(api.ApiError); ok && jserr.NotFoundError() {
		return nil
	}
	return fmt.Errorf("delete consumer failed %w", err)
}

But it is unexpected behavior.

Push Base msg not being delivered when consumer is deleted and recreated

package main

import (
	"context"
	"fmt"
	"log"

	"time"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)

func main() {

	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	jsm.SetConnection(nc)

	stream, err := jsm.NewStreamFromDefault(
		"ORDERS",
		jsm.DefaultStream,
		jsm.StreamConnection(jsm.WithConnection(nc)),
		jsm.FileStorage(),
		jsm.MaxAge(24*time.Hour),
		jsm.Subjects("ORDERS.*"),
	)
	if err != nil {
		log.Fatal(err)
	}

	defer stream.Delete()

	nc.Publish("ORDERS.test", []byte("something"))

	ctx, _ := context.WithTimeout(context.Background(), time.Second*1)

	consumer, err := jsm.NewConsumer("ORDERS",
		jsm.DurableName("PUSH"),
		jsm.DeliverySubject("test"),
		jsm.StartAtSequence(1),
	)
	if err != nil {
		log.Fatal(err)
	}

	readPushMsg(consumer, ctx)
	if err := consumer.Delete(); err != nil {
		log.Fatal(err)
	}

	ctx, _ = context.WithTimeout(context.Background(), time.Second*1)

	consumer2, err := jsm.NewConsumer("ORDERS",
		jsm.DurableName("PUSH"),
		jsm.DeliverySubject("test"),
		jsm.StartAtSequence(1),
	)
	if err != nil {
		log.Fatal(err)
	}

	readPushMsg(consumer2, ctx)
	if err := consumer2.Delete(); err != nil {
		log.Fatal(err)
	}
}

func readPushMsg(c *jsm.Consumer, ctx context.Context) {
	state, err := c.DeliveredState()
	if err != nil {
		fmt.Println(err)
		return
	}
	pendingMsgs, err := c.PendingMessageCount()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("durableName:", c.DurableName(), "consumerSeq:", state.ConsumerSeq, "streamSeq:", state.StreamSeq, "startSequence:", c.StartSequence(), "pendingMsgs:", pendingMsgs)
	for {
		sub, err := c.SubscribeSync()
		m, err := sub.NextMsgWithContext(ctx)
		if err != nil {
			log.Println(err)
			break
		}

		fmt.Printf("msg data: %s\n", m.Data)
		m.Respond(nil)
	}
}

output:

durableName: PUSH consumerSeq: 0 streamSeq: 0 startSequence: 1 pendingMsgs: 0
msg data: something
2020/06/24 00:01:41 context deadline exceeded
durableName: PUSH consumerSeq: 1 streamSeq: 1 startSequence: 1 pendingMsgs: 1
2020/06/24 00:01:42 context deadline exceeded

support mirrors

consumer.NextMsgs does not work as expected

If the remaining number of messages is fewer than the number requested, old msgs are returned again instead of a smaller slice.

execution of the provided example code:

connect
check if stream exists
remove old stream
creating stream
creating consumer
start to emit msgs
emitted 15k msgs, all with successful ack 1.387078373s
start consuming, timeout set to 5 seconds but NextMsgs uses context
we're at 14900, time spent is 621.162288ms
0s, received msg strseq: 14991 conseq: 14991 ...acked
0s, received msg strseq: 14992 conseq: 14992 ...acked
0s, received msg strseq: 14993 conseq: 14993 ...acked
0s, received msg strseq: 14994 conseq: 14994 ...acked
NextMsgs, got 7 msgs
4s, received msg strseq: 14995 conseq: 14995 ...acked
4s, received msg strseq: 14996 conseq: 14996 ...acked
4s, received msg strseq: 14997 conseq: 14997 ...acked
4s, received msg strseq: 14998 conseq: 14998 ...acked
4s, received msg strseq: 14999 conseq: 14999 ...acked
4s, received msg strseq: 15000 conseq: 15000 ...acked
4s, received msg strseq: 14995 conseq: 15001 ...acked
NextMsgs, got 7 msgs
8s, received msg strseq: 14996 conseq: 15002 ...acked
8s, received msg strseq: 14997 conseq: 15003 ...acked
8s, received msg strseq: 14998 conseq: 15004 ...acked
8s, received msg strseq: 14999 conseq: 15005 ...acked
8s, received msg strseq: 15000 conseq: 15006 ...acked
8s, received msg strseq: 14996 conseq: 15007 ...acked
8s, received msg strseq: 14997 conseq: 15008 ...acked
NextMsgs, got 7 msgs
14s, received msg strseq: 14998 conseq: 15009 ...acked
14s, received msg strseq: 14999 conseq: 15010 ...acked
14s, received msg strseq: 15000 conseq: 15011 ...acked
14s, received msg strseq: 14998 conseq: 15012 ...acked
14s, received msg strseq: 14999 conseq: 15013 ...acked
14s, received msg strseq: 15000 conseq: 15014 ...acked
14s, received msg strseq: 14998 conseq: 15015 ...acked

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)

func main() {

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// graceful terminate on interrupt
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-c
		cancel()
	}()

	println("connect")
	nc, err := nats.Connect("nats://localhost:4222",
		nats.MaxReconnects(6),
		nats.ReconnectWait(5*time.Second),
		nats.ClosedHandler(func(nc *nats.Conn) {
			println("nats closed")
			cancel()
		}),
		nats.DisconnectHandler(func(nc *nats.Conn) {
			println("nats disconnected")
		}))
	if err != nil {
		panic(err)
	}

	jsm.SetConnection(nc)

	println("check if stream exists")
	exists, err := jsm.IsKnownStream("demostream")
	if err != nil {
		panic(err)
	}

	if exists {
		println("remove old stream")
		// not sure how to remove stream without loading it first
		stream, err := jsm.LoadStream("demostream")
		if err != nil {
			panic(err)
		}
		if err := stream.Delete(); err != nil {
			panic(err)
		}
	}

	println("creating stream")
	stream, err := jsm.NewStream("demostream",
		jsm.FileStorage(),
		jsm.LimitsRetention(),
		jsm.MaxMessages(-1),
		jsm.MaxAge(1<<63-1),
		jsm.Subjects([]string{"test.subject"}...),
		jsm.Replicas(1),                              // TODO: increase when jetstream support it
		jsm.StreamConnection(jsm.WithConnection(nc)), //TODO: handle when we have multiple nc so we connect to correct one
	)
	if err != nil {
		panic(err)
	}

	println("creating consumer")
	consumer, err := stream.NewConsumer(
		jsm.DurableName("democonsumer"),
		jsm.DeliverAllAvailable(),
		jsm.AckWait(2*time.Second),
		jsm.AcknowledgeExplicit(),
		jsm.MaxDeliveryAttempts(10),
		jsm.ConsumerConnection(jsm.WithContext(ctx)),
	)
	if err != nil {
		panic(err)
	}

	println("start to emit msgs")
	startPub := time.Now()
	for n := 0; n < 15000; n++ {
		resp, err := nc.Request("test.subject", []byte(strconv.Itoa(n+1)), 2*time.Second)
		if err != nil {
			panic(err)
		}
		if !bytes.Equal(resp.Data, []byte("+OK")) {
			panic(fmt.Errorf("Did not get ack on emit for no %d", n+1))
		}
	}
	println("emitted 15k msgs, all with successful ack ", time.Since(startPub).String())

	jsm.SetTimeout(5 * time.Second)
	println("start consuming, timeout set to 5 seconds but NextMsgs uses context")
	startCon := time.Now()
	no := 0
	for {
		msgs, err := consumer.NextMsgs(7)
		if errors.Is(err, context.Canceled) {
			return
		}
		if err != nil {
			panic(err)
		}
		if no > 14990 {
			println("NextMsgs, got ", len(msgs), " msgs")
		}
		for _, msg := range msgs {
			no++
			if no == 14990 {
				println("we're at 14900, time spent is ", time.Since(startCon).String())
			}

			i, err := jsm.ParseJSMsgMetadata(msg)
			if err != nil {
				panic(err)
			}

			if no > 14990 {
				print(int(time.Since(startCon).Seconds()), "s, received msg strseq: ", i.StreamSequence(), " conseq: ", i.ConsumerSequence(), " ...")
			}

			if err := msg.Respond(nil); err != nil {
				panic(err)
			}
			if no > 14990 {
				println("acked")
			}
		}
	}
}

question: can jsm.ParseJSMsgMetadata (or another helper) also provide metadata for StoredMsg

for push consumer:

can jsm.ParseJSMsgMetadata (or another helper) also provide metadata for the StoredMsg?

This would avoid an extra round trip to retrieve the stored message just to get its metadata (time and subject, both of which are needed if any of the event's data is in the subject and/or if the actual time the event was emitted or became visible is needed).

Otherwise, solving this client-side costs one extra round trip and one extra transfer of msg.Data for each message received from the push consumer.

Updates from JS clustering work

Can no longer save context with --user and --password

I have utility scripts that create contexts for new environments, including ones that use simple username and password credentials. Running them today, I received this error:

$ nats ctx save UserA1 --server "nats://vbox1.tinghus.net:4222" --user UserA1 --password s3cr3t                                                                 
nats: error: too many types of credentials. Choose only one from 'user/password', 'creds', 'nkey', 'token', 'nsc'

Verified no ghost context:

$ nats ctx --no-context save UserA1 --server "nats://vbox1.tinghus.net:4222" --user UserA1 --password s3cr3t
nats: error: too many types of credentials. Choose only one from 'user/password', 'creds', 'nkey', 'token', 'nsc'

Suspect this December commit broke it:
5aa2262b Jeremy Saenz 

@codegangsta
@ripienaar

natscontext: support loading Context from a filepath

Add a method to the natscontext package to support loading a context from a filepath

This would enable use cases such as supporting a NATS_CONTEXT env var for the NATS CLI.

Example method signature would be func NewFromFile(filename string, opts ...Option) (*Context, error)

jsm.MsgInfo sequence methods return int

This is an observation, but partly a question. I noticed that StreamSequence() and ConsumerSequence() return an int, which implies to me that the largest stream that can be represented is bounded by the max int32 value. I presume a stream is not bound by this upper limit? I would assume max int64 would be the upper bound at least?

Multiple subscribers to the same stream don't receive messages

Hello.

I am experiencing some problems with v0.20.0 of this library. I don't know whether this is because of a bug or because I am not creating consumers properly.

I have created a stream with subjects "TIMESERIES.*".
If I create one consumer and start pulling messages using NextMsgContext, I receive messages successfully. I have tested with the consumer's FilterSubject set to "TIMESERIES.IN", "TIMESERIES.PERFORMANCE" and "TIMESERIES.PREDICTIVE".

If I create the stream and create 2 consumers, one with FilterSubject set to "TIMESERIES.PERFORMANCE" and another set to "TIMESERIES.PREDICTIVE" I am not able to receive any message in any consumer. Both consumers have different Durable names.

I have tried creating the consumers using stream.LoadOrNewConsumerFromDefault and also with manager.LoadOrNewConsumerFromDefault.

Could someone help me with this issue, please?
