influxdb-client-go's Introduction

InfluxDB Client Go

This repository contains the Go client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.

For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.

Features

Documentation

This section contains links to the client library documentation.

Examples

Examples of basic writing and querying are shown later in this document.

There are also other examples in the API docs.

How To Use

Installation

Go 1.17 or later is required.

Go mod project

  1. Add the latest version of the client package to your project dependencies (go.mod).
    go get github.com/influxdata/influxdb-client-go/v2
  2. Add import github.com/influxdata/influxdb-client-go/v2 to your source code.

GOPATH project

```sh
go get github.com/influxdata/influxdb-client-go
```

Note: For go get to run in GOPATH mode, the GO111MODULE environment variable must be set to off.

Basic Example

The following example demonstrates how to write data to InfluxDB 2 and read it back using the Flux language:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
    // Create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45.0},
        time.Now())
    // Write point immediately
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        panic(err)
    }
    // Create point using fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45.0).
        SetTime(time.Now())
    err = writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        panic(err)
    }
    // Or write line protocol directly
    line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    err = writeAPI.WriteRecord(context.Background(), line)
    if err != nil {
        panic(err)
    }

    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // Get parsed Flux query result
    result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensure background processes finish
    client.Close()
}

Options

The InfluxDBClient uses a set of options to configure its behavior. These are available in the Options object. Creating a client instance using

client := influxdb2.NewClient("http://localhost:8086", "my-token")

will use the default options.

To set different configuration values, e.g. to set gzip compression and trust all server certificates, get default options and change what is needed:

client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
    influxdb2.DefaultOptions().
        SetUseGZip(true).
        SetTLSConfig(&tls.Config{
            InsecureSkipVerify: true,
        }))

Writes

The client offers two ways of writing: non-blocking and blocking.

Non-blocking write client

The non-blocking write client uses implicit batching. Data is written asynchronously to an underlying buffer and sent to the server automatically when the write buffer reaches the batch size (default 5000) or when the flush interval (default 1s) elapses. Writes are automatically retried on server back pressure.

This write client also offers a synchronous, blocking Flush() method that ensures the write buffer is flushed and all pending writes are finished. Always call the client's Close() method to stop all background processes.

The asynchronous write client is recommended for frequent periodic writes.

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    // and set batch size to 20
    client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Get non-blocking write client
    writeAPI := client.WriteAPI("my-org", "my-bucket")
    // write some points
    for i := 0; i < 100; i++ {
        // create point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write asynchronously
        writeAPI.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeAPI.Flush()
    // Ensure background processes finish
    client.Close()
}
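
The batching rule described at the start of this section (send a batch once the buffer reaches the batch size, and send the remainder on Flush()) can be modelled with a small standalone sketch. This is not the client's actual implementation: `batcher`, `add`, and `flush` are hypothetical names used only for illustration, and the timer-driven flush is left out to keep the example deterministic.

```go
package main

import "fmt"

// batcher is a hypothetical, simplified model of an implicit write buffer.
// It is NOT the client's real implementation.
type batcher struct {
	batchSize int
	buf       []string
	sent      [][]string // batches that would have been sent to the server
}

// add appends one record and sends a batch once the buffer
// reaches batchSize.
func (b *batcher) add(line string) {
	b.buf = append(b.buf, line)
	if len(b.buf) >= b.batchSize {
		b.flush()
	}
}

// flush sends whatever is buffered, mirroring what an explicit Flush()
// (or an elapsed flush interval) would trigger.
func (b *batcher) flush() {
	if len(b.buf) == 0 {
		return
	}
	b.sent = append(b.sent, b.buf)
	b.buf = nil
}

func main() {
	b := &batcher{batchSize: 20}
	for i := 0; i < 45; i++ {
		b.add(fmt.Sprintf("stat,unit=temperature avg=%d", i))
	}
	// The remaining records go out only on an explicit flush.
	b.flush()
	for i, batch := range b.sent {
		fmt.Printf("batch %d: %d records\n", i+1, len(batch))
	}
}
```

With a batch size of 20 and 45 records, two full batches leave the buffer on their own and the last five records wait for the flush, which is why the examples call Flush() before Close().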

Handling of failed async writes

By default, WriteAPI keeps retrying failed writes. Writes are retried automatically when they fail because of a connection failure or when the server returns an HTTP status code >= 429.

The retry algorithm uses a random exponential strategy to set the retry delay. The delay before the next retry attempt is a random value in the interval between retryInterval * exponentialBase^(attempts) and retryInterval * exponentialBase^(attempts+1). If writes of a batch fail repeatedly, WriteAPI keeps retrying until maxRetries is reached or the overall retry time of the batch exceeds maxRetryTime.

The default parameters (part of the WriteOptions) are:

  • retryInterval=5,000ms
  • exponentialBase=2
  • maxRetryDelay=125,000ms
  • maxRetries=5
  • maxRetryTime=180,000ms

By default, retry delays (in milliseconds) are randomly distributed within these ranges:

  1. 5,000-10,000
  2. 10,000-20,000
  3. 20,000-40,000
  4. 40,000-80,000
  5. 80,000-125,000
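
The ranges above follow directly from the formula and the default parameters. The sketch below recomputes them; `retryRange` and `intPow` are hypothetical helper names, and capping both bounds at maxRetryDelay is an assumption made to match the last range in the list.

```go
package main

import "fmt"

// intPow returns base raised to exp using integer arithmetic.
func intPow(base, exp int) int {
	result := 1
	for i := 0; i < exp; i++ {
		result *= base
	}
	return result
}

// retryRange returns the bounds (in ms) of the random delay before the
// retry with the given 0-based attempt number:
//   retryInterval * exponentialBase^attempt .. retryInterval * exponentialBase^(attempt+1)
// Both bounds are capped at maxRetryDelay (an assumption of this sketch).
func retryRange(retryInterval, exponentialBase, maxRetryDelay, attempt int) (lo, hi int) {
	lo = retryInterval * intPow(exponentialBase, attempt)
	hi = retryInterval * intPow(exponentialBase, attempt+1)
	if lo > maxRetryDelay {
		lo = maxRetryDelay
	}
	if hi > maxRetryDelay {
		hi = maxRetryDelay
	}
	return lo, hi
}

func main() {
	const (
		retryInterval   = 5000
		exponentialBase = 2
		maxRetryDelay   = 125000
		maxRetries      = 5
	)
	for attempt := 0; attempt < maxRetries; attempt++ {
		lo, hi := retryRange(retryInterval, exponentialBase, maxRetryDelay, attempt)
		fmt.Printf("%d. %d-%d ms\n", attempt+1, lo, hi)
	}
}
```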

Setting retryInterval to 0 disables the retry strategy, and any failed write will discard the batch.

WriteFailedCallback allows advanced control over retrying. It is notified synchronously when an async write fails, and its return value controls further handling of the batch: if it returns true, WriteAPI continues retrying writes of this batch; if it returns false, the batch is discarded.

Reading async errors

WriteAPI automatically logs write errors. To send write errors to a custom target, use the Errors() method, which returns a channel for reading errors that occur during async writes:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get non-blocking write client
    writeAPI := client.WriteAPI("my-org", "my-bucket")
    // Get errors channel
    errorsCh := writeAPI.Errors()
    // Start a goroutine for reading and logging errors
    go func() {
        for err := range errorsCh {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }()
    // write some points
    for i := 0; i < 100; i++ {
        // create point
        p := influxdb2.NewPointWithMeasurement("stat").
            AddTag("id", fmt.Sprintf("rack_%v", i%10)).
            AddTag("vendor", "AWS").
            AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
            AddField("temperature", rand.Float64()*80.0).
            AddField("disk_free", rand.Float64()*1000.0).
            AddField("disk_total", (i/10+1)*1000000).
            AddField("mem_total", (i/100+1)*10000000).
            AddField("mem_free", rand.Uint64()).
            SetTime(time.Now())
        // write asynchronously
        writeAPI.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeAPI.Flush()
    // Ensure background processes finish
    client.Close()
}

Blocking write client

The blocking write client writes the given point(s) synchronously. By default it does no implicit batching; a batch is created from the given set of points. Implicit batching can be enabled with WriteAPIBlocking.EnableBatching().

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get blocking write client
    writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
    // write some points
    for i := 0; i < 100; i++ {
        // create data point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write synchronously
        err := writeAPI.WritePoint(context.Background(), p)
        if err != nil {
            panic(err)
        }
    }
    // Ensure background processes finish
    client.Close()
}

Queries

The query client can return query results either as a parsed representation in a QueryTableResult or as a raw string.

QueryTableResult

QueryTableResult offers a convenient way to work with a Flux query CSV response. It parses the CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects, which makes the result easy to read.

package main

import (
    "context"
    "fmt"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // get QueryTableResult
    result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Access data
            fmt.Printf("value: %v\n", result.Record().Value())
        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensure background processes finish
    client.Close()
}

Raw

QueryRaw() returns the raw, unparsed query result as a string that you can process on your own. The returned CSV format can be controlled by the third parameter, the query dialect.

package main

import (
    "context"
    "fmt"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // Query and get complete result as a string
    // Use default dialect
    result, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
    // Ensure background processes finish
    client.Close()
}
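
If you do process the raw string yourself, the standard library's encoding/csv package is usually enough. The following is a minimal sketch, assuming the result is annotated CSV whose annotation rows start with `#` and that it contains a single table; `sample` is a hand-written illustration rather than captured server output, and `parseRaw` is a hypothetical helper name.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// sample is a hand-written illustration of annotated CSV.
// Real server output may contain more columns and several tables.
const sample = `#datatype,string,long,dateTime:RFC3339,double,string,string
#group,false,false,false,false,true,true
#default,_result,,,,,
,result,table,_time,_value,_field,_measurement
,,0,2020-01-01T00:00:00Z,24.5,avg,stat
,,0,2020-01-01T00:01:00Z,23.2,avg,stat
`

// parseRaw returns one map per data row, keyed by column name.
// Annotation rows (starting with '#') are skipped. Only a single table
// with a single header row is handled in this sketch.
func parseRaw(raw string) ([]map[string]string, error) {
	r := csv.NewReader(strings.NewReader(raw))
	r.Comment = '#'        // skip annotation rows
	r.FieldsPerRecord = -1 // tolerate rows of different widths

	var header []string
	var rows []map[string]string
	for {
		rec, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		if header == nil {
			// The first non-annotation row holds the column names.
			header = rec
			continue
		}
		row := make(map[string]string, len(header))
		for i, name := range header {
			if name != "" && i < len(rec) {
				row[name] = rec[i]
			}
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func main() {
	rows, err := parseRaw(sample)
	if err != nil {
		panic(err)
	}
	for _, row := range rows {
		fmt.Printf("%s %s=%s\n", row["_time"], row["_field"], row["_value"])
	}
}
```

For results with several tables or typed values, QueryTableResult is the more convenient choice, since it already handles the annotations.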

Parameterized Queries

InfluxDB Cloud supports Parameterized Queries that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more reusable and can also be used to help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named params. Use dot or bracket notation to access parameters in the params record in your Flux query. Parameterized Flux queries support only int, float, and string data types. To convert the supported data types into other Flux basic data types, use Flux type conversion functions.

Query parameters can be passed as a struct or map. Param values can be only simple types or time.Time. The name of the parameter represented by a struct field can be specified by JSON annotation.

⚠️ Parameterized Queries are supported only in InfluxDB Cloud. There is currently no support in InfluxDB OSS.

Parameterized query example:

package main

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Define parameters
	parameters := struct {
		Start string  `json:"start"`
		Field string  `json:"field"`
		Value float64 `json:"value"`
	}{
		"-1h",
		"temperature",
		25,
	}
	// Query with parameters
	query := `from(bucket:"my-bucket")
				|> range(start: duration(params.start))
				|> filter(fn: (r) => r._measurement == "stat")
				|> filter(fn: (r) => r._field == params.field)
				|> filter(fn: (r) => r._value > params.value)`

	// Get result
	result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensure background processes finish
	client.Close()
}
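
To see how the JSON annotations on the struct fields determine the parameter names, the struct can be marshalled with the standard library. This is only an illustration of the naming rule, assuming parameters are serialized with encoding/json; the exact request body the client sends is not shown here, and `queryParams` and `encodeParams` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queryParams mirrors the anonymous struct used in the example above.
type queryParams struct {
	Start string  `json:"start"`
	Field string  `json:"field"`
	Value float64 `json:"value"`
}

// encodeParams returns the JSON object produced for a struct or map.
func encodeParams(p interface{}) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Struct: keys come from the json tags, in field order.
	fromStruct, err := encodeParams(queryParams{"-1h", "temperature", 25})
	if err != nil {
		panic(err)
	}
	fmt.Println(fromStruct)

	// Map: keys are used as-is (encoding/json sorts them).
	fromMap, err := encodeParams(map[string]interface{}{
		"start": "-1h",
		"field": "temperature",
		"value": 25.0,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(fromMap)
}
```

The keys of the resulting object (start, field, value) are the names referenced as params.start, params.field and params.value in the Flux query.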

Concurrency

The InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.

The best practice is to use a single Client instance per server URL. This ensures optimized resource usage, most importantly the reuse of HTTP connections.

For efficient reuse of HTTP resources among multiple clients, create an HTTP client and set it on all clients with Options.SetHTTPClient():

    // Create HTTP client
    httpClient := &http.Client{
        Timeout: time.Second * time.Duration(60),
        Transport: &http.Transport{
            DialContext: (&net.Dialer{
                Timeout: 5 * time.Second,
            }).DialContext,
            TLSHandshakeTimeout: 5 * time.Second,
            TLSClientConfig: &tls.Config{
                InsecureSkipVerify: true,
            },
            MaxIdleConns:        100,
            MaxIdleConnsPerHost: 100,
            IdleConnTimeout:     90 * time.Second,
        },
    }
    // Client for server 1
    client1 := influxdb2.NewClientWithOptions("https://server:8086", "my-token", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
    // Client for server 2
    client2 := influxdb2.NewClientWithOptions("https://server:9999", "my-token2", influxdb2.DefaultOptions().SetHTTPClient(httpClient))

The client ensures that there is a single instance of each server API sub-client for a specific area, e.g. a single WriteAPI instance for each org/bucket pair and a single QueryAPI for each org.

Such a single API sub-client instance can be used concurrently:

package main

import (
	"math/rand"
	"sync"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Ensure closing the client
    defer client.Close()

    // Get write client
    writeApi := client.WriteAPI("my-org", "my-bucket")

    // Create channel for points feeding
    pointsCh := make(chan *write.Point, 200)

    threads := 5

    var wg sync.WaitGroup
    go func(points int) {
        for i := 0; i < points; i++ {
            p := influxdb2.NewPoint("meas",
                map[string]string{"tag": "tagvalue"},
                map[string]interface{}{"val1": rand.Int63n(1000), "val2": rand.Float64()*100.0 - 50.0},
                time.Now())
            pointsCh <- p
        }
        close(pointsCh)
    }(1000000)

    // Launch write routines
    for t := 0; t < threads; t++ {
        wg.Add(1)
        go func() {
            for p := range pointsCh {
                writeApi.WritePoint(p)
            }
            wg.Done()
        }()
    }
    // Wait for writes complete
    wg.Wait()
}

Proxy and redirects

You can configure the InfluxDB Go client to work behind a proxy in two ways:

  1. Using an environment variable
    Set the environment variable HTTP_PROXY (or HTTPS_PROXY, depending on the scheme of your server URL), e.g. on Linux export HTTP_PROXY=http://my-proxy:8080, or in Go code os.Setenv("HTTP_PROXY","http://my-proxy:8080").

  2. Configure http.Client to use proxy
    Create a custom http.Client with a proxy configuration:

    proxyUrl, err := url.Parse("http://my-proxy:8080")
    if err != nil {
        panic(err)
    }
    httpClient := &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyURL(proxyUrl),
        },
    }
    client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))

The client automatically follows HTTP redirects. The default redirect policy is to follow up to 10 consecutive redirects. For security reasons, the Authorization header is not forwarded when a redirect leads to a different domain. To overcome this limitation, set a custom redirect handler:

token := "my-token"

httpClient := &http.Client{
    CheckRedirect: func(req *http.Request, via []*http.Request) error {
        req.Header.Add("Authorization", "Token "+token)
        return nil
    },
}
client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))

Checking Server State

There are three functions for checking whether a server is up and ready for communication:

| Function | Description | Availability |
|----------|-------------|--------------|
| Health() | Detailed info about the server status, along with version string | OSS |
| Ready() | Server uptime info | OSS |
| Ping() | Whether a server is up | OSS, Cloud |

Only the Ping() function works in InfluxDB Cloud server.

InfluxDB 1.8 API compatibility

InfluxDB 1.8.0 introduced forward compatibility APIs for InfluxDB 2.0. This allows you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.

Client API usage differences summary:

  1. Use the form username:password for an authentication token. Example: my-user:my-password. Use an empty string ("") if the server doesn't require authentication.
  2. The organization parameter is not used. Use an empty string ("") where necessary.
  3. Use the form database/retention-policy where a bucket is required. Skip the retention policy if the default retention policy should be used. Examples: telegraf/autogen, telegraf.
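
The first and third rules above are simple string conventions, so they can be captured in two small helpers. `v1Token` and `v1Bucket` are hypothetical names for this sketch and are not part of the client library.

```go
package main

import "fmt"

// v1Token builds the "username:password" token form used with InfluxDB 1.8.
// An empty user name yields an empty token, for servers without authentication.
func v1Token(user, password string) string {
	if user == "" {
		return ""
	}
	return user + ":" + password
}

// v1Bucket builds the "database/retention-policy" bucket form. When the
// retention policy is empty, only the database name is used, which selects
// the default retention policy.
func v1Bucket(database, retentionPolicy string) string {
	if retentionPolicy == "" {
		return database
	}
	return database + "/" + retentionPolicy
}

func main() {
	fmt.Println(v1Token("my-user", "my-password"))
	fmt.Println(v1Bucket("telegraf", "autogen"))
	fmt.Println(v1Bucket("telegraf", ""))
}
```

The resulting strings are what you would pass as the token to NewClient and as the bucket to WriteAPIBlocking or inside a Flux from(bucket: ...) call, with an empty string for the organization.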

The following forward compatible APIs are available:

| API | Endpoint | Description |
|-----|----------|-------------|
| WriteAPI (also WriteAPIBlocking) | /api/v2/write | Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API |
| QueryAPI | /api/v2/query | Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (the endpoint must be enabled with the flux-enabled option) |
| Health() | /health | Check the health of your InfluxDB instance |

Example

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    userName := "my-user"
    password := "my-password"
    // Create a new client using an InfluxDB server base URL and an authentication token
    // For the authentication token, supply a string in the form "username:password". Use an empty value for an unauthenticated server
    client := influxdb2.NewClient("http://localhost:8086", fmt.Sprintf("%s:%s", userName, password))
    // Get the blocking write client
    // Supply a string in the form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    // Org name is not used
    writeAPI := client.WriteAPIBlocking("", "test/autogen")
    // create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // Write data
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        fmt.Printf("Write error: %s\n", err.Error())
    }

    // Get query client. Org name is not used
    queryAPI := client.QueryAPI("")
    // Supply string in a form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    result, err := queryAPI.Query(context.Background(), `from(bucket:"test")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        for result.Next() {
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    } else {
        fmt.Printf("Query error: %s\n", err.Error())
    }
    // Close client
    client.Close()
}

Contributing

If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request to the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.

influxdb-client-go's Issues

Option to run arbitrary commands

Hello!

I'm using "github.com/influxdata/influxdb/client/v2" and I'm quite disappointed to see the old client completely removed.

I like the new client, but some features aren't available in the new one.

Previously, you had the option to run arbitrary commands using the client:

    q := influx_client.Query{
        Command:  cmd,
        Database: database_name,
    }
    if response, err := influx_client_connection.Query(q); err == nil {
        res = response.Results
    } 

Unfortunately, it's not possible with new client.

How can I implement it?

Thank you!

Cannot login with user/password

Specifying correct credentials for admin member, the login fails using development branch:

http := &http.Client{Timeout: writeTimeout}
options := []influxdb.Option{
    influxdb.WithAddress(url),
    influxdb.WithUserAndPass(user, password),
}
client, err := influxdb.New(http, options...)

result:

unauthorized: unauthorized access

Help with QueryCSV

Hello,
I'd like to know if this snippet is a correct way to use the QueryCSV function:

type influxRecord struct {
	Zone   ***string `flux:"name" json:"zone"`
	Stop   time.Time `flux:"_stop" json:"-"`
	Start  time.Time `flux:"_start" json:"-"`
	Time   time.Time `flux:"_time" json:"date"`
	HostIP string    `flux:"host_ip" json:"-"`
	Count  int32     `flux:"_value" json:"count"`
}


q := fmt.Sprintf(
	`from(bucket: "%s")
		|> range(start: -1h)
		|> filter(fn: (r) => r._measurement == "occupation" and r._field == "%s")
		|> last()`, me.c.InfluxDB.PullBucket, sensor)

response, err := me.cli.QueryCSV(
	context.Background(),
	q,
	me.c.InfluxDB.Org,
)

r := influxRecord{}
for response.Next() {
	err = response.Unmarshal(&r)
	if err != nil {
		...
	}
}

I'm getting this error: flux: unsupported type: is not supported to generate flux, try a map or a struct with public keys at the moment.

Thank you.

edit: I realized my error right after posting the issue. Anyway, since there are very few examples apart from the tests, I'm leaving the final snippet here as an example. It may help someone.

Deadlock case LPWriter.Write

see:

w.lock.Lock()
i, err := w.enc.Write(name, ts, tagKeys, tagVals, fieldKeys, fieldVals)
// asyncronously flush if the size of the buffer is too big.
if err != nil {
	return i, err
}
w.asyncFlush()
w.lock.Unlock()

Either this Unlock needs to be deferred, or Unlock needs to be called at every return point in this function. Otherwise we will cause a deadlock.

Same goes for WriteMetrics:

w.lock.Lock()
for i := range m {
	j, err := w.enc.Encode(m[i])
	if err != nil {
		return j, err
	}
}
w.asyncFlush()
w.lock.Unlock()
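
A minimal, self-contained sketch of the deferred variant is shown below. `lpWriter` and `write` are hypothetical stand-ins, not the library's actual types; the point is only that a deferred Unlock also runs on the early error return.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// lpWriter is a hypothetical stand-in for the writer discussed above.
type lpWriter struct {
	lock    sync.Mutex
	written int
}

// write fails for empty input. Because Unlock is deferred, the mutex is
// released on every return path, including the early error return.
func (w *lpWriter) write(line string) (int, error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if line == "" {
		return 0, errors.New("empty line")
	}
	w.written += len(line)
	return len(line), nil
}

func main() {
	w := &lpWriter{}
	if _, err := w.write(""); err != nil {
		fmt.Println("first write failed:", err)
	}
	// Without the deferred Unlock, this second call would block forever,
	// because the failed call above would still hold the lock.
	n, err := w.write("stat avg=1")
	fmt.Println(n, err)
}
```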

arbitrary " added to tag fields

I'm inserting large amounts of data via the golang library, and I'm seeing arbitrary trailing " characters added to some of the tag fields.

I've done step-by-step debugging to ensure that the " is not there when the influxdb.Metric() object is built and submitted to the influxdb Client write method, but the quotes still show up (not on all tags, just some).

myMetrics = []influxdb.Metric{
	influxdb.NewRowMetric(
		map[string]interface{}{"confirmed": confirmed, "deaths": dead, "recovered": recovered, "lat": latitude, "lon": longitude},
		meas,
		map[string]string{
			"state_province": Case.Province,
			"country_region": Case.Country,
			"s2_cell_id":     cell,
			"last_update":    stringTime,
		},
		t),
}

I've verified that Case.Province does NOT have extra trailing quotes, I've run the string through strings.Trim() as well as strings.Replace() to ensure the " character is not there, but it still ends up in the database.

can't get the package

go get -u github.com/influxdata/influxdb-client-go
# github.com/influxdata/influxdb-client-go/internal/ast
..\github.com\influxdata\influxdb-client-go\internal\ast\ast.go:33:10: v.MapRange undefined (type reflect.Value has no field or method MapRange)
..\github.com\influxdata\influxdb-client-go\internal\ast\ast.go:243:10: v.MapRange undefined (type reflect.Value has no field or method MapRange)

go get -u runs berserk

Consider this: it pulls in almost a hundred dependencies:

❯ mkdir foo && cd foo

~/foo
❯ go mod init foo
go: creating new go.mod: module foo

~/foo
❯ go get -u github.com/influxdata/influxdb-client-go@develop
go: finding github.com/influxdata/influxdb-client-go develop
go: finding github.com/influxdata/tdigest latest
go: finding github.com/influxdata/line-protocol latest
go: finding golang.org/x/crypto latest
go: finding golang.org/x/sync latest
go: finding github.com/andreyvit/diff latest
go: finding gonum.org/v1/netlib latest
go: finding golang.org/x/tools latest
go: finding golang.org/x/exp latest
go: finding github.com/pkg/term latest
go: finding golang.org/x/net latest
go: finding github.com/apache/arrow/go/arrow latest
go: finding golang.org/x/sys latest
go: finding gopkg.in/check.v1 latest
go: finding gonum.org/v1/gonum latest
go: finding github.com/remyoudompheng/bigfft latest
go: finding golang.org/x/mobile latest
go: finding github.com/kevinburke/ssh_config latest
go: finding github.com/BurntSushi/xgb latest
go: finding github.com/alecthomas/units latest
go: finding github.com/jbenet/go-context latest
go: finding golang.org/x/image latest
go: finding github.com/anmitsu/go-shlex latest
go: finding github.com/alcortesm/tgz latest
go: finding github.com/blakesmith/ar latest
go: finding github.com/alecthomas/template latest
go: finding golang.org/x/xerrors latest
go: finding golang.org/x/oauth2 latest
go: finding github.com/jpillora/backoff latest
go: finding github.com/eapache/go-xerial-snappy latest
go: finding github.com/aybabtme/rgbterm latest
go: finding github.com/armon/consul-api latest
go: finding github.com/flynn/go-shlex latest
go: finding github.com/aphistic/golf latest
go: finding github.com/mattn/go-tty latest
go: finding github.com/tj/assert latest
go: finding github.com/smartystreets/go-aws-auth latest
go: finding github.com/campoy/unique latest
go: finding gopkg.in/tomb.v1 latest
go: finding github.com/jmespath/go-jmespath latest
go: finding contrib.go.opencensus.io/exporter/aws latest
go: finding google.golang.org/genproto latest
go: finding gonum.org/v1/plot latest
go: finding github.com/golang/freetype latest
go: finding github.com/ajstarks/svgo latest
go: finding github.com/GoogleCloudPlatform/cloudsql-proxy latest
go: finding github.com/golang/glog latest
go: finding github.com/tmc/grpc-websocket-proxy latest
go: finding github.com/xdg/scram latest
go: finding golang.org/x/lint latest
go: finding github.com/coreos/go-systemd latest
go: finding golang.org/x/time latest
go: finding github.com/tj/go-kinesis latest
go: finding github.com/mgutz/ansi latest
go: finding github.com/jstemmer/go-junit-report latest
go: finding github.com/coreos/pkg latest
go: finding github.com/mattn/go-ieproxy latest
go: finding github.com/google/pprof latest
go: finding github.com/codahale/hdrhistogram latest
go: finding github.com/xiang90/probing latest
go: finding github.com/golang/groupcache latest
go: finding github.com/kr/logfmt latest
go: finding github.com/tj/go-elastic latest
go: finding github.com/dgryski/go-sip13 latest
go: finding github.com/prometheus/client_model latest
go: finding github.com/armon/go-socks5 latest
go: finding github.com/mwitkow/go-conntrack latest
go: finding github.com/ruudk/golang-pdf417 latest
go: finding github.com/modern-go/concurrent latest
go: finding github.com/rcrowley/go-metrics latest
go: finding github.com/streadway/amqp latest
go: finding istio.io/gogo-genproto latest
go: downloading github.com/influxdata/influxdb-client-go v0.0.2-0.20190805165203-23da33b60c81
go: extracting github.com/influxdata/influxdb-client-go v0.0.2-0.20190805165203-23da33b60c81

Implement SignIn()

Closes #45

All actions for influxdb 2.0 need to be authenticated either via a token or session (and in the future JWT).

Client currently only supports Setup() which completes onboarding procedure for new user / org / bucket creation. Calling this does retrieve a session token and subsequently uses it for auth. However, this is only suitable once per unique user + org + bucket combo.

We should leverage the /signin basic auth api call when username is provided via WithUsernameAndPassword and no authentication has been set.

Example:

func (c *Client) SignIn() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    //...
    resp, err := c.do(req)
    if err != nil {
        return err
    }
    // ...
    c.session = token
    return nil
}

func WithUsernamePassword(username, password string) Option {
    return func(c *Client) {
        c.username = username
        c.password = password
        c.once = sync.Once{}
    }
}

func (c *Client) signIn() {
    c.once.Do(func(){
        if err := c.SignIn(); err != nil {
            c.logger.Error(err)
        }
    })
}

func (c *Client) Write(...) {
    c.signIn()
}

Update:

For reference /signin returns session via Set-Cookie: session=token

Another Update:

Need to actually use this token as a session cookie. Consider using https://golang.org/pkg/net/http/cookiejar/
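
A minimal sketch of the cookie jar idea, assuming the sign-in endpoint accepts basic auth and answers with Set-Cookie: session=token as described above. The signIn and demoSession helpers and the /signin and /write paths on the stand-in test server are illustrative, not part of this client:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/cookiejar"
	"net/http/httptest"
)

// signIn sends basic-auth credentials to signinURL. Because the client has a
// cookie jar, any session cookie set by the server is stored automatically.
func signIn(client *http.Client, signinURL, username, password string) error {
	req, err := http.NewRequest(http.MethodPost, signinURL, nil)
	if err != nil {
		return err
	}
	req.SetBasicAuth(username, password)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("signin failed: %s", resp.Status)
	}
	return nil
}

// demoSession signs in against a stand-in server and returns the session
// cookie value the server received on the follow-up request.
func demoSession() (string, error) {
	seen := make(chan string, 1)
	mux := http.NewServeMux()
	mux.HandleFunc("/signin", func(w http.ResponseWriter, r *http.Request) {
		if _, _, ok := r.BasicAuth(); !ok {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		http.SetCookie(w, &http.Cookie{Name: "session", Value: "token", Path: "/"})
		w.WriteHeader(http.StatusNoContent)
	})
	mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
		value := ""
		if c, err := r.Cookie("session"); err == nil {
			value = c.Value
		}
		seen <- value
		w.WriteHeader(http.StatusNoContent)
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	jar, err := cookiejar.New(nil)
	if err != nil {
		return "", err
	}
	client := &http.Client{Jar: jar}
	if err := signIn(client, srv.URL+"/signin", "user", "pass"); err != nil {
		return "", err
	}
	resp, err := client.Post(srv.URL+"/write", "text/plain", nil)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return <-seen, nil
}

func main() {
	session, err := demoSession()
	fmt.Println(session, err)
}
```

With a jar attached to the http.Client, later requests to the same host replay the cookie without the client having to track the token itself.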

CSVReader incorrectly maps rows and column names

A query with the resulting CSV:

,result,table,_start,_stop,uptime,_time
,_result,0,2020-01-27T05:04:48.70477677Z,2020-01-27T17:04:48.70477677Z,0.5454545454545454,2020-01-27T06:00:00Z
,_result,0,2020-01-27T05:04:48.70477677Z,2020-01-27T17:04:48.70477677Z,1,2020-01-27T07:00:00Z

yields incorrect row and column name information:

ColNames: [result table _start _stop uptime _time]
Row: [  0 2020-01-27T04:49:51.607050038Z 2020-01-27T16:49:51.607050038Z 0 2020-01-27T05:00:00Z]

It seems that the preceding comma creates an extra empty element in the Row slice, offsetting each row by 1 index.
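
One possible way to keep names and values aligned, sketched with the standard encoding/csv package rather than this client's CSVReader: drop the leading empty column from the header and from every row by the same offset. parseAnnotatedCSV is a hypothetical helper:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parseAnnotatedCSV reads a header line plus data rows. If the header starts
// with an empty cell (the leading comma), that column is dropped from the
// header and from every row so that cols[i] always names rows[n][i].
func parseAnnotatedCSV(data string) ([]string, [][]string, error) {
	records, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, nil, err
	}
	if len(records) == 0 {
		return nil, nil, nil
	}
	offset := 0
	if len(records[0]) > 0 && records[0][0] == "" {
		offset = 1
	}
	cols := records[0][offset:]
	var rows [][]string
	for _, rec := range records[1:] {
		rows = append(rows, rec[offset:])
	}
	return cols, rows, nil
}

func main() {
	data := ",result,table,uptime,_time\n" +
		",_result,0,0.5454545454545454,2020-01-27T06:00:00Z\n"
	cols, rows, err := parseAnnotatedCSV(data)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for i, name := range cols {
		fmt.Printf("%s=%s\n", name, rows[0][i])
	}
}
```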

QueryCSV panic

In query.go line 37, the code panics if resp is nil, because the deferred call that closes the response body is registered before the error is checked.
The defer should come after the error check.
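
A minimal sketch of the ordering, using plain net/http rather than the code in query.go. fetchBody is a hypothetical helper:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetchBody checks the error before deferring Close, so a nil response is
// never dereferenced.
func fetchBody(url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		// resp is nil on this path, so there is nothing to close.
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// An unparsable URL makes http.Get fail before any request is sent.
	_, err := fetchBody("://missing-scheme")
	fmt.Println("error returned without panic:", err != nil)
}
```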

Implement Token Support

Currently the only authentication (which is broken see #45) we support is username + password combination.

see:

c.authorization = "Token " + setupResult.Auth.Token

We should also add support for token based authentication.

i.e. Authorization: Token xxxxxx

Perhaps a WithToken(token string) here: https://github.com/influxdata/influxdb-client-go/blob/develop/clientoptions.go#L116 which just sets a static token on client.authorization.
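
A rough sketch of what such an option could look like. The Client type, Option type and NewClient constructor here are simplified stand-ins, not the definitions in clientoptions.go:

```go
package main

import "fmt"

// Client is a stand-in holding only the field relevant to this sketch.
type Client struct {
	authorization string
}

// Option mutates a Client during construction.
type Option func(*Client)

// WithToken sets a static token in the "Token xxxxxx" Authorization format.
func WithToken(token string) Option {
	return func(c *Client) {
		c.authorization = "Token " + token
	}
}

// NewClient applies the options in order.
func NewClient(opts ...Option) *Client {
	c := &Client{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := NewClient(WithToken("xxxxxx"))
	fmt.Println("Authorization:", c.authorization)
}
```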

`ping` is broken

I have checked the code: ping simply issues an HTTP GET to ip:port/api/v2/ready, and a 401 {"code":"unauthorized","message":"unauthorized access"} is returned.

When I try this in the browser after login, a 404 {"code":"not found","message":"path not found"} is returned.

I also checked ip:port/ready, which works as expected without authentication.

Is this a client problem, or should I wait for api/v2/ready to be implemented in InfluxDB?

Retries cause metrics payload to duplicate

Because of where the doRequest label (the goto target) is placed, a retry encodes the metrics again into the same buffer, so retried requests carry duplicate metrics.

doRequest:
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }
    for i := range m {
        if _, err := e.Encode(m[i]); err != nil {
            return err
        }
    }
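
One way to avoid the duplication, sketched independently of the client's real encoder: encode once before the retry loop and resend the same bytes on each attempt. sendWithRetry and payloadSizes are hypothetical names, and the payload is plain newline-joined text:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// sendWithRetry encodes lines into a buffer exactly once, then passes the
// same bytes to send on every attempt, so a retry cannot append a second copy.
func sendWithRetry(lines []string, attempts int, send func(payload []byte) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var buf bytes.Buffer
	for _, l := range lines {
		buf.WriteString(l)
		buf.WriteByte('\n')
	}
	payload := buf.Bytes()

	var err error
	for i := 0; i < attempts; i++ {
		if err = send(payload); err == nil {
			return nil
		}
	}
	return err
}

// payloadSizes records the payload length seen on each attempt when the
// first attempt fails and the second succeeds.
func payloadSizes() []int {
	var sizes []int
	_ = sendWithRetry([]string{"cpu value=1", "mem value=2"}, 3, func(p []byte) error {
		sizes = append(sizes, len(p))
		if len(sizes) < 2 {
			return errors.New("temporary failure")
		}
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(payloadSizes())
}
```

Resetting the buffer at the top of the retry label would also work. Encoding once simply avoids repeating the work.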

Add Retry Implementation of writer.BucketMetricWriter

The root influxdb.Client implements the writer.BucketMetricWriter interface (https://github.com/influxdata/influxdb-client-go/blob/develop/writer/writer.go#L11).
A new decorating implementation is required which handles automatically retrying on error conditions.

This was previously implemented in the influxdb.Client, however, it was removed in the buffered writer refactor. This should now be reinstated as a decorator for the writer.BucketMetricWriter interface.

Should expose a method to just compile Flux

I would like to have an endpoint to validate that some Flux is, at least, syntactically correct.

This would be great for users: your Go module shouldn't have to depend on Flux to validate a script or care about versioning; it could just send a request to your InfluxDB server 👍

Unable to install influxdb-client-go package due to v.MapRange undefined error

$ go get github.com/influxdata/influxdb-client-go

github.com/influxdata/influxdb-client-go/internal/ast

../github.com/influxdata/influxdb-client-go/internal/ast/ast.go:33:10: v.MapRange undefined (type reflect.Value has no field or method MapRange)
../github.com/influxdata/influxdb-client-go/internal/ast/ast.go:243:10: v.MapRange undefined (type reflect.Value has no field or method MapRange)

Can't set precision

According to the docs: the API supports setting precision. But, I couldn't find any function exposed for doing the same here :(

Does this precision need to be set at a bucket level? Any more tips would be appreciated. @russorat
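
For context, the InfluxDB 2 HTTP write endpoint takes precision as a per-request query parameter, not as a bucket setting. A small sketch of building such a URL with net/url follows. writeURL is a hypothetical helper and not a function of this client:

```go
package main

import (
	"fmt"
	"net/url"
)

// writeURL builds a /api/v2/write URL carrying org, bucket and precision as
// query parameters. Precision is limited to ns, us, ms or s.
func writeURL(base, org, bucket, precision string) (string, error) {
	switch precision {
	case "ns", "us", "ms", "s":
	default:
		return "", fmt.Errorf("unsupported precision %q", precision)
	}
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/api/v2/write"
	q := url.Values{}
	q.Set("org", org)
	q.Set("bucket", bucket)
	q.Set("precision", precision)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	u, err := writeURL("http://localhost:8086", "my-org", "my-bucket", "ms")
	fmt.Println(u, err)
}
```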

Is it the client's purpose to handle the whole HTTP API?

We're writing a small terraform provider for influx v2 to automate the creation of buckets, etc. Hence, we're trying to use this client as much as possible.

There's already a file for the setup/ route, but only for this one. Do you intend to add support for all other routes, and should we contribute to that, or should we write our own http requests because the goal of the client is simply to allow handy read/writes (and secondary operations such as setup)?

docs: Update README to clearly mention the 1.x client repository

package main

import (
	"context"
	"github.com/influxdata/influxdb-client-go"
	"log"
	"net/http"
	"time"
)

func main() {

	var myHTTPClient *http.Client

	influx, err := influxdb.New(myHTTPClient, influxdb.WithAddress("http://localhost:8086"), influxdb.WithToken("mytoken"))
	if err != nil {
		panic(err) // error handling here, normally we wouldn't use fmt, but it works for the example
	}

	// we use client.NewRowMetric for the example because its easy, but if you need extra performance
	// it is fine to manually build the []client.Metric{}.
	myMetrics := []influxdb.Metric{
		influxdb.NewRowMetric(
			map[string]interface{}{"memory": 1000, "cpu": 0.93},
			"system-metrics",
			map[string]string{"hostname": "hal9000"},
			time.Date(2018, 3, 4, 5, 6, 7, 8, time.UTC)),
		influxdb.NewRowMetric(
			map[string]interface{}{"memory": 1000, "cpu": 0.93},
			"system-metrics",
			map[string]string{"hostname": "hal9000"},
			time.Date(2018, 3, 4, 5, 6, 7, 9, time.UTC)),
	}

	// The actual write..., this method can be called concurrently.
	if err := influx.Write(context.Background(), "my-awesome-bucket", "my-very-awesome-org", myMetrics...); err != nil {
		log.Fatal(err) // as above use your own error handling here.
	}
	influx.Close() // closes the client.  After this the client is useless.
}

ERROR 2019/05/10 01:07:57 json: cannot unmarshal number into Go value of type influxdb.genericRespError exit status 1

E2E Test Passes But Raises Exception In Influx

E2E test output:

#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string,string,string
#group,false,false,true,true,false,false,false,false,false,false,false,false
#default,_result,,,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,ktest1,ktest2,"ktest2,k-test3",ktest3
,,0,2019-06-21T19:44:37.421358Z,2019-08-02T11:44:37.421358Z,2019-08-02T11:43:32.396033Z,3,ftest1,test,k-test1,k-test2,,

PASS
ok  	github.com/influxdata/influxdb-client-go	5.112s

InfluxDB revision 945f16ff4be321e5ff4eda783be03ac27d9e68b7

Influxdb logs:

2019-08-02T11:44:37.442925Z	info	Error writing response to client	{"log_id": "0H0YANUG000", "handler": "query", "handler": "flux", "error": "csv encoder error: expected integer cursor type, got *reads.stringMultiShardArrayCursor", "errorVerbose": "expected integer cursor type, got *reads.stringMultiShardArrayCursor\ncsv encoder error\ngithub.com/influxdata/flux/csv.wrapEncodingError\n\t/Users/georgemac/go/pkg/mod/github.com/influxdata/[email protected]/csv/result.go:753\ngithub.com/influxdata/flux/csv.(*ResultEncoder).Encode.func1\n\t/Users/georgemac/go/pkg/mod/github.com/influxdata/[email protected]/csv/result.go:843\ngithub.com/influxdata/flux/execute.(*result).Do\n\t/Users/georgemac/go/pkg/mod/github.com/influxdata/[email protected]/execute/result.go:70\ngithub.com/influxdata/influxdb/query/control.(*errorCollectingTableIterator).Do\n\t/Users/georgemac/github/influxdata/influxdb/query/control/controller.go:803\ngithub.com/influxdata/flux/csv.(*ResultEncoder).Encode\n\t/Users/georgemac/go/pkg/mod/github.com/influxdata/[email protected]/csv/result.go:771\ngithub.com/influxdata/flux.(*DelimitedMultiResultEncoder).Encode\n\t/Users/georgemac/go/pkg/mod/github.com/influxdata/[email protected]/result.go:287\ngithub.com/influxdata/influxdb/query.ProxyQueryServiceAsyncBridge.Query\n\t/Users/georgemac/github/influxdata/influxdb/query/bridges.go:103\ngithub.com/influxdata/influxdb/http.(*FluxHandler).handleQuery\n\t/Users/georgemac/github/influxdata/influxdb/http/query_handler.go:167\nnet/http.HandlerFunc.ServeHTTP\n\t/usr/local/Cellar/go/1.12.6/libexec/src/net/http/server.go:1995\ngithub.com/NYTimes/gziphandler.GzipHandlerWithOpts.func1.1\n\t/Users/georgemac/go/pkg/mod/github.com/!n!y!times/[email protected]/gzip.go:289\nnet/http.HandlerFunc.ServeHTTP\n\t/usr/local/Cellar/go/1.12.6/libexec/src/net/http/server.go:1995\ngithub.com/julienschmidt/httprouter.(*Router).Handler.func1\n\t/Users/georgemac/go/pkg/mod/github.com/julienschmidt/[email 
protected]/params_go17.go:26\ngithub.com/julienschmidt/httprouter.(*Router).ServeHTTP\n\t/Users/georgemac/go/pkg/mod/github.com/julienschmidt/[email protected]/router.go:334\ngithub.com/influxdata/influxdb/http.(*APIHandler).ServeHTTP\n\t/Users/georgemac/github/influxdata/influxdb/http/api_handler.go:262\ngithub.com/influxdata/influxdb/http.(*AuthenticationHandler).ServeHTTP\n\t/Users/georgemac/github/influxdata/influxdb/http/authentication_middleware.go:89\ngithub.com/influxdata/influxdb/http.(*PlatformHandler).ServeHTTP\n\t/Users/georgemac/github/influxdata/influxdb/http/platform_handler.go:71\ngithub.com/influxdata/influxdb/http.(*Handler).ServeHTTP\n\t/Users/georgemac/github/influxdata/influxdb/http/handler.go:151\ngithub.com/influxdata/influxdb/http.DebugFlush.func1\n\t/Users/georgemac/github/influxdata/influxdb/http/debug.go:22\nnet/http.HandlerFunc.ServeHTTP\n\t/usr/local/Cellar/go/1.12.6/libexec/src/net/http/server.go:1995\nnet/http.serverHandler.ServeHTTP\n\t/usr/local/Cellar/go/1.12.6/libexec/src/net/http/server.go:2774\nnet/http.(*conn).serve\n\t/usr/local/Cellar/go/1.12.6/libexec/src/net/http/server.go:1878\nruntime.goexit\n\t/usr/local/Cellar/go/1.12.6/libexec/src/runtime/asm_amd64.s:1337"}

Unconfigurable timeout causing long queries to fail.

Timeout: time.Second * 20,

The line above cannot be configured through the provided API, and it effectively forces all queries to complete within 20 seconds. Am I missing something, or is this configurable? I can't seem to get around it.

I constantly get the error:
Post "https://localhost/api/v2/query?org=testing_queries": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
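
A sketch of how the timeout could be made configurable while keeping 20 seconds as the default. newHTTPClient and defaultTimeout are hypothetical names, not part of the current client API:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// defaultTimeout mirrors the hard-coded value quoted above.
const defaultTimeout = 20 * time.Second

// newHTTPClient returns an http.Client with the caller's timeout, falling
// back to defaultTimeout when none is given.
func newHTTPClient(timeout time.Duration) *http.Client {
	if timeout <= 0 {
		timeout = defaultTimeout
	}
	return &http.Client{Timeout: timeout}
}

func main() {
	fmt.Println(newHTTPClient(0).Timeout)
	fmt.Println(newHTTPClient(5 * time.Minute).Timeout)
}
```

Http.Client.Timeout covers the whole exchange, including reading the response body, so a long-running query needs a value sized for the full response and not just for the headers.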

client.QueryCSV fails for multiple yields

I have a flux query which returns 2 yields in a single script as follows:
(Assuming all the values are sent)

workbench = from(bucket: "test")
  |> range(start: v.timeStart, stop: v.timeEnd)
  |> filter(fn: (r) => r._measurement == "mem")
  |> filter(fn: (r) => r.host == v.host)

count = workbench
    |> count()
    |> yield(name:"count")
data = workbench
    |> limit(n: v.limit, offset : v.offset)
    |> sort(columns: ["_time"], desc: v.isDescending)
    |> yield(name:"data")

The HTTP response for this query contains 2 series, but the package fails to read the second one, returning the error record on line 61: wrong number of fields. I tried setting FieldsPerRecord to -1 on the csv reader (as per this), but that fails too. Need help.
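
A possible workaround, sketched with the standard library only and assuming the yields arrive as sections separated by a blank line: split the body on blank lines and give each section its own csv.Reader, so each section can have its own column count. splitTables is a hypothetical helper, and this simple split would break on quoted fields that themselves contain blank lines:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// splitTables parses each blank-line-separated section of body with a fresh
// csv.Reader, so the field count is only enforced within a section.
func splitTables(body string) ([][][]string, error) {
	body = strings.ReplaceAll(body, "\r\n", "\n")
	var tables [][][]string
	for _, section := range strings.Split(body, "\n\n") {
		if strings.TrimSpace(section) == "" {
			continue
		}
		records, err := csv.NewReader(strings.NewReader(section)).ReadAll()
		if err != nil {
			return nil, err
		}
		tables = append(tables, records)
	}
	return tables, nil
}

func main() {
	body := ",result,table,_value\n,count,0,42\n\n" +
		",result,table,_time,_value\n,data,0,2020-01-27T06:00:00Z,1\n"
	tables, err := splitTables(body)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for i, t := range tables {
		fmt.Printf("section %d: %d rows, %d columns\n", i, len(t), len(t[0]))
	}
}
```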

Can't write float values to Influx Cloud

The following doesn't work for me:

p := influxdb2.NewPoint("sensor-data",
		map[string]string{"id": "1"},
		map[string]interface{}{"temp": 45.5,"lightIntensity": 120},
		time.Now())

Only the lightIntensity is written to the cloud, any float value does not work. Issue only came up with the newest client.

proposal: Make buffered writer one time use

As per suggestion in #28

The semantics of Start and Stop are a little tricky. Ideally Start and Stop need to be synchronized and flip/flop between being able to be called. The current implementation raises a number of race and deadlock hazards and is a good example of how tricky it is to implement.

That said I see little value in having Start and Stop semantics. Personally I would like to start a periodic flushing buffer and eventually close it. If I need another, I would just create a new instance. Without a solid use-case for Start and Stop I would suggest that we remove it in favor of something like:

type LPWriter struct {}

func (*LPWriter) Start() {
//...
}

func (*LPWriter) Flush() {
//...
}

func (*LPWriter) Close() error { return nil }

A further suggestion would be that we remove periodic flushing altogether, in favor of just a Flush method. Then we write a type which takes a type WriteFlusher interface {} and moves periodic flushing responsibility elsewhere.

Thoughts?
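
To make the second suggestion concrete, here is a rough sketch in which the writer only exposes Write and Flush and a separate function drives flushing from a tick channel. The method set of WriteFlusher and the helper names are assumptions for illustration:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// WriteFlusher is the assumed shape of the interface: buffer on Write,
// send on Flush.
type WriteFlusher interface {
	Write(p []byte) (int, error)
	Flush() error
}

// flushOnTick calls Flush once per tick until ctx is cancelled or the tick
// channel is closed. In production the channel could be time.NewTicker(d).C.
func flushOnTick(ctx context.Context, w WriteFlusher, ticks <-chan time.Time) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case _, ok := <-ticks:
			if !ok {
				return nil
			}
			if err := w.Flush(); err != nil {
				return err
			}
		}
	}
}

// countingWriter counts flushes and discards writes.
type countingWriter struct{ flushes int }

func (c *countingWriter) Write(p []byte) (int, error) { return len(p), nil }
func (c *countingWriter) Flush() error                { c.flushes++; return nil }

// countFlushes sends n ticks by hand and reports how many flushes happened.
func countFlushes(n int) int {
	ticks := make(chan time.Time)
	w := &countingWriter{}
	done := make(chan error, 1)
	go func() { done <- flushOnTick(context.Background(), w, ticks) }()
	for i := 0; i < n; i++ {
		ticks <- time.Now()
	}
	close(ticks)
	<-done
	return w.flushes
}

func main() {
	fmt.Println("flushes:", countFlushes(3))
}
```

Because the flushing loop owns no state of its own, stopping it is just cancelling the context, and a new loop can be started around a new writer without Start/Stop coordination.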

Writer keeps returning the last error without sending a request to the InfluxDB API

Hi, I use the PointWriter to write metrics with an underlying buffer to influxdb.

Here is the code

client, _ := influxdb.New(config.Endpoint, config.Token)
writer := influxdb_writer.New(client, config.Bucket, config.Organization)

Then I noticed something weird. Once the writer returns an error from InfluxDB (for instance, exceeded rate limit), every later write returns that same error, even after the InfluxDB service is back to normal, and no further network requests are made to the InfluxDB service.

Then I found this code:

if p.err != nil {
    return 0, p.err
}
// check if the underlying flush will flush
if len(m) > p.w.Available() {
    // tell the ticker to reset flush interval
    select {
    case p.resetTick <- struct{}{}:
    default:
    }
}
var n int
n, p.err = p.w.Write(m...)
return n, p.err

Once p.err is set, the underlying writer is never invoked again to write metrics. There are many places in the code like that.

I have no idea what's going on. Am I missing something?
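
For comparison, a sketch of a writer whose stored error does not block later writes. retryableWriter and demoRecovery are hypothetical, and the underlying write is reduced to a function value:

```go
package main

import (
	"errors"
	"fmt"
)

// retryableWriter remembers the most recent error for inspection but still
// calls the underlying write on every call.
type retryableWriter struct {
	write   func(lines ...string) (int, error)
	lastErr error
}

// Write always reaches the underlying writer. lastErr is overwritten on each
// call, so it becomes nil again after the first success.
func (w *retryableWriter) Write(lines ...string) (int, error) {
	n, err := w.write(lines...)
	w.lastErr = err
	return n, err
}

// demoRecovery uses an underlying writer that fails once (for example a rate
// limit) and then recovers. It returns the errors of two consecutive writes.
func demoRecovery() (first, second error) {
	calls := 0
	w := &retryableWriter{write: func(lines ...string) (int, error) {
		calls++
		if calls == 1 {
			return 0, errors.New("exceeded rate limit")
		}
		return len(lines), nil
	}}
	_, first = w.Write("cpu value=1")
	_, second = w.Write("cpu value=2")
	return first, second
}

func main() {
	first, second := demoRecovery()
	fmt.Println("first:", first)
	fmt.Println("second:", second)
}
```

A sticky error is a deliberate pattern in some buffered writers (bufio.Writer behaves this way), but it then requires creating a new writer after a failure.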

Concept: Metric vs Measurement

Docs, Fluxlang, and InfluxDB's TSDB all use the concept of Measurement, and rarely use the concept of Metric.

Now using the concept of Metric in the go client may be confusing to users.

// Metric is just a github.com/influxdata/line-protocol.Metric.
// We alias here to keep abstractions from leaking.
type Metric = lp.Metric
// Tag is just a github.com/influxdata/line-protocol.Tag.
// We alias here to keep abstractions from leaking.
type Tag = lp.Tag
// Field is just a github.com/influxdata/line-protocol.Field.
// We alias here to keep abstractions from leaking.
type Field = lp.Field
// RowMetric is a Metric,
// that has methods to make it easy to add tags and fields
type RowMetric struct {
    NameStr string
    Tags    []*lp.Tag
    Fields  []*lp.Field
    TS      time.Time
}

Need to defer waitgroup decrement

See:

w.wg.Add(1)
w.lock.Lock()
if w.buf.Len() == 0 {
    w.lock.Unlock()
    return nil
}
buf := w.buf.Buffer
w.buf.Buffer = bufferPool.Get().(*bytes.Buffer)
w.lock.Unlock()
err := w.flush(ctx, buf)
if err != nil {
    return err
}
if buf.Len() <= maxPooledBuffer {
    buf.Reset()
    bufferPool.Put(buf)
}
w.wg.Done()

There are two early exit points (the empty-buffer return and the non-nil error return) where this waitgroup is never decremented. This will ultimately lead to a deadlock on Wait() in Stop().

Should defer w.wg.Done() asap
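
A simplified sketch of the fix, with the buffer pool left out and the network call replaced by a function value. The writer type and demoStop are stand-ins, not the code from the repository:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
)

// writer is a reduced stand-in for the buffered writer.
type writer struct {
	mu   sync.Mutex
	wg   sync.WaitGroup
	buf  bytes.Buffer
	send func(payload []byte) error
}

// Flush defers wg.Done right after wg.Add, so the counter is decremented on
// every return path, including the two early exits.
func (w *writer) Flush() error {
	w.wg.Add(1)
	defer w.wg.Done()

	w.mu.Lock()
	if w.buf.Len() == 0 {
		w.mu.Unlock()
		return nil // early exit: nothing buffered
	}
	payload := append([]byte(nil), w.buf.Bytes()...)
	w.buf.Reset()
	w.mu.Unlock()

	return w.send(payload) // may return a non-nil error
}

// Stop waits for in-flight flushes to finish.
func (w *writer) Stop() { w.wg.Wait() }

// demoStop exercises both early exits and then calls Stop, which would block
// forever if Done had been skipped. It reports whether the send error surfaced.
func demoStop() bool {
	w := &writer{send: func([]byte) error { return errors.New("write failed") }}
	_ = w.Flush() // empty buffer path
	w.buf.WriteString("cpu value=1\n")
	err := w.Flush() // error path
	w.Stop()
	return err != nil
}

func main() {
	fmt.Println("stopped cleanly, error surfaced:", demoStop())
}
```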
