
go-diskqueue's Introduction

go-diskqueue's People

Contributors

bitpeng, jehiah, kev1n80, mgurevin, mreiferson, ploxiln, qinhao, stoneyunzhao, swanspouse


go-diskqueue's Issues

an extra read after we finish reading a file

in "writeOne" method ,we move foward to next file according to the following condition
if d.writePos >= d.maxBytesPerFile { d.writeFileNum++ ... }

but in "readOne" method,we move foward to next file according to the following condition
if d.nextReadPos > d.maxBytesPerFile { ... d.nextReadFileNum++ }

This may cause an extra read after we finish reading a file, and this extra read will cause an "EOF" error.
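A minimal standalone sketch of the boundary case (illustrative only, not the library's code): when a write lands exactly on maxBytesPerFile, the writer's ">=" check rolls but the reader's ">" check does not, so the reader attempts one more read on the finished file.

package main

import "fmt"

// shouldRollWrite mirrors the writer-side condition quoted above;
// shouldRollRead mirrors the reader-side condition.
func shouldRollWrite(writePos, maxBytesPerFile int64) bool {
	return writePos >= maxBytesPerFile
}

func shouldRollRead(nextReadPos, maxBytesPerFile int64) bool {
	return nextReadPos > maxBytesPerFile
}

func main() {
	const maxBytesPerFile = 1024
	pos := int64(1024) // the last message ended exactly at the limit

	fmt.Println(shouldRollWrite(pos, maxBytesPerFile)) // true:  writer moves to the next file
	fmt.Println(shouldRollRead(pos, maxBytesPerFile))  // false: reader stays, and the next read hits EOF
}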

maxBytesPerFile acts like minBytesPerFile

Since writeOne() checks whether the file size exceeds the limit after writing the data, instead of checking whether the write would push the file over the limit beforehand, the actual maximum file size is maxBytesPerFile + maxMsgSize + 3.
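A hedged sketch of the alternative, checking before the write (the helper name wouldExceed is illustrative, not from the library). The worst case above is a write that starts at maxBytesPerFile - 1 and adds the 4-byte length prefix plus a maxMsgSize payload, hence the +3.

package main

import "fmt"

// wouldExceed reports whether appending a message of msgSize bytes (plus the
// 4-byte length prefix diskqueue writes) would push the current file past
// maxBytesPerFile. Rolling when this returns true keeps every file at or
// below the limit, instead of letting the last write overshoot it.
func wouldExceed(writePos, msgSize, maxBytesPerFile int64) bool {
	return writePos > 0 && writePos+4+msgSize > maxBytesPerFile
}

func main() {
	fmt.Println(wouldExceed(95<<20, 10<<20, 100<<20)) // true: roll to a new file before writing
	fmt.Println(wouldExceed(0, 10<<20, 5<<20))        // false: an oversized first message is still written
}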

Files marked .bad after "Too many open files" read error

The current diskqueue implementation does not discriminate between the different types of errors returned by os.OpenFile(). We experienced a problem in production where an nsqd host ran out of file descriptors and os.OpenFile() returned a "Too many open files" error, so a number of NSQ diskqueue files were incorrectly marked as .bad, requiring manual repair (renaming them and updating the cursor position in the metadata files). It would be good if NSQ were able to discriminate between transient "try again" read errors and other, more serious errors.

I don't have a PR to fix this, unfortunately; I just thought I would flag it up.
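Not a PR, but a hedged sketch of how a caller might tell descriptor exhaustion apart from other open errors before renaming a file to .bad (isTransientOpenErr is illustrative, not part of go-diskqueue):

package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// isTransientOpenErr reports whether an os.OpenFile error looks like a
// temporary resource problem (process or system out of file descriptors)
// rather than a problem with the file itself. Retrying later is safer than
// marking the file .bad in that case.
func isTransientOpenErr(err error) bool {
	return errors.Is(err, syscall.EMFILE) || errors.Is(err, syscall.ENFILE)
}

func main() {
	_, err := os.OpenFile("missing.dat", os.O_RDONLY, 0600)
	if err != nil && !isTransientOpenErr(err) {
		fmt.Println("permanent error, .bad handling would be justified:", err)
	}
}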

tag new release

shall we tag a v1.1.0 release? release notes so far:

 * #11 #12 `Depth()` method could return a stale value for a millisecond or so, fixed
 * #22 include `minMsgSize` in invalid-message-size logging
 * #23 fix reading an existing diskqueue with a bigger `maxBytesPerFile` than it was originally written with
 * #31 avoid possible data-loss and corruption when metadata is not synced during unclean exit
 * #32 fix inaccurate depth after read error on "current" (last) file
 * #34 switch to next file before maxBytesPerFile is reached, instead of after. (Previously, most files would exceed maxBytesPerFile.)
 * #33 #35 switch from Travis-CI to GitHub Actions for CI

cc @jehiah @mreiferson

Possible data loss and/or corruption if disk queue metadata is out of sync

We found a data loss/corruption issue in nsqd 0.3.7, before diskqueue was split into its own module, but AFAIK it is still present in go-diskqueue.

If a diskqueue's metadata is not in sync with the data file (e.g. nsqd was terminated abruptly before it had a chance to sync metadata), then on restart the diskqueue will start reading at the d.readPos and writing at the d.writePos recorded in the metadata, either or both of which may be short of the actual end of file. At some point incoming messages will overwrite messages still being read: the read buffer ends up with part of an old message followed by part of a new one, the next message size is then effectively read from random bytes of the new message, and this race causes corruption, lost messages, and the file being marked as bad.

Our fix was to check the file size against the metadata and, if d.writePos is less than the file size, force rotation to a new file while still allowing reads up to the end of the file (possibly causing duplicate messages, but that's better than losing messages, and NSQ has no exactly-once delivery guarantees anyway):

@@ -449,6 +449,26 @@ func (d *diskQueue) retrieveMetaData() error {
 	d.nextReadFileNum = d.readFileNum
 	d.nextReadPos = d.readPos
 
+	// if the metadata was not sync'd at the last shutdown of nsqd
+	// then the file might actually be larger than the writePos, in
+	// which case the safest thing to do is skip to the next file for
+	// writes, and let the reader salvage what it can
+	fileName = d.fileName(d.writeFileNum)
+	fileInfo, err := os.Stat(fileName)
+	if err != nil {
+		return err
+	}
+	fileSize := fileInfo.Size()
+	if d.writePos < fileSize {
+		d.logf("ERROR: DISKQUEUE(%s) out of sync metadata %d < %d, skipping",
+			d.name, d.writePos, fileSize)
+		d.writeFileNum += 1
+		d.writePos = 0
+		if d.writeFile != nil {
+			d.writeFile.Close()
+			d.writeFile = nil
+		}
+	}
 	return nil
 }

This diff is against the 0.3.7 diskqueue.go, but should be readily transposable to go-diskqueue.

Peek multiple

I use diskqueue as a staging local queue before pushing to Kafka. I use the sarama Go Kafka library for the Kafka side.

I need high reliability of delivery and can tolerate a small number of duplicates (the messages carry extra metadata so they can easily be deduplicated on the consumer side). To achieve high reliability and also absorb traffic spikes, this is what I do: in one goroutine I write to the diskqueue, and in another I do a blocking peek; when there is something, I send it to Kafka, and once Kafka acknowledges it, I do a read, removing it from the local diskqueue.

I could achieve this in a more complex fashion by using various synchronization primitives and switching between the local diskqueue and Kafka depending on backpressure and in-memory queue size, but that would be quite complex and error prone. So I just decoupled the two (at the cost of always going to disk, which in the end is not bad, because it protects me from any program crash or exit).
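Roughly the loop I have in mind, as a minimal sketch using PeekChan() (added in #24) and ReadChan(); the constructor arguments and sendToKafka are placeholders, not my real code:

package main

import (
	"log"
	"time"

	diskqueue "github.com/nsqio/go-diskqueue"
)

func main() {
	logf := func(lvl diskqueue.LogLevel, f string, args ...interface{}) {
		log.Printf(f, args...)
	}
	// placeholder settings: 100 MB per file, 4 B..1 MB messages, sync every 2500 writes or 2s
	dq := diskqueue.New("staging", "/tmp/dq", 100<<20, 4, 1<<20, 2500, 2*time.Second, logf)
	defer dq.Close()

	sendToKafka := func(msg []byte) error { return nil } // stand-in for the sarama producer call

	for {
		msg := <-dq.PeekChan() // blocks until a message is available; does not remove it
		if err := sendToKafka(msg); err != nil {
			time.Sleep(time.Second) // Kafka unhappy: the message is still at the head, retry later
			continue
		}
		<-dq.ReadChan() // Kafka accepted it: now remove it from the disk queue
	}
}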

The issue is peeking performance. Because I can only peek one element, I cannot do Kafka batching, which limits throughput and makes compression less effective.

I would like to peek at, say, up to 100 of the next elements, send them in a batch, then remove all sent elements (using ReadChan).

Inaccurate depth count after readError on "current" file

It seems that DiskQueue does not accurately update depth when a read error occurs (in handleReadError()), or in the scenario where the process finishes reading a file, deletes it, but crashes before writing the metadata file.

Confused about the way the depth field is assigned in retrieveMetaData()

// retrieveMetaData initializes state from the filesystem
func (d *diskQueue) retrieveMetaData() error {
	var f *os.File
	var err error

	fileName := d.metaDataFileName()
	f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	var depth int64
	_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos)
	if err != nil {
		return err
	}
	atomic.StoreInt64(&d.depth, depth)
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = d.readPos

	return nil
}

I noticed that the depth is first scanned into a local depth variable, and then atomic.StoreInt64 is used to copy that local value into the struct field. I'm confused by this. Is there anything special about d.depth? Why not just:

_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
		&d.depth,
		&d.readFileNum, &d.readPos,
		&d.writeFileNum, &d.writePos)

Consistent EOF error when reading immediately after write rolls files

A consistent EOF error happens if the read position is keeping up with the write position: when a write rolls to a new file, the next read throws an EOF error because it missed its chance to roll.

It has to do with how the reader rolls: after it has read a message, it checks whether it should roll to the next file, and that check uses a variable called maxBytesPerFileRead. This variable is either the file's size, if the file is complete, or the configured maxBytesPerFile (e.g. 100 MB).

Say readFileNum == writeFileNum and maxBytesPerFile = 100 MB.
Say we read Msg-1, after which nextReadPos is at 96 MB; this is under maxBytesPerFile (100 MB), so we don't roll.
Then say we write Msg-2, which is 10 MB. It doesn't fit, so we roll the write file, write the message in the new file, and set maxBytesPerFileRead = 96 MB.
Now when we read, we get an EOF error: there is no more data in the file, and we missed our chance to roll when we read Msg-1.

Because we hit an EOF, we mark the file as bad and roll to the next file, so we are not missing any messages, but we accumulate a lot of unnecessary bad files.

This problem doesn't present itself, however, if the reader is more than a message behind the writer: after the writer has rolled, there are still messages in the previous file for the reader to read, and it will then pick up the updated maxBytesPerFileRead value when reading them.

A fix is to move the roll check to the start of the readOne function and roll before reading, if necessary.
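A small standalone model of that reordering (illustrative only; the struct and helper below are not the library's actual types):

package main

import "fmt"

// readerState holds just the fields relevant to the roll decision.
type readerState struct {
	readFileNum, writeFileNum int64
	readPos                   int64
	maxBytesPerFileRead       int64
}

// rollBeforeRead reports whether the reader should advance to the next file
// before attempting another read from the current one. Doing this check up
// front means a reader that was exactly caught up when the writer rolled
// moves on immediately instead of issuing one more read that returns EOF.
func rollBeforeRead(s readerState) bool {
	return s.readFileNum < s.writeFileNum && s.readPos >= s.maxBytesPerFileRead
}

func main() {
	// the reader finished Msg-1 at 96 MB; the writer has rolled and set
	// maxBytesPerFileRead to the finished file's size (96 MB)
	s := readerState{readFileNum: 7, writeFileNum: 8, readPos: 96 << 20, maxBytesPerFileRead: 96 << 20}
	fmt.Println(rollBeforeRead(s)) // true: roll now instead of reading into EOF
}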

A flowchart in the original issue illustrates the sequence above (image not reproduced here).

new release with PeekChan?

Hi.

For my application I need to peek at the queue's oldest element before removing it. Could we make a new release that includes #24?

It would be better if an async delete interface were supported

Once data has been read, moveForward calls os.Remove to delete the associated file on disk. But in some scenarios the caller program may crash and restart: the data has been read, which also means the file may already have been removed, but the data has not yet been applied by the caller, so it may be lost.
So, in my view, it would be better to add an interface for removing old files asynchronously, to guarantee atomicity.
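A purely hypothetical sketch of what such an interface could look like (none of these names exist in go-diskqueue today); file removal would be deferred until the caller has acknowledged that the data it read has been applied:

package sketch

// AckingQueue is a hypothetical variant of the diskqueue interface in which
// reading alone does not make the underlying file eligible for removal.
type AckingQueue interface {
	// ReadChan returns messages, as in go-diskqueue, but reading does not
	// by itself allow the underlying file to be deleted.
	ReadChan() <-chan []byte
	// Ack marks the most recently read message as durably applied by the
	// caller; a file is removed only once every message in it is acked.
	Ack() error
	Close() error
}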

Close() may cause data loss

func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	d.exitFlag = 1

	if deleted {
		d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
	} else {
		d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
	}

	close(d.exitChan)
	// ensure that ioLoop has exited
	<-d.exitSyncChan

	close(d.depthChan)

	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
	}

	if d.writeFile != nil {
		d.writeFile.Close()
		d.writeFile = nil
	}

	return nil
}

But exit() does not call d.writeFile.Sync(), which may cause data loss. Why not add a .Sync() call in exit()? For example:

func (d *diskQueue) exit(deleted bool) error {
	d.Lock()
	defer d.Unlock()

	d.exitFlag = 1

	if deleted {
		d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
	} else {
		d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
	}

	close(d.exitChan)
	// ensure that ioLoop has exited
	<-d.exitSyncChan

	close(d.depthChan)

	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
	}

	if d.writeFile != nil {
		d.writeFile.Sync()
		d.writeFile.Close()
		d.writeFile = nil
	}

	return nil
}

Licensing: MIT?

Hi,

Is it fair to assume the same license as NSQ itself, i.e. MIT? Could this be made more explicit?

Thanks
kind regards
Thilo
