
tstorage's Introduction

tstorage Go Reference

tstorage is a lightweight local on-disk storage engine for time-series data with a straightforward API. Ingestion in particular is heavily optimized: it provides goroutine-safe reads and writes against a TSDB that partitions data points by time.

Motivation

I'm working on a couple of tools that handle a tremendous amount of time-series data, such as Ali and Gosivy. With Ali in particular, I had been facing a problem of heap consumption growing over time, as it's a load-testing tool that aims to perform real-time analysis. I poked around for a fast TSDB library with a simple API, but eventually nothing worked as well as I'd like, so I settled on writing this package myself.

To see how much tstorage has helped improve Ali's performance, see the release notes here.

Usage

Currently, tstorage requires Go version 1.16 or greater.

By default, tstorage.Storage works as an in-memory database. The example below illustrates how to insert a row into memory and immediately select it.

package main

import (
	"fmt"

	"github.com/nakabonne/tstorage"
)

func main() {
	storage, _ := tstorage.NewStorage(
		tstorage.WithTimestampPrecision(tstorage.Seconds),
	)
	defer storage.Close()

	_ = storage.InsertRows([]tstorage.Row{
		{
			Metric: "metric1",
			DataPoint: tstorage.DataPoint{Timestamp: 1600000000, Value: 0.1},
		},
	})
	points, _ := storage.Select("metric1", nil, 1600000000, 1600000001)
	for _, p := range points {
		fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
		// => timestamp: 1600000000, value: 0.1
	}
}

Using disk

To make time-series data persistent on disk, specify the path to the directory that stores time-series data via the WithDataPath option.

storage, _ := tstorage.NewStorage(
	tstorage.WithDataPath("./data"),
)
defer storage.Close()

Labeled metrics

In tstorage, you identify a metric by the combination of a metric name and optional labels. Here is an example of inserting a labeled metric to disk.

metric := "mem_alloc_bytes"
labels := []tstorage.Label{
	{Name: "host", Value: "host-1"},
}

_ = storage.InsertRows([]tstorage.Row{
	{
		Metric:    metric,
		Labels:    labels,
		DataPoint: tstorage.DataPoint{Timestamp: 1600000000, Value: 0.1},
	},
})
points, _ := storage.Select(metric, labels, 1600000000, 1600000001)

For more examples see the documentation.

Benchmarks

Benchmark tests were run on an Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz with 16GB of RAM on macOS 10.15.7.

$ go version
go version go1.16.2 darwin/amd64

$ go test -benchtime=4s -benchmem -bench=. .
goos: darwin
goarch: amd64
pkg: github.com/nakabonne/tstorage
cpu: Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz
BenchmarkStorage_InsertRows-8                  	14135685	       305.9 ns/op	     174 B/op	       2 allocs/op
BenchmarkStorage_SelectAmongThousandPoints-8   	20548806	       222.4 ns/op	      56 B/op	       2 allocs/op
BenchmarkStorage_SelectAmongMillionPoints-8    	16185709	       292.2 ns/op	      56 B/op	       1 allocs/op
PASS
ok  	github.com/nakabonne/tstorage	16.501s

Internal

A time-series database has a distinctive workload. On the write side, it has to ingest a tremendous number of data points ordered by time. Time-series data is immutable and mostly append-only, with delete operations performed in batches on less recent data. On the read side, we usually retrieve multiple data points by specifying a time range, most recent first: we query recent data in real time. Besides, time-series data is already indexed in time order.

Based on these characteristics, tstorage adopts a linear data model that partitions data points by time, quite different from B-tree- or LSM-tree-based storage engines. Each partition acts as a fully independent database containing all data points for its time range.

  │                 │
Read              Write
  │                 │
  │                 V
  │      ┌───────────────────┐ max: 1600010800
  ├─────>│  Memory Partition │
  │      └───────────────────┘ min: 1600007201
  │
  │      ┌───────────────────┐ max: 1600007200
  ├─────>│  Memory Partition │
  │      └───────────────────┘ min: 1600003601
  │
  │      ┌───────────────────┐ max: 1600003600
  └─────>│   Disk Partition  │
         └───────────────────┘ min: 1600000000

Key benefits:

  • We can easily ignore all data outside of the partition time range when querying data points.
  • Most read operations are fast because recent data gets cached in heap.
  • When a partition gets full, we can persist the data from our in-memory database by sequentially writing just a handful of larger files. We avoid any write-amplification and serve SSDs and HDDs equally well.
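
Partition size and how long old partitions are kept around are configurable. A minimal sketch, assuming the WithPartitionDuration and WithRetention options that also appear in the issues below (1 hour and 14 days are the documented defaults):

storage, _ := tstorage.NewStorage(
	tstorage.WithTimestampPrecision(tstorage.Seconds),
	// Each partition covers one hour of data (the default).
	tstorage.WithPartitionDuration(1*time.Hour),
	// Partitions older than 14 days get removed (the default).
	tstorage.WithRetention(14*24*time.Hour),
)
defer storage.Close()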

Memory partition

The memory partition is writable and stores data points in heap. The head partition is always a memory partition, and its successor is also a memory partition so that out-of-order data points can be accepted. Data points are stored in an ordered slice, which offers an excellent cache hit ratio compared to a linked list, unless it gets updated far too often (e.g. deleting or adding elements at random locations).
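
To make the benefit of an ordered slice concrete, here is an illustration (not tstorage's actual internals) of how a time-range query boils down to two binary searches with sort.Search over a slice sorted by timestamp:

// Illustration only: a range query over a slice sorted by Timestamp.
type dataPoint struct {
	Timestamp int64
	Value     float64
}

// selectRange returns the points with start <= Timestamp < end,
// assuming points is ordered by Timestamp ascending.
func selectRange(points []dataPoint, start, end int64) []dataPoint {
	startIdx := sort.Search(len(points), func(i int) bool {
		return points[i].Timestamp >= start
	})
	endIdx := sort.Search(len(points), func(i int) bool {
		return points[i].Timestamp >= end
	})
	return points[startIdx:endIdx]
}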

All incoming data is written to a write-ahead log (WAL) right before inserting into a memory partition to prevent data loss.
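
The WAL behaviour is configurable. A small sketch, assuming the WithWALBufferedSize option that shows up in the issue reports below (my assumption: a size of 0 flushes every write straight through, trading throughput for durability):

storage, _ := tstorage.NewStorage(
	tstorage.WithDataPath("./data"),
	// Assumption: 0 disables buffering so each insert hits the WAL file immediately.
	tstorage.WithWALBufferedSize(0),
)
defer storage.Close()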

Disk partition

Old memory partitions get compacted and persisted to directories prefixed with p-, under the directory specified by the WithDataPath option. Here is the macro layout of disk partitions:

$ tree ./data
./data
├── p-1600000001-1600003600
│   ├── data
│   └── meta.json
├── p-1600003601-1600007200
│   ├── data
│   └── meta.json
└── p-1600007201-1600010800
    ├── data
    └── meta.json

As you can see, each partition holds two files: meta.json and data. The data file is compressed, read-only, and memory-mapped with mmap(2), which maps a kernel address space to a user address space. Therefore, the only thing that has to be kept in heap is the partition's metadata. Just looking at meta.json gives us a good picture of what it stores:

$ cat ./data/p-1600000001-1600003600/meta.json
{
  "minTimestamp": 1600000001,
  "maxTimestamp": 1600003600,
  "numDataPoints": 7200,
  "metrics": {
    "metric-1": {
      "name": "metric-1",
      "offset": 0,
      "minTimestamp": 1600000001,
      "maxTimestamp": 1600003600,
      "numDataPoints": 3600
    },
    "metric-2": {
      "name": "metric-2",
      "offset": 36014,
      "minTimestamp": 1600000001,
      "maxTimestamp": 1600003600,
      "numDataPoints": 3600
    }
  }
}

Each metric records the file offset of its beginning. The data-point slice for each metric is compressed separately, so all we have to do when reading is seek to that offset and read the points off.
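
As an illustration (not code from the package), the layout above can be read back with a struct that mirrors meta.json, using encoding/json and os; the offset field is all that is needed to locate a metric's compressed block inside the data file:

type diskMetric struct {
	Name          string `json:"name"`
	Offset        int64  `json:"offset"`
	MinTimestamp  int64  `json:"minTimestamp"`
	MaxTimestamp  int64  `json:"maxTimestamp"`
	NumDataPoints int64  `json:"numDataPoints"`
}

type meta struct {
	MinTimestamp  int64                 `json:"minTimestamp"`
	MaxTimestamp  int64                 `json:"maxTimestamp"`
	NumDataPoints int64                 `json:"numDataPoints"`
	Metrics       map[string]diskMetric `json:"metrics"`
}

b, _ := os.ReadFile("./data/p-1600000001-1600003600/meta.json")
var m meta
_ = json.Unmarshal(b, &m)
// Seeking to m.Metrics["metric-2"].Offset in the data file lands at the
// beginning of that metric's compressed data-point block.
fmt.Println(m.Metrics["metric-2"].Offset) // => 36014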

Out-of-order data points

It's not uncommon for data points to arrive out of order in real-world applications, because of network latency or clock synchronization issues; tstorage basically doesn't discard them. If out-of-order data points are within the range of the head memory partition, they get temporarily buffered and merged at flush time. Sometimes we also have to handle data points that cross a partition boundary, which is why tstorage keeps more than one partition writable.
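
For example, inserts do not have to be time-ordered; a later call with an older timestamp is still accepted, and the older point is the one that gets buffered and merged at flush time:

_ = storage.InsertRows([]tstorage.Row{
	{Metric: "metric1", DataPoint: tstorage.DataPoint{Timestamp: 1600000100, Value: 1.0}},
})
_ = storage.InsertRows([]tstorage.Row{
	// Out of order: older than the point above, but not discarded.
	{Metric: "metric1", DataPoint: tstorage.DataPoint{Timestamp: 1600000050, Value: 0.5}},
})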

More

Want to know more details on tstorage internal? If so see the blog post: Write a time-series database engine from scratch.

Acknowledgements

This package is implemented based on tons of existing ideas, and I was especially inspired by several prior projects.

A big "thank you!" goes out to all of them.

tstorage's People

Contributors

dpgil, fedorlitau, mattdsteele, nakabonne


tstorage's Issues

Consider another compression way

Currently, right before flushing data points to disk, it encodes each DataPoint with encoding/binary and then compresses with gzip. But time-series data can be compressed quite well; we should look into other ways to compress it.

tstorage/storage.go

Lines 148 to 150 in 2ed7de6

// TODO: Make gzip compressor/decompressor changeable
compressorFactory: newGzipCompressor,
decompressorFactory: newGzipDecompressor,

Gorilla is one of the effective ways: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
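
A hypothetical sketch (this is not the package's actual internal interface, and the factory signature here is made up) of what a pluggable compressor could look like, with the existing gzip behaviour as one implementation:

// compressor wraps the data-file writer so encoded points are compressed
// before hitting disk; other algorithms could satisfy the same interface.
type compressor interface {
	io.WriteCloser
}

func newGzipCompressor(w io.Writer) compressor {
	return gzip.NewWriter(w)
}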

WAL recovery issue

Hey @nakabonne ,
I appreciate your quick work on the WAL, that's awesome! I just tried it and apparently I ran into an issue with the recovery from the WAL file. I let it run for ~2minutes where it added a few metrics into the TSDB and I noticed that the WAL file is also written. During recovery the following error popped up:

failed to recover WAL: failed to read WAL: encounter an error while reading WAL segment file "0": failed to read the metric name: unexpected EOF

My settings are:

	tsdbOpts := []tstorage.Option{
		tstorage.WithTimestampPrecision(tstorage.Seconds),
		tstorage.WithPartitionDuration(cfg.CacheRetention / 2),
		tstorage.WithDataPath(cfg.Persistence.Disk.DataPath),
		tstorage.WithRetention(cfg.Persistence.Disk.Retention),
	}

        storage, err := tstorage.NewStorage(tsdbOpts...)

I sent you the WAL file via EMail.

tstorage + gRPC = tstorage-server

Hello @nakabonne,

I saw your post about tstorage on HackerNews and I spent some time looking through the source code - and I love the project.

The challenge I have is I want to access tstorage by many applications simultaneously; as a result, I spent some time writing a gRPC service definition overtop of tstorage to turn it into a server application. I've created the repo tstorage-server.

Have a look and feel free to use any bit of the code if you like. If not, no worries! Just wanted to say thanks again for your great work!

Take care,
-Bart

Support for external TSDB storage

Hey,
I'd like to use tstorage but I'd like to use an external persistence (in my case Kafka). Do you think this would make sense to support so that we can use external systems like Databases or even streams like Kafka for the persistence?

My usecase is to utilize tstorage to collect some metrics for Kowl so that we can show some graphs (e.g. storage usage over time). Ideally we don't need our users to configure a filestorage, but instead we could use something like a Kafka topic that can be shared by all instances.

Partial label matching in selects

Unfortunately, it currently only returns data points with an exact match of the label combination. That means, say you run storage.Select("consumergroup_topic_lag", []tstorage.Label{{Name: "consumer_group", Value: "foo"}}), then:

  • consumergroup_topic_lag {"consumer_group": "foo"} will be returned
  • consumergroup_topic_lag {} will not be returned
  • consumergroup_topic_lag {"consumer_group": "foo", "another_label": "bar"} will not be returned

I think it would be beneficial if we had the option to query for metrics using partial labels. Example:

storage.Select("consumergroup_topic_lag", []tstorage.Label{{Name: "consumer_group", Value: "foo"}})

Returns all metrics where a label with key consumer_group equals foo, regardless of what other labels are set, as long as the queried labels exist in the target:

consumergroup_topic_lag {"consumer_group": "foo", "topic": "bla"} 1000
consumergroup_topic_lag {"consumer_group": "foo", "topic": "bar"} 2000
consumergroup_topic_lag {"consumer_group": "foo", "topic": "basket"} 0
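
For what it's worth, the matching rule being asked for here is just subset matching over label sets; a sketch of that check (this is not something the current Select API performs):

// matchesPartial reports whether every queried label exists with the same
// value in the series' label set.
func matchesPartial(query, series []tstorage.Label) bool {
	set := make(map[string]string, len(series))
	for _, l := range series {
		set[l.Name] = l.Value
	}
	for _, q := range query {
		if v, ok := set[q.Name]; !ok || v != q.Value {
			return false
		}
	}
	return true
}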

RFC: more generic DataPoint values

Hi,

I've been toying with the idea of using tstorage for a series of location data, i.e. 3D positions (latitude, longitude, altitude) for different devices.
If I understand correctly, using tstorage I'd have to model the various series and values belonging to the same "measurement" as something like:

[
(DataPoint(now, 42), Label("id", "device0"), Label("latitude", "")),
(DataPoint(now, 23), Label("id", "device0"), Label("longitude", "")),
...
]

Is my understanding correct? If so and it's probably a big ask, but would it be possible/make-sense to generalize the data representation to e.g. a byte array? In that case, I could store all related measures as a single point. Does the storage format currently require fixed value/data-point sizes?

Thanks!
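
Not an answer from the author, but one way to model this with the current float64-only API is one metric per position component, sharing a device label, e.g.:

labels := []tstorage.Label{{Name: "id", Value: "device0"}}
now := time.Now().Unix()
_ = storage.InsertRows([]tstorage.Row{
	{Metric: "latitude", Labels: labels, DataPoint: tstorage.DataPoint{Timestamp: now, Value: 52.52}},
	{Metric: "longitude", Labels: labels, DataPoint: tstorage.DataPoint{Timestamp: now, Value: 13.40}},
	{Metric: "altitude", Labels: labels, DataPoint: tstorage.DataPoint{Timestamp: now, Value: 34.0}},
})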

Compress WAL file

Another WAL related question: Is it expected to grow really fast? I don't scrape a lot of metrics (I'd guess 1000 metrics / 15s), but after 2 minutes the WAL file has already reached 11MB in file size.

Originally posted by @weeco in #37 (comment)

As noticed in the referenced issue, the WAL file gets really big quickly. I wanted to create a separate issue for this to possibly introduce a more compact way of writing the WAL file.

Maybe you can recommend a workaround to reduce the size of the WAL file. I assume smaller partitions could help with that?

Wildcard in Select method

It seems very convenient to be able to select more than one metric using the following syntax (assume having "metric-1" and "metric-2" data tags):

storage.Select("metric?", nil, timestart, timeend)

to select all metrics with any symbol in place of ?,
or

storage.Select("*", nil, timestart, timeend)

to select all data.

Similarly it could be regex.
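
A possible workaround today (purely a sketch, not part of tstorage, since Select takes a single exact metric name) is to track the metric names you insert yourself and fan a regular expression out over them; imports of regexp and the tstorage package are assumed, as is Select returning a slice of *tstorage.DataPoint:

type wildcardStore struct {
	storage tstorage.Storage
	metrics map[string]struct{} // metric names seen so far
}

func (w *wildcardStore) insert(rows []tstorage.Row) error {
	for _, r := range rows {
		w.metrics[r.Metric] = struct{}{}
	}
	return w.storage.InsertRows(rows)
}

func (w *wildcardStore) selectRegexp(pattern string, start, end int64) ([]*tstorage.DataPoint, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var out []*tstorage.DataPoint
	for name := range w.metrics {
		if !re.MatchString(name) {
			continue
		}
		points, err := w.storage.Select(name, nil, start, end)
		if err != nil {
			continue // e.g. no data points in the requested range
		}
		out = append(out, points...)
	}
	return out, nil
}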

Fails to find metrics in in-memory mode when partitionList size is bigger than 3

I tried to run in-memory mode and pushed thousands of metrics to the storage.

The data points span more than one day: about 10 metrics, with one data point each per minute.

The partition size is 1h and the retention is 14d, both of which are the defaults.

I've noticed that once the third partition is created, flushPartitions is invoked and the following code deletes the older partition:

		if s.inMemoryMode() {
			if err := s.partitionList.remove(part); err != nil {
				return fmt.Errorf("failed to remove partition: %w", err)
			}
			continue
		}

This will cause Select to fail on metrics that were part of the first partition that was removed.

I don't understand what flush means for in-memory mode. I guess it should be a no-op, and the only method that should delete partitions is removeExpiredPartitions.

I understand that for disk storage we want to flush partitions to disk for persistence, but the current code will never hold more than 2 partitions in memory.

nil tail when recovering existing partitions

When recovering / restoring from an existing data directory, the tail is nil, which results in a segmentation fault during insertion.

I have introduced a recovery mode on my fork, but as a workaround I have added the following code to getTail on partitionListImpl:

func (p *partitionListImpl) getTail() partition {
	if p.size() <= 0 {
		return nil
	}
	p.mu.RLock()
	defer p.mu.RUnlock()

	if p.tail == nil {
		log.Warn("tail partition not found, attempting to recover tail")
		i := p.newIterator()
		for i.next() {
			p.tail = i.currentNode()
		}
	}

	return p.tail.value()
}

examples

How about adding an examples folder?

The README has gists and snippets, but it would be easier to grok with complete examples.

Missing meta.json file

When I abruptly stop the application, it apparently does not always write a meta.json file into the disk directories. On startup, a log message like this then pops up:

failed to open disk partition for config\\tsdb-data\\xy\\p-1635758430-1635769231: failed to read metadata: open config\\tsdb-data\\xy\\p-1635758430-1635769231\\meta.json: The system cannot find the file specified."

I'm unsure when the meta.json file is supposed to be written, hence I can't tell whether it depends on the fact that I abruptly stop the application or not. It's not easily testable for me because I think it takes some time until these files get written. Let me know if I can provide further details; this has happened a couple of times already.

Accept out-of-order data points

The remaining tasks are:

Try to insert outdated rows to head's next partition

tstorage/storage.go

Lines 241 to 242 in 2ed7de6

// TODO: Try to insert outdated rows to head's next partition
_ = outdatedRows

Merge out-of-order data points (updates: resolved by #16)

tstorage/storage.go

Lines 414 to 418 in 2ed7de6

// TODO: Merge out-of-order data points
points := make([]*DataPoint, 0, len(mt.points)+len(mt.outOfOrderPoints))
for _, p := range mt.points {
points = append(points, p)
}

Windows support

tstorage depends on the mmap system call. Currently it uses the syscall standard package, but within this package there are a couple of Unix-dependent identifiers such as syscall.Mmap, syscall.PROT_READ, and syscall.MAP_SHARED.

Return labels as part of the returned datapoints

Hey,
I'd like to access the labels as part of the returned datapoints. Let me describe my usecase in more detail:

Similar to what I'd do with Prometheus, I'm collecting several metrics for Kafka consumer groups. Each consumer group may consume one or more Kafka topics. The consumer group may have a delay/lag which is different for each topic. I'm scraping this lag for each <consumerGroup, topic> tuple, and later on I'd like to render a graph of the lag for each of these tuples.

This would currently require me to know the labels in advance since they are not returned in storage.Select() (just the value and timestamp are returned as part of the DataPoint structure). If I had an option to also return the labels like this:

storage.Select("consumergroup_topic_lag", []tstorage.Label{{Name: "consumer_group", Value: "foo"}}, start, end, tstorage.WithLabels())

that would be very helpful. I would be able to group the returned datapoints by a certain label key and therefore further process my results.

Read during writing

Hello,

In my use case I store data every second, but I also want to read at the same time from another thread. I do the following from my main thread:

t.db.InsertRows([]tstorage.Row{
	{
		Metric:    hardware,
		DataPoint: tstorage.DataPoint{Timestamp: time_unix, Value: reflect.ValueOf(pair.Value).Float()},
	},
})

When a request comes in, I want to read information via

var reader tstorage.Reader

points, _ := reader.Select(req.Metric, nil, req.Start, req.End)
for _, p := range points {
	fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
	// => timestamp: 1600000000, value: 0.1
}

but I get

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x104671c40]
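
For reference, the panic most likely comes from calling Select on the zero-value tstorage.Reader, which was never initialized, rather than from concurrent access: the value returned by tstorage.NewStorage already provides both InsertRows and Select and is goroutine safe, so it can be shared between goroutines directly. A sketch:

storage, err := tstorage.NewStorage(
	tstorage.WithTimestampPrecision(tstorage.Seconds),
)
if err != nil {
	panic(err)
}
defer storage.Close()

// Writer side (e.g. the loop that inserts once per second):
go func() {
	_ = storage.InsertRows([]tstorage.Row{
		{Metric: hardware, DataPoint: tstorage.DataPoint{Timestamp: time_unix, Value: reflect.ValueOf(pair.Value).Float()}},
	})
}()

// Reader side (e.g. in a request handler): use the same storage value.
points, _ := storage.Select(req.Metric, nil, req.Start, req.End)
for _, p := range points {
	fmt.Printf("timestamp: %v, value: %v\n", p.Timestamp, p.Value)
}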

Partitions can end up out of order

Let's say we have a storage instance with partition size = 2, and we insert rows at timestamp 1 and timestamp 3.

We then have a full partition with minTimestamp=1, maxTimestamp=3. On the next insert, a new partition will be created. When a new partition is created, the first time a row is inserted the minimum timestamp is set (code ref), so let's say we now insert a row at timestamp 2.

We'll now have two partitions with data points out of order. This breaks the query functionality as well.
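
A rough reproduction sketch of the scenario described above (assuming second precision and a two-second partition duration; the comments follow the sequence in this report):

storage, _ := tstorage.NewStorage(
	tstorage.WithTimestampPrecision(tstorage.Seconds),
	tstorage.WithPartitionDuration(2*time.Second),
)
insert := func(ts int64) {
	_ = storage.InsertRows([]tstorage.Row{
		{Metric: "metric1", DataPoint: tstorage.DataPoint{Timestamp: ts, Value: 1}},
	})
}
insert(1) // head partition gets minTimestamp=1
insert(3) // same partition: maxTimestamp=3, the partition is now full
insert(2) // a new partition is created with minTimestamp=2 < 3: out of order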

inclusive end in Select

During Select, this code uses >= instead of >, making end inclusive, which contradicts the documentation for Select:

if end >= maxTimestamp {
	endIdx = int(size)
} else {
	// Use binary search because points are in-order.
	endIdx = sort.Search(int(size), func(i int) bool {
		return m.points[i].Timestamp >= end
	})
}

Unable to load previous metrics

Hi,
I am getting the following error while loading data from disk:
failed to open disk partition for /usr/Hosting/metrics/p-1646741592161-1646741652162: failed to perform mmap: cannot allocate memory
The metrics pointer is nil.
My code:

metrics, err = tstorage.NewStorage(
	tstorage.WithTimestampPrecision(tstorage.Milliseconds),
	tstorage.WithDataPath("/usr/Hosting/metrics"),
	tstorage.WithWALBufferedSize(0),
)
if err != nil {
	log.Print(err.Error())
}

Stop sharing lock between write and read

All data points are immutable, and all write operations are append-only. Logically, read operations and write operations don't conflict at all.

In spite of that, we currently take a Mutex on every read and write.

// TODO: Consider to stop using mutex every time.
// Instead, fix the capacity of points slice, kind of like:
/*
m.points := make([]*DataPoint, 1000)
for i := 0; i < 1000; i++ {
m.points[i] = point
}
*/
m.mu.Lock()
defer m.mu.Unlock()
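
One intermediate option, different from what the TODO above suggests (fixing the slice capacity), would be a sync.RWMutex so that concurrent reads at least don't serialize each other; a sketch with an illustrative type name:

type memMetric struct {
	mu     sync.RWMutex
	points []*tstorage.DataPoint
}

func (m *memMetric) insert(p *tstorage.DataPoint) {
	m.mu.Lock() // writers still take the exclusive lock
	defer m.mu.Unlock()
	m.points = append(m.points, p)
}

func (m *memMetric) snapshot() []*tstorage.DataPoint {
	m.mu.RLock() // readers can proceed concurrently
	defer m.mu.RUnlock()
	out := make([]*tstorage.DataPoint, len(m.points))
	copy(out, m.points)
	return out
}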

Handle data points outside range of the head partition

Basically, all data points get ingested into the head partition. But sometimes data points cross the partition boundary and should go into the next partition.

tstorage/storage.go

Lines 251 to 252 in b1bbbaf

// TODO: Try to insert outdated rows to head's next partition
_ = outdatedRows

Points older than the head's next partition get discarded.

Proposal: gzip partition data

Given a naive write of sequential data:

for i := int64(1); i < 10000; i++ {
	_ = storage.InsertRows([]tstorage.Row{{
		Metric:    "metric1",
		DataPoint: tstorage.DataPoint{Timestamp: 1600000000 + i, Value: float64(i)},
	}})
}

The file contains many stretches of repeated values (mostly 0x00s). This is great for gzip, a byte-stream de-duplicator.

If I gzip the file:

$ gzip -k data && ls -alsh data*
 84K -rw-rw-r-- 1 wmeitzler wmeitzler  82K Sep 20 09:45 data
4.0K -rw-rw-r-- 1 wmeitzler wmeitzler 1.3K Sep 20 09:45 data.gz

I achieve file compression of 21x!

Note that this only really provides value when long stretches of adjacent data points are similar. If I populate a file with truly random values rather than ascending ones, I can only compress from 82K to 80K, and I'm paying CPU time to achieve these smaller files. I suspect, but have not validated, that a meaningful share of TSDB use cases generate these adjacent-similarity series that would benefit from gzip compression.

Given that Go allows access to the streaming nature of gzip operations, I propose exploring the use of these functions in the reading and writing of data files.
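
A sketch of that streaming approach with the standard library, wrapping the data file in a gzip.Writer at flush time (the path is just the example partition from above):

f, _ := os.Create("./data/p-1600000001-1600003600/data")
zw := gzip.NewWriter(f)
// ... stream the encoded data points through zw instead of writing to f directly ...
_ = zw.Close() // flushes the remaining compressed bytes
_ = f.Close()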
