
instream's Introduction

Instream

InfluxDB driver for Elixir

InfluxDB Support

Tested InfluxDB versions:

  • 1.7.11
  • 1.8.10
  • 2.0.9
  • 2.1.1
  • 2.2.0
  • 2.3.0
  • 2.4.0
  • 2.5.1
  • 2.6.1
  • 2.7.6

Package Setup

To use Instream in your project, edit your mix.exs file and add it as a dependency:

defp deps do
  [
    # ...
    {:instream, "~> 2.0"},
    # ...
  ]
end
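
After adding the dependency, fetch it before compiling your project:

mix deps.get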

Testing

To run the tests you need an InfluxDB instance with HTTP authentication enabled.

The following environment variables select the test suites to run and the InfluxDB version under test (a sample invocation follows the list):

  • INFLUXDB_HOST: the hostname where the InfluxDB can be reached (e.g. localhost)
  • INFLUXDB_PORT: the port where InfluxDB receives queries on (e.g. 8086)
  • INFLUXDB_SCHEME: the scheme (protocol) to connect to InfluxDB with (e.g. http or https)
  • INFLUXDB_VERSION: the tested InfluxDB version as major.minor (e.g. "1.8", "2.0", or "2.4"); use "cloud" to test against an InfluxDB Cloud instance
  • INFLUXDB_V1_DATABASE: the database used for InfluxDB v1.x tests (will receive a DROP and CREATE during test start!)
  • INFLUXDB_V1_PASSWORD: password for the INFLUXDB_V1_USERNAME account
  • INFLUXDB_V1_PORT_UDP: the UDP port used for writer testing (should be configured to write to INFLUXDB_V1_DATABASE)
  • INFLUXDB_V1_SOCKET: path to the InfluxDB unix socket (InfluxDB 1.8 only)
  • INFLUXDB_V1_USERNAME: username with admin privileges for the InfluxDB test instance
  • INFLUXDB_V2_BUCKET: the bucket used for InfluxDB v2.x tests
  • INFLUXDB_V2_DATABASE: the mapped database used for testing the legacy API
  • INFLUXDB_V2_ORG: organization associated with the INFLUXDB_V2_BUCKET
  • INFLUXDB_V2_RETENTION: the retention policy associated with the INFLUXDB_V2_DATABASE
  • INFLUXDB_V2_TOKEN: the authentication token used
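
A sample invocation for an InfluxDB 2.x test run (illustrative values only, not the project's official test setup; adjust to your environment and the version under test):

INFLUXDB_HOST=localhost \
INFLUXDB_PORT=8086 \
INFLUXDB_SCHEME=http \
INFLUXDB_VERSION=2.7 \
INFLUXDB_V2_BUCKET=test_bucket \
INFLUXDB_V2_ORG=test_org \
INFLUXDB_V2_TOKEN=test_token \
mix test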

Usage

Connections

To connect to an InfluxDB server you need a connection module:

defmodule MyConnection do
  use Instream.Connection, otp_app: :my_app
end

The :otp_app name and the module name can be freely chosen, but they have to be linked to a corresponding configuration entry. The connection module then needs to be added to your supervision tree:

children = [
  # ...
  MyConnection,
  # ...
]
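
For a standard OTP application this usually means listing the connection in the application's start/2 callback. A minimal sketch, assuming a hypothetical application module named MyApp.Application:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ... other children ...
      MyConnection
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end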

Example of the matching configuration entry:

# InfluxDB v2.x
config :my_app, MyConnection,
  auth: [method: :token, token: "my_token"],
  bucket: "my_default_bucket",
  org: "my_default_org",
  host: "my.influxdb.host",
  version: :v2

# InfluxDB v1.x
config :my_app, MyConnection,
  auth: [username: "my_username", password: "my_password"],
  database: "my_default_database",
  host: "my.influxdb.host"

More details on connections and configuration options can be found with the Instream.Connection and Instream.Connection.Config modules.

Queries

# Flux query
MyConnection.query(
  """
    from(bucket: "#{MyConnection.config(:bucket)}")
    |> range(start: -5m)
    |> filter(fn: (r) =>
      r._measurement == "instream_examples"
    )
    |> first()
  """
)

# InfluxQL query
MyConnection.query("SELECT * FROM instream_examples")

More detailed documentation on queries (reading/writing/options) is available in the documentation for the Instream and Instream.Connection modules.
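
Both calls accept a keyword list of options as the second argument. A hedged sketch based on options that appear elsewhere on this page; the exact set available depends on the connection's configured InfluxDB version:

# Query a database other than the configured default (InfluxDB v1.x)
MyConnection.query("SELECT * FROM instream_examples", database: "other_database")

# Explicitly select the query language
MyConnection.query("buckets()", query_language: :flux)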

Series Definitions

If you do not want to build raw maps for writing data, you can pre-define a series for later use:

defmodule MySeries do
  use Instream.Series

  series do
    measurement "my_measurement"

    tag :bar
    tag :foo

    field :value
  end
end
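
The definition above generates a struct with nested structs for the fields and tags, roughly of the shape sketched below (unset values default to nil):

%MySeries{
  fields: %MySeries.Fields{value: nil},
  tags: %MySeries.Tags{bar: nil, foo: nil},
  timestamp: nil
}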

More information about series definitions can be found in the module documentation of Instream.Series.

Writing Series Points

You can then use this module to assemble a data point (one at a time) for writing:

data = %MySeries{}
data = %{data | fields: %{data.fields | value: 17}}
data = %{data | tags: %{data.tags | bar: "bar", foo: "foo"}}
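
The struct also carries a :timestamp key that can be set explicitly if you do not want the server to assign one; an integer Unix timestamp in nanoseconds is assumed in this sketch:

data = %{data | timestamp: 1_483_612_556_000_000_000}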

And then write one or many at once:

MyConnection.write(data)
MyConnection.write([point_1, point_2, point_3])
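
Plain maps can also be written without defining a series module first. A sketch following the map shape used elsewhere on this page (measurement, tags, fields and an optional timestamp):

MyConnection.write(%{
  measurement: "instream_examples",
  tags: %{bar: "bar", foo: "foo"},
  fields: %{value: 17},
  timestamp: 1_483_612_556_000_000_000
})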

License

Apache License, Version 2.0

instream's People

Contributors

agramichael, astery, duff, gazler, h4cc, jeffdeville, lazarristickantox, ma233, mneudert, nallwhy, nikhilbelchada, odarriba, optikfluffel, patchkord, quangdatv


instream's Issues

Difficulty specifying timeout for write operation

Hi there,

I was hoping you might be able to help me understand if I'm doing something incorrectly. I'm writing about 1500 points per minute to Influx and everything is working great. If I attempt a lot more volume (+60,000 writes/minute) I'm getting some timeouts.

I was trying to increase the timeout to see if it would affect things. I've been experimenting by increasing every timeout I could find, but it doesn't seem to be affecting anything. So my config has:

config :lava_consumer, LavaConsumer.Influx.Connection,
  ...
  pool:      [ max_overflow: 20, size: 20 ],
  http_opts: [ recv_timeout: 50000 ],
  query_timeout: 50000,
  writer:    Instream.Writer.Line
  ...

And when I do the write, I attempted to override things too:

      :ok = Influx.Connection.write(points, Keyword.put(opts, :timeout, 50000))

But I'm still getting this error:

2017-06-09T17:07:02.589316+00:00 app[consumer.1]: 17:07:02.589 [info] event#terminate=#PID<0.30109.2> reason={%RuntimeError{message: "Raised error: {:timeout, {GenServer, :call, [#PID<0.30931.2>, {:execute, %Instream.Query{method: :get, opts: [database: nil, timeout: 50000], payload: %{points: [%{fields: %{delta: 1}, measurement: \"retain_deltas\", tags: %{organization_key: \"qb592Bc4v7WGNbrKYUkTVU36We\", payment_method_type: \"CreditCard\"}, timestamp: 1497028016372269271}]}, type: :write}, [database: nil]}, 5000]}}"}, [{LavaConsumer.MessageHandler, :handle_failure, 1, [file: 'lib/lava_consumer/message_handler.ex', line: 18]}, {LavaConsumer.MessageHandler, :handle_messages, 1, [file: 'lib/lava_consumer/message_handler.ex', line: 10]}, {Kaffe.Worker, :handle_cast, 2, [file: 'lib/kaffe/group_member/worker/worker.ex', line: 35]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 601]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 667]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}
2017-06-09T17:07:02.590917+00:00 app[consumer.1]: 17:07:02.590 [error] GenServer :partition_worker_8 terminating
2017-06-09T17:07:02.590926+00:00 app[consumer.1]: ** (RuntimeError) Raised error: {:timeout, {GenServer, :call, [#PID<0.30931.2>, {:execute, %Instream.Query{method: :get, opts: [database: nil], payload: %{points: [%{fields: %{delta: 1}, measurement: "retain_deltas", tags: %{organization_key: "qb592Bc4v7WGNbrKYUkTVU36We", payment_method_type: "CreditCard"}, timestamp: 1497028016372269271}]}, type: :write}, [database: nil]}, 5000]}}

I see a 5000 in that error which seems to indicate that I'm not specifying things correctly.

Any chance you might point me in the right direction?

Thanks!!

Flux Queries With V1 Client Against OSS 2.x

The org is not added to the URL parameters when using a connection configured for v1 against an InfluxDB OSS 2.x instance.
The API documentation states that either org or orgID is required in the query parameters.

defmodule V1 do
  use Instream.Connection,
    config: [
      auth: [
        method: :token,
        token: System.fetch_env!("LB_INFLUXDB_LOCAL_TOKEN")
      ],
      org: "instream_test",
      version: :v1,
      database: "mapped_database"
    ]
end

V1.query(
  """
  import "influxdata/influxdb/schema"

  schema.measurements(bucket: "test_bucket")
  """,
  query_language: :flux
)

%{code: "invalid", message: "failed to decode request body: Please provide either orgID or org"}

I'm currently working on a fix.

Another hackney issue?

[error] GenServer #PID<0.471.0> terminating ** (FunctionClauseError) no function clause matching in :hackney_headers_new.do_fold/3 (hackney) /home/home/dev2/phoenix/installer/proj/deps/hackney/src/hackney_headers_new.erl:147: :hackney_headers_new.do_fold([[{3, "Content-Type", 116}], [{4, "Content-Type", 101}], [{5, "Content-Type", 120}], [{6, "Content-Type", 116}], [{7, "Content-Type", 47}], [{8, "Content-Type", 112}], [{9, "Content-Type", 108}], [{10, "Content-Type", 97}], [{11, "Content-Type", 105}], [{12, "Content-Type", 110}]], #Function<6.29524015/3 in :hackney_headers_new.to_iolist/1>, [["Content-Length", ': ', "54", '\r\n'], ["User-Agent", ': ', "hackney/1.7.1", '\r\n'], ["Host", ': ', "localhost:8086", '\r\n']])

Bug or am I missing something?

Pool Spec is broken.

** (Mix) Could not start application app: App.start(:normal, []) returned an error: bad start spec: invalid mfa: {{App.Influx, {:poolboy, :start_link, [[worker_module: Instream.Pool.Worker, name: {:local, App.Influx.Pool}, size: 5, max_overflow: 10], [module: App.Influx, writer: Instream.Writer.JSON, otp_app: :mixdown, hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http"]]}, :permanent, 5000, :worker, [:poolboy]}, :start_link, []}

According to @josevalim, the mfa spec should be: {module, atom, args}. I don't claim to fully understand the implementation details related to this, but I can't seem to make the module work under Erlang/OTP 17 / Elixir 1.0.5.

InfluxDB may return a value without a decimal point in a `double` type field

InfluxDB returned "35" as the value of a double type field. The original value I had written via the connection was String.to_float("35.00").

** (ArgumentError) errors were found at the given arguments:

  * 1st argument: not a textual representation of a float

    :erlang.binary_to_float("35")
    (instream 2.0.0-dev) lib/instream/decoder/csv.ex:58: Instream.Decoder.CSV.parse_datatypes/1
    (elixir 1.12.3) lib/enum.ex:1582: Enum."-map/2-lists^map/1-0-"/2
    (elixir 1.12.3) lib/enum.ex:1582: Enum."-map/2-lists^map/1-0-"/2
    (instream 2.0.0-dev) lib/instream/decoder/csv.ex:80: anonymous fn/4 in Instream.Decoder.CSV.parse_rows/1
    (elixir 1.12.3) lib/enum.ex:1582: Enum."-map/2-lists^map/1-0-"/2
    (elixir 1.12.3) lib/enum.ex:1582: Enum."-map/2-lists^map/1-0-"/2
    (instream 2.0.0-dev) lib/instream/connection/query_runner_v2.ex:75: Instream.Connection.QueryRunnerV2.read/3

Some version info:

{:instream, github: "mneudert/instream", ref: "6fff364132d048810e4e4ab0ce4d52d9cb654d0e"}
InfluxDB 2.0.9 (git: d1233b7951) build_date: 2021-09-24T20:04:53Z

Write Logging Always Returns 0 for Response

Hi, I've been using this library for some time and I'm quite loving it for writing tick data into InfluxDB. I recently ran into an issue where writes were failing because of data type mismatches (trying to write an integer to a float field).

The logs were reporting a response of 0, which seemed okay at first, but when I dug deeper I saw it is ALWAYS returning 0.

Is there a way to change the WriteEntry log to return the actual error response instead of the response always being set to 0?

erlang error: :"function not exported"

I just tried to write some sample values but got stuck pretty fast. If I log my points right before putting them into the query they look like this:

[debug] points = [%{fields: %{value: 1.1}, name: "a"}, %{fields: %{value: 2.2}, name: "b"}, %{fields: %{value: 3.3}, name: "c"}]

but then I only get an error from erlang:

[error] GenServer #PID<0.337.0> terminating
Last message: {:execute, %Instream.Query{payload: "{\"points\":[{\"name\":\"a\",\"fields\":{\"value\":1.1}},{\"name\":\"b\",\"fields\":{\"value\":2.2}},{\"name\":\"3\",\"fields\":{\"value\":3.3}}],\"database\":\"dev_db\"}", type: :write}, []}
State: [module: MyApp.Connection, otp_app: :my_app, hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http", username: "phoenix", password: "swordfish"]
** (exit) an exception was raised:
    ** (ErlangError) erlang error: :"function not exported"
        (instream) Instream.Pool.Worker.terminate({%Poison.SyntaxError{message: "Unexpected token: p", token: "p"}, [{Poison.Parser, :parse!, 2, [file: 'lib/poison/parser.ex', line: 56]}, {Poison, :decode!, 2, [file: 'lib/poison.ex', line: 83]}, {Instream.Pool.Worker, :handle_call, 3, [file: 'lib/instream/pool/worker.ex', line: 29]}, {:gen_server, :try_handle_call, 4, [file: 'gen_server.erl', line: 607]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 639]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 237]}]}, [module: MyApp.Connection, otp_app: :my_app, hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http", username: "phoenix", password: "swordfish"])
        (stdlib) gen_server.erl:621: :gen_server.try_terminate/3
        (stdlib) gen_server.erl:787: :gen_server.terminate/7
        (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

I tried to simply follow the usage examples in your README, do you know where I went wrong?

Schema definition

Could you please tell me where I define my schema using this library? Is it still in a section of the influxdb.conf? Many Thanks.

InfluxDB 2.x and InfluxDB Cloud Support

I am configuring monitoring for an Elixir application and plan to use InfluxDB Cloud to store my metric data. It is recommended to use their new service, which only supports InfluxDB 2.x. The configuration for their cloud product uses token based authentication, which doesn't appear to be possible in the current version. There are probably other compatibility issues which I am not aware of.

Just wondering if there are plans to support 2.x and/or InfluxDB cloud. I know their company is committed to supporting the BEAM ecosystem, so I will probably reach out to them directly if there isn't work being done already. Thank you.

[proposal] Series struct like Ecto.Schema

Why not use a struct like Ecto.Schema for the series?

ex:

defmodule MySeries do
  use Instream.Schema

  series do
    database    "my_database_optional"
    measurement "cpu_load"

    tag :host, default: "www"
    tag :core

    field :value, default: 100
    field :value_desc
  end
end

# or measurement name with argument of `series/2`
defmodule MySeries do
  use Instream.Schema

  series "cpu_load" do
  end
end

serie = %MySeries{
  value: 100,
  value_desc: nil,
  host: "www",
  core: nil,
  timestamp: nil
}

serie.__meta__(:fields)
%MySeries.Fields{ value: 100, value_desc: nil }

serie.__meta__(:tags)
%MySeries.Tags{ host: "www", core: nil }

Query Builder where clause should support more operators

As I read from the code, the Instream.Query.Builder where clause currently only supports the equality operator. It should support more operators such as >, <, >= and <=.

One idea: for each {key, value} pair in the where conditions map, the key could be a string of the form "field[space]op", where op is one of ">", "<", ">=" or "<=". If the key is an atom or the op is omitted, the operator is treated as equality.

For example:

# SELECT * FROM some_measurement WHERE binary = 'foo' AND numeric >= 42 AND location = 'abc'
from("some_measurement")
|> where(%{ binary: "foo", "numeric >=": 42, "location": "abc" })
|> MyApp.MyConnection.query()

Query Builder should also support LIMIT and OFFSET clauses
For example:

# SELECT * FROM some_measurement WHERE binary = 'foo' LIMIT 15 OFFSET 78
from("some_measurement")
|> where(%{ binary: "foo"})
|> limit(15)
|> offset(78)
|> MyApp.MyConnection.query()

If you don't mind, I can create a PR for this

Parametrized flux queries against influxdb cloud

Support for parameterized Flux queries, which the InfluxDB API supports, is missing.

We should ideally be able to write a parameterized Flux query like the one below:

  query = """
  from(bucket: params.bucket)
  |> range(start: #{range.from}T04:00:00.000Z, stop: #{range.to}T03:59:00.000Z)
  |> filter(fn: (r) => r._measurement == params.series)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> group()
  |> map(fn: (r) => ({ r with _value: 1 }))
  |> count()
  """

InfluxConnection.query(query, params: %{bucket: "test", series: "series"})

Encoding series to a string

Is it currently possible to take an Instream.Series implementation and encode it to a string?

The use case is sending metrics from a server to a metrics poller. The poller will actually insert the data into the database, so all I have to do is send the series encoded as a string.

Enforcing field types on write

Hello,

is there a way to enforce/specify field types on write commands?

Currently I'm trying to write integer values to fields. I get partial drops while writing because of values that mix up integer and float. When I look at the measurement keys afterwards, it says there would be integer and float values, despite the fact that a select only shows integers.

InfluxDB's docs say that an integer value can be suffixed with "i" to force Influx to interpret it as integer. I would like to do that with Instream.

Greets
Nils

Dialyxir error

Hello, I have several errors using dialyxir with instream 0.18

lib/youpi/influx.ex:2:no_return Function ping/0 has no local return.
lib/youpi/influx.ex:2:no_return Function ping/1 has no local return.
lib/youpi/influx.ex:2:no_return Function ping/2 has no local return.
lib/youpi/influx.ex:2:call_without_opaque Call without opaqueness type mismatch.
lib/youpi/influx.ex:2:no_return Function status/0 has no local return.
lib/youpi/influx.ex:2:no_return Function status/1 has no local return.
lib/youpi/influx.ex:2:no_return Function status/2 has no local return.
lib/youpi/influx.ex:2:call_without_opaque Call without opaqueness type mismatch.
lib/youpi/influx.ex:2:no_return Function version/0 has no local return.
lib/youpi/influx.ex:2:no_return Function version/1 has no local return.
lib/youpi/influx.ex:2:no_return Function version/2 has no local return.
lib/youpi/influx.ex:2:call_without_opaque Call without opaqueness type mismatch.

lib/youpi/influx.ex

defmodule Youpi.Influx do
  use Instream.Connection, otp_app: :youpi

Best regards

Is writing multiple points broken?

First off, thanks for all of the work on this!

I've been trying to write multiple points using both series and raw points, but both return saying N points were written, yet only the last map in the list actually gets written to the db. Any ideas?

Usage examples

Hey! I know this is experimental, but does it work? If so, is there any chance you could provide some documentation around basic configuration and usage?

Thanks!

Add ability to write explicit timestamps to series

The system I'm measuring generates usec timestamps with its events, so I need to push them to Influx along with the tags and fields. I'll start hacking on a PR to add it, but let me know if there is anything in particular I should know.

New release (0.23)

Any timeline for when there will be a new release?

There has been plenty of work done since 0.22. I am currently using the repo's master branch as a dependency because 0.22 has outdated dependencies that won't work in our project.

unmatched query result

An unhandled result shape from :hackney.body (query result); it just needs to return {:error, :timeout} up to the caller.

[error] GenServer #PID<0.365.0> terminating
** (MatchError) no match of right hand side value: {:error, :timeout}
    (instream) lib/instream/connection/query_runner.ex:79: Instream.Connection.QueryRunner.read/3

[question] how to get timestamp as time result

When I do a query like

"SELECT *
      FROM my_lot
LIMIT 10"
|> MyApp.Connection.query(database: "jarvis_cloaking_trace")

It gives me time like:

"2018-02-07T08:02:13.34138023Z"

So my question is: how can I get a timestamp as the result? And actually I don't quite understand this: is this UTC time? It is kind of complex if I have to parse a string rather than receiving a timestamp or a DateTime object.

Write multiple points

It would be awesome to be able to write many points with one query at once!
If you could give me some pointers (really needed since I'm fairly new to Elixir and especially to the macro system), I could write a PR.

Anyways, thanks for this super useful library.

Query with series like for write ?

Is there an easy way to do the equivalent of a write with a series, but for a query?
As of now, the two sides are completely dissociated, right?

support for mix release

Hi there, I'm using instream with Elixir 1.9, and when trying to create a release with mix release I get an error since :influxql is in the included_applications. Here is the error message:

** (Mix) Undefined applications: [influxql]

Simply removing included_applications from mix.exs solves the issue and it works perfectly, but I'm not sure if it breaks the library on older Elixir versions (I believe prior to 1.6 or 1.5 it was necessary to list the other applications that you want started by the BEAM). For now I have my own fork with this modification. If you think this change is something you would like to include, I can PR it.

Of course, if there is a different way to solve this issue, let me know.

P.S. Thanks for the work put in to this project, appreciate it!

Influx DB Cloud access returns an error

We are trying to access an InfluxDB Cloud 2.4 instance.

Querying the data with:

MyService.Connection.query("from(bucket: \"some_bucket\") |> range(start: -24h) |> first()", log: true)

Results in:

%{code: "invalid", message: "invalid: unsupported Content-Type header value"}

We are using the latest master of instream and this configuration.

config :my_service, MyService.Connection,
   auth: [
     method: :token,
     token: "SOME_TOKEN"
   ],
   bucket: "some_bucket",
   org: "some_org",
   scheme: "https",
   host: "westeurope-1.azure.cloud2.influxdata.com",
   port: 443,
   http_opts: [recv_timeout: 30_000],
   version: :v2

Any idea how to solve this issue?


Hackney error

Ran into this error quite a few times:

** (ArgumentError) argument error
            (stdlib 3.13) :ets.lookup_element(:hackney_config, :mod_metrics, 2)
            /Users/michael/Documents/alchemist/deps/hackney/src/hackney_metrics.erl:27: :hackney_metrics.get_engine/0
            /Users/michael/Documents/alchemist/deps/hackney/src/hackney_connect.erl:75: :hackney_connect.create_connection/5
            /Users/michael/Documents/alchemist/deps/hackney/src/hackney_connect.erl:44: :hackney_connect.connect/5
            /Users/michael/Documents/alchemist/deps/hackney/src/hackney.erl:333: :hackney.request/5
            (stdlib 3.13) timer.erl:166: :timer.tc/1
            lib/instream/connection/query_runner.ex:70: Instream.Connection.QueryRunner.read/3

Solved it by adding an Application.ensure_all_started(:hackney) call on startup.
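
For reference, the workaround mentioned above is a single call during application startup (a sketch; where exactly it belongs depends on your application):

# Make sure hackney and its dependencies are running before the first request
{:ok, _started} = Application.ensure_all_started(:hackney)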

Write operation could return the created object

From what I've tested, the write function returns :ok in case of success.

It would be interesting if it returned a tuple {:ok, object} containing the object that has been created during this operation.

Series `database` not optional

I have defined a series without a database "some_db" line.
I was expecting the database I defined in config.exs to be used.
Instead there is an error like %{error: "database is required"}.

Is there a way to achieve this?

Database name could be set on config

We set many settings on the config, such as host, port, writer, etc.
I believe that a default database name could also be set, and it would be used in every query, unless another database name is provided.

Mix with custom configuration

How would you add custom configuration keys to the config.exs?

For example I would like to set the database I am talking to by adding a database key like so:

config :influxdb, InfluxDB.Connection,
  hosts:  ["localhost"],
  pool:   [max_overflow: 0, size: 1],
  port:   8086,
  scheme: "http",
  write:  Instream.Writer.Line,
  database: "test"

But it fails horribly with:

  failure on setup_all callback, tests invalidated
     ** (FunctionClauseError) no function clause matching in Regex.match?/2
     stacktrace:
       (elixir) lib/regex.ex:158: Regex.match?(~r/^[a-zA-Z0-9_\-]+$/, nil)
       (instream) lib/instream/validate.ex:13: Instream.Validate.database!/1
       (instream) lib/instream/cluster/database.ex:18: Instream.Cluster.Database.create/2
       test/influxdb_test.exs:7: InfluxDBTest.__ex_unit_setup_all_0/1
       test/influxdb_test.exs:1: InfluxDBTest.__ex_unit__/2

Adding documentation for writing raw maps

The README at "Series Definitions" only describes how to use the use Instream.Series approach for writing metrics.

Is it possible to use "raw maps for writing data" like this?

MyApp.MyConnection.write(
   %{measurement: "example", value: 1, tags: %{foo: "bar"}, fields: %{answer: 42}}
)

If yes, it should be documented, because I can't find it by reading the code.

Background: using Instream.Series is not flexible enough for my needs at first sight.

write queries fail silently

First of all, thanks a lot for this library 👍

I kept trying to write some series today and while this is what I saw on iex:

[debug] [write] 5 points
:ok

The series was not written and influxdb logs showed it was returning 400.

I eventually found the error on my part, but thought maybe it would be better if Connection.write returned an error tuple or something if the write wasn't successful. What do you think? I could make some time to work on this in the coming weeks if you'd be willing to accept a PR.

Crash on write timeout using Instream.Writer.Line and hackney problem

Today we had a connection issue to our InfluxDB and saw some errors like this:

** (MatchError) no match of right hand side value: {:error, :connect_timeout}
(instream) lib/instream/writer/line.ex:26: Instream.Writer.Line.write/3
(instream) lib/instream/connection/query_runner.ex:144: anonymous fn/4 in Instream.Connection.QueryRunner.write/3
(stdlib) timer.erl:166: :timer.tc/1
(instream) lib/instream/connection/query_runner.ex:142: Instream.Connection.QueryRunner.write/3
(instream) lib/instream/pool/worker.ex:36: Instream.Pool.Worker.handle_call/3
(stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
(stdlib) gen_server.erl:647: :gen_server.handle_msg/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:execute, %Instream.Query{method: :get, opts: [database: nil], payload: %{points: [%{fields: %{bandwidth: 125}, measurement: "readings", tags: %{uuid: "847D501619122277"}, timestamp: 1483612556000000000}]}, type: :write}, [database: nil]}
State: %{module: MyApp.Influx.Connection}
=CRASH REPORT==== 5-Jan-2017::10:36:04 ===
crasher:
initial call: Elixir.Instream.Pool.Worker:init/1
pid: <0.28185.4>
registered_name: []
exception exit: {{badmatch,{error,connect_timeout}},
[{'Elixir.Instream.Writer.Line',write,3,
[{file,"lib/instream/writer/line.ex"},{line,26}]},
{'Elixir.Instream.Connection.QueryRunner',
'-write/3-fun-0-',4,
[{file,"lib/instream/connection/query_runner.ex"},
{line,144}]},
{timer,tc,1,[{file,"timer.erl"},{line,166}]},
{'Elixir.Instream.Connection.QueryRunner',write,3,
[{file,"lib/instream/connection/query_runner.ex"},
{line,142}]},
{'Elixir.Instream.Pool.Worker',handle_call,3,
[{file,"lib/instream/pool/worker.ex"},{line,36}]},
{gen_server,try_handle_call,4,
[{file,"gen_server.erl"},{line,615}]},
{gen_server,handle_msg,5,
[{file,"gen_server.erl"},{line,647}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,247}]}]}
in function gen_server:terminate/7 (gen_server.erl, line 812)
ancestors: [<0.1659.0>,'Elixir.MyApp.Influx.Connection.Pool',
'Elixir.MyApp.Supervisor',<0.1528.0>]
messages: []
links: [<0.1658.0>,<0.1659.0>]
dictionary: []
trap_exit: false
status: running
heap_size: 2586
stack_size: 27
reductions: 4045
[error] GenServer #PID<0.28185.4> terminating
{shutdown,5000},
Supervisor: {<0.1659.0>,poolboy_sup}
Context: child_terminated
Reason: {{badmatch,{error,connect_timeout}},

The part of the implementation of Instream.Writer.Line looks like this:

    { :ok, status, headers, client } = :hackney.post(url, headers, body, http_opts)
    { :ok, response }                = :hackney.body(client)

I would like to handle the error without crashing.

Could Instream.Writer.Line be changed so it does not crash on {:error, _}, and instead passes that :error up to the Connection.write call?
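
A defensive variant of the excerpt above would match on the error tuple instead of asserting :ok. This is only a sketch based on the excerpt's variables (url, headers, body, http_opts), not the library's actual implementation:

case :hackney.post(url, headers, body, http_opts) do
  {:ok, status, resp_headers, client} ->
    # Only read the body when the request itself succeeded
    {:ok, response} = :hackney.body(client)
    {:ok, status, resp_headers, response}

  {:error, reason} ->
    # e.g. {:error, :connect_timeout} is handed back to the caller instead of crashing
    {:error, reason}
end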


What I also discovered is that after all Instream workers had crashed, any other calls using hackney also failed. Could that be connected?

(UndefinedFunctionError) undefined function: Instream.Pool.Worker.terminate/2

Hey Marc,

I'm having a bit of a problem with the lib, could you help me a bit? I'm just trying it out, so I've set up a config and such, started a link, and now I'm just trying to drop and create a database and it explodes like so:

14:30:38.189 [error] GenServer #PID<0.447.0> terminating
Last message: {:execute, %Instream.Query{payload: "DROP DATABASE blah_test", type: :cluster}, []}
State: [module: Blah.Database, writer: Instream.Writer.JSON, otp_app: :blah_app, auth: [username: "blah", password: "blah"], hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http"]
** (exit) an exception was raised:
    ** (ErlangError) erlang error: :"function not exported"
        Instream.Pool.Worker.terminate({:noproc, {:gen_server, :call, [:hackney_manager, {:new_request, #PID<0.447.0>, #Reference<0.0.3.1417>, {:client, :undefined, :hackney_dummy_metrics, :hackney_tcp_transport, 'localhost', 8086, "localhost:8086", [], nil, nil, nil, true, :hackney_pool, 5000, false, 5, false, 5, nil, nil, nil, :undefined, :start, nil, :normal, false, false, false, :undefined, false, nil, :waiting, nil, 4096, "", [], :undefined, nil, nil, nil, nil, :undefined, ...}}, :infinity]}}, [module: Blah.Database, writer: Instream.Writer.JSON, otp_app: :blah_app, auth: [username: "blah", password: "blah"], hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http"])
        (stdlib) gen_server.erl:643: :gen_server.try_terminate/3
        (stdlib) gen_server.erl:809: :gen_server.terminate/7
        (stdlib) proc_lib.erl:239: :proc_lib.init_p_do_apply/3
** (exit) exited in: GenServer.call(#PID<0.447.0>, {:execute, %Instream.Query{payload: "DROP DATABASE blah_test", type: :cluster}, []}, 5000)
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) undefined function: Instream.Pool.Worker.terminate/2
            Instream.Pool.Worker.terminate({:noproc, {:gen_server, :call, [:hackney_manager, {:new_request, #PID<0.447.0>, #Reference<0.0.3.1417>, {:client, :undefined, :hackney_dummy_metrics, :hackney_tcp_transport, 'localhost', 8086, "localhost:8086", [], nil, nil, nil, true, :hackney_pool, 5000, false, 5, false, 5, nil, nil, nil, :undefined, :start, nil, :normal, false, false, false, :undefined, false, nil, :waiting, nil, 4096, "", [], :undefined, nil, nil, nil, nil, :undefined, ...}}, :infinity]}}, [module: Blah.Database, writer: Instream.Writer.JSON, otp_app: :blah_app, auth: [username: "blah", password: "blah"], hosts: ["localhost"], pool: [max_overflow: 0, size: 1], port: 8086, scheme: "http"])
            (stdlib) gen_server.erl:643: :gen_server.try_terminate/3
            (stdlib) gen_server.erl:809: :gen_server.terminate/7
            (stdlib) proc_lib.erl:239: :proc_lib.init_p_do_apply/3
    (elixir) lib/gen_server.ex:356: GenServer.call/3
    src/poolboy.erl:76: :poolboy.transaction/3
    test/test_helper.exs:3: (file)
    (elixir) lib/code.ex:307: Code.require_file/2
    (elixir) lib/enum.ex:537: Enum."-each/2-lists^foreach/1-0-"/2

The code looks dead simple

Supervisor.start_link(
  [ Blah.Database.child_spec ],
  strategy: :one_for_one
)

_ = "blah_test" |> Instream.Cluster.Database.drop()   |> Blah.Database.execute()

I'm no expert, but it seems that Erlang's poolboy behaviour expects a terminate/2 callback on your Pool.Worker module.

I tried to define the function manually in my deps/instream copy, but it didn't quite help. I'm a bit out of my depth here, so if you could give it a thought, that would be great. Thanks!


The env.

influxdb - 1.9.3 (vanilla install)
erlang - 7.0.3
elixir - 1.0.5
instream - github: "mneudert/instream"

let me know if you need any more details or want me to try something on my machine

Issue with distillery

Hello, when I use instream with distillery, I get an error

** (Protocol.UndefinedError) protocol Enumerable not implemented for {GenServer, :call, [#PID<11160.2205.0>, {:execute, %Instream.Query{method: :get, opts: [host: nil], payload: nil, type: :ping}, [log: false]}, :infinity]}
    (elixir) lib/enum.ex:1: Enumerable.impl_for!/1
    (elixir) lib/enum.ex:141: Enumerable.reduce/3
    (elixir) lib/enum.ex:2979: Enum.map_join/3
    (elixir) lib/exception.ex:572: Exception.format_stacktrace/1
    (elixir) lib/exception.ex:145: Exception.format/3
    (distillery) lib/mix/lib/releases/runtime/control.ex:643: Mix.Releases.Runtime.Control.rpc/2
    (distillery) lib/entry.ex:44: Mix.Releases.Runtime.Control.main/1
    (stdlib) erl_eval.erl:680: :erl_eval.do_apply/6

when I run

./bin/bonsai rpc 'Bonsai.InfluxConnection.ping(log: false) '

It is quite difficult to find what's wrong. Any idea ?

Instream cannot write to the database

I am trying to write test data into a database which has been created and exists, as shown in the readme:

%MySeries{} |> Fluxter.InfluxConnection.write()

defmodule MySeries do
  use Instream.Series

  series do
    database    "my_database"
    measurement "provider"

    tag :method, default: "definemethod"
    field :provider, default: "defineplease"
  end
end
15:32:17.017 [error] GenServer #PID<0.381.0> terminating
** (ArgumentError) argument error
    (kernel) gen_tcp.erl:149: :gen_tcp.connect/4
    (hackney) src/hackney_connect.erl:246: :hackney_connect.do_connect/5
    (hackney) src/hackney_connect.erl:37: :hackney_connect.connect/5
    (hackney) src/hackney.erl:328: :hackney.request/5
    (instream) lib/instream/writer/line.ex:27: Instream.Writer.Line.write/3
    (instream) lib/instream/connection/query_runner.ex:130: anonymous fn/4 in Instream.Connection.QueryRunner.write/3
    (stdlib) timer.erl:166: :timer.tc/1
    (instream) lib/instream/connection/query_runner.ex:128: Instream.Connection.QueryRunner.write/3
Last message: {:execute, %Instream.Query{method: :get, opts: [], payload: %{database: "my_database", points: [%{fields: %{provider: "defineplease"}, measurement: "provider", tags: %{method: "definemethod"}, timestamp: nil}]}, type: :write}, []}
State: %{module: Fluxter.InfluxConnection}

15:32:17.030 [error] Task #PID<0.391.0> started from #PID<0.389.0> terminating
** (ArgumentError) argument error: {GenServer, :call, [#PID<0.381.0>, {:execute, %Instream.Query{method: :get, opts: [], payload: %{database: "my_database", points: [%{fields: %{provider: "defineplease"}, measurement: "provider", tags: %{method: "definemethod"}, timestamp: nil}]}, type: :write}, []}, 5000]}
    (elixir) lib/gen_server.ex:604: GenServer.call/3
    (poolboy) src/poolboy.erl:76: :poolboy.transaction/3
    (ndc_ex_gateway) lib/ndc_thread_request/ndc_thread_request.ex:17: NDCThreadRequest.start_thread/3
    (ndc_ex_gateway) lib/ndc_core_request/ndc_core_request.ex:11: anonymous fn/3 in NDCCoreRequest.process_request/4
    (elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<1.132609312/0 in NDCCoreRequest.process_request/4>
    Args: []

15:32:17.030 [error] Ranch protocol #PID<0.389.0> (:cowboy_protocol) of listener HTTPRouter.HTTP terminated
** (exit) exited in: GenServer.call(#PID<0.381.0>, {:execute, %Instream.Query{method: :get, opts: [], payload: %{database: "my_database", points: [%{fields: %{provider: "defineplease"}, measurement: "provider", tags: %{method: "definemethod"}, timestamp: nil}]}, type: :write}, []}, 5000)
    ** (EXIT) :badarg

What about creating a separated version for InfluxDB v2?

I think InfluxDB v1.x and v2 are similar but also different in many points (e.g. series).

It's going to be a hurdle to make a library that is compatible with both versions.

What about creating a separated version for InfluxDB v2?

Trying to connect to InfluxDB 2.0.7 without success.

Hi, thank you for the library.

I am getting this error when trying to connect to the server:
%{code: "unauthorized", message: "Unauthorized"}

My server is running in Docker on Azure. The web interface is working fine.

I created this config:

config :my_app, MyConnection,
  auth: [method: :basic, username: "root", password: "root"]

I tried changing the method to :query instead of :basic.

I also noticed there's no longer an enable-auth option for InfluxDB 2.0, right?

How can I connect via user/password?

Thanks

Influxdb 3.0

Hi there,

influxdb 3.0 was released last year. Is a new version of instream with support for 3.0 planned soon (maybe this year)?

Best regards
Patrick

from_result does not return timestamp or time

I'm wondering how to get the time or timestamp back from the from_result function.
I get timestamp: nil.

%Signal{
  fields: %Signal.Fields{
    chart_timeframe: "15M",
    signal_price: 8888,
    signal_type: "BUY"
  },
  tags: %Signal.Tags{
    algo: "ETHI",
    exchange: "KRAKEN",
    ticker: "ETHUSD"
  },
  timestamp: nil
}

Reconnection fails after prolonged Internet interruption

Hi!
Thanks for this repository. It enabled me to switch to Elixir for my little project.
In my application I regularly want to write data to Influx over an intermittent internet connection.
To address this I use Connection.status to check whether the Internet is currently available.
If :error is returned the data is buffered, otherwise it is sent. In my test scenario this was working just fine.
However, if the Internet is unavailable for a longer time (meaning tens or hundreds of failed Connection.status queries), it never reconnects and constantly returns :error. After a complete restart of the application it works fine again.
Unfortunately I am no Elixir expert and don't really know how I could find out why this happens.
