
tape's Introduction

Tape by Square, Inc.

Tape is a collection of queue-related classes for Android and Java.

QueueFile is a lightning-fast, transactional, file-based FIFO. Addition and removal from an instance is an O(1) operation and is atomic. Writes are synchronous; data will be written to disk before an operation returns. The underlying file is structured to survive process and even system crashes, and if an I/O exception is thrown during a mutating change, the change is aborted.

NOTE: The current implementation is built for file systems that support atomic segment writes (like YAFFS). Most conventional file systems don't support this; if the power goes out while writing a segment, the segment will contain garbage and the file will be corrupt.

An ObjectQueue represents an ordering of arbitrary objects which can be backed either by the filesystem (via QueueFile) or in memory only.

Download

Download the latest JAR or grab via Maven:

<dependency>
  <groupId>com.squareup.tape2</groupId>
  <artifactId>tape</artifactId>
  <version>2.0.0-beta1</version>
</dependency>

or Gradle:

compile 'com.squareup.tape2:tape:2.0.0-beta1'

Snapshots of the development version are available in Sonatype's snapshots repository.

Usage

Create a QueueFile instance.

File file = // ...
QueueFile queueFile = new QueueFile.Builder(file).build();

Add some data to the end of the queue. QueueFile accepts a byte[] of arbitrary length.

queueFile.add("data".getBytes());

Read data at the head of the queue.

byte[] data = queueFile.peek();

Remove processed elements.

// Remove the eldest element.
queueFile.remove();

// Remove multiple elements.
queueFile.remove(n);

// Remove all elements.
queueFile.clear();

Read and process multiple elements with the iterator API.

Iterator<byte[]> iterator = queueFile.iterator();
while (iterator.hasNext()) {
  byte[] element = iterator.next();
  process(element);
  iterator.remove();
}

While QueueFile works with byte[], ObjectQueue works with arbitrary Java objects with a similar API. An ObjectQueue may be backed by a persistent QueueFile, or in memory. A persistent ObjectQueue requires a Converter to encode and decode objects.

// A persistent ObjectQueue.
ObjectQueue<String> queue = ObjectQueue.create(queueFile, converter);

// An in-memory ObjectQueue.
ObjectQueue<String> queue = ObjectQueue.createInMemory();

Add some data to the end of the queue.

queue.add("data");

Read data at the head of the queue.

// Peek the eldest element.
String data = queue.peek();

// Peek the eldest `n` elements.
List<String> data = queue.peek(n);

// Peek all elements.
List<String> data = queue.asList();

Remove processed elements.

// Remove the eldest element.
queue.remove();

// Remove multiple elements.
queue.remove(n);

// Remove all elements.
queue.clear();

Read and process multiple elements with the iterator API.

Iterator<String> iterator = queue.iterator();
while (iterator.hasNext()) {
  String element = iterator.next();
  process(element);
  iterator.remove();
}

Converter

A Converter encodes objects to bytes and decodes objects from bytes.

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;

import java.io.IOException;
import java.io.OutputStream;

import okio.Buffer;
import okio.BufferedSink;
import okio.Okio;

/** Converter which uses Moshi to serialize instances of class T to disk. */
class MoshiConverter<T> implements Converter<T> {
  private final JsonAdapter<T> jsonAdapter;

  public MoshiConverter(Moshi moshi, Class<T> type) {
    this.jsonAdapter = moshi.adapter(type);
  }

  @Override public T from(byte[] bytes) throws IOException {
    return jsonAdapter.fromJson(new Buffer().write(bytes));
  }

  @Override public void toStream(T val, OutputStream os) throws IOException {
    try (BufferedSink sink = Okio.buffer(Okio.sink(os))) {
      jsonAdapter.toJson(sink, val);
    }
  }
}
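
For context, a minimal sketch of how such a converter might be wired into a persistent ObjectQueue using the API shown above; Person, the moshi instance, and the file variable are placeholders for illustration, not part of tape:

// Hypothetical wiring of the MoshiConverter above into a persistent queue.
Moshi moshi = new Moshi.Builder().build();
QueueFile queueFile = new QueueFile.Builder(file).build();
ObjectQueue<Person> queue =
    ObjectQueue.create(queueFile, new MoshiConverter<>(moshi, Person.class));

queue.add(new Person("Alice"));
Person next = queue.peek();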

License

Copyright 2012 Square, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

tape's People

Contributors

bitmanlger, casidiablo, chrissmith-mcafee, crazybob, dnkoutso, eburke, edenman, f2prateek, holmes, jakewharton, kryali, macarse, matthewmichihara, mfurtak, nightlynexus, nirhart, paulo-raca, pforhan, rjrjr, swankjesse


tape's Issues

All Data Not Properly Zeroed Out

From @casidiablo:

Let me give you an example to make this even clearer and then we'll define what to test:

$ hexdump test_file
00 00 00 39 00 00 00 05 00 00 00 19 00 00 00 35  <- file header
66 66 66 00 00 00 00 00 00 00 00 00 03 62 62 62
00 00 00 03 63 63 63 00 00 00 03 64 64 64 00 00
00 03 65 65 65 00 00 00 03

It's a 57-byte file, with 5 elements; the first position is 0x19 (25) and the last position is 0x35 (53).

Without this commit, after adding an element (whose content is 0xFFs) of size 5 I got this:

$ hexdump test_file # file was doubled since it was not enough to save the element
00 00 00 39 00 00 00 06 00 00 00 19 00 00 00 3C
66 66 66 00 00 00 00 00 00 00 00 00 03 62 62 62
00 00 00 03 63 63 63 00 00 00 03 64 64 64 00 00
00 03 65 65 65 00 00 00 03 66 66 66 00 00 00 05  
FF FF FF FF FF 03 62 62 00 00 00 00 00 00 00 00  <- queue ends in the last 0xFF
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00     '03 62 62' are from the second row
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 

If we do it with this commit:

$ hexdump test_file
00 00 00 39 00 00 00 06 00 00 00 19 00 00 00 3C
66 66 66 00 00 00 00 00 00 00 00 00 03 62 62 62
00 00 00 03 63 63 63 00 00 00 03 64 64 64 00 00
00 03 65 65 65 00 00 00 03 66 66 66 00 00 00 05  
FF FF FF FF FF 00 00 00 00 00 00 00 00 00 00 00  <- no garbage
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00     
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 

Note that both versions leave garbage in the second row (the 0x66s). Should we also remove that garbage? Should a test make sure that all unused areas are filled with zeros?

Source code is not distributed

The 1.2.3 jar on Maven Central has no source jar and contains class files only. There seem to be only older jars available for download.

Clarification on concurrency

I've been programming for many years, but I'm relatively new to Java and Android development, so please excuse if this is a noobish question asked in the wrong place. Any help is greatly appreciated.

Can somebody please explain to me how to deal with concurrency with tape? I'm building an android client that writes logs to the tape queue as stuff happens, and a few times every day, PeriodicTask kicks off logic that empties the queue and writes it to a remote server. If both of these processes execute at the same time (the appending and the peeking) will that be handled by tape, or will it corrupt the file? If the latter, is there a common or best practice approach to deal with this problem?
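
Tape doesn't arbitrate concurrent access for you, so a conservative answer within a single process is to route every queue operation through one lock (or one single-threaded executor). A minimal sketch of a synchronized wrapper, assuming the tape2 ObjectQueue API shown in the README above:

import java.io.IOException;

import com.squareup.tape2.ObjectQueue;

/** Serializes all access to one ObjectQueue behind a single intrinsic lock. */
final class SynchronizedQueue<T> {
  private final ObjectQueue<T> delegate;

  SynchronizedQueue(ObjectQueue<T> delegate) {
    this.delegate = delegate;
  }

  synchronized void add(T value) throws IOException {
    delegate.add(value);
  }

  synchronized T peek() throws IOException {
    return delegate.peek();
  }

  synchronized void remove() throws IOException {
    delegate.remove();
  }
}

As long as both the appending code and the periodic upload code go through the same wrapper instance, their operations cannot interleave inside the file.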

Gson 2.1 or above

Using tape is leading to crashes on API 17.

Here's the discussion on this
https://groups.google.com/forum/#!topic/google-gson-codereviews/JtqOGa7mQLY

Here's the bug filed with Gson for this:

https://code.google.com/p/google-gson/issues/detail?id=440

05-24 15:09:55.782: E/AndroidRuntime(17609): java.lang.StackOverflowError
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:371)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:375)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:380)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.$Gson$Types.resolve($Gson$Types.java:355)
05-24 15:09:55.782: E/AndroidRuntime(17609): at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory.getBoundFields(ReflectiveTypeAdapterFactory.java:117)

[Question] Synchronous writes and performance considerations

The docs of QueueFile mention that "writes are synchronous". Since Tape advertises itself as being lightning fast, I wonder how fast this really is compared to, say, a relational database such as SQLite? Performing disk I/O with every insert doesn't exactly sound fast to me, and it's the reason why, for instance, SQLite uses page caches and Android's SharedPreferences introduced apply() for eventual consistency with the underlying file system.

Have there been measurements made around how fast Tape really is compared to any of the mentioned persistence options? Any concerns around event/task frequency when appending to the log?

We are evaluating Tape as a simple persistence layer for tracking events. The API lends itself very well to the kind of interaction we're aiming for, but I'm concerned about performance, since we will perform multiple writes per second, e.g. when audio is playing. This wasn't a problem with SQLite for the above-mentioned reasons. Would you advise to manually buffer these events in memory first, then flush them to disk every now and then?
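
There don't appear to be published benchmarks in this README, but if per-event synchronous writes turn out to be too slow for your write rate, one option is exactly the manual buffering described above: accumulate events in memory and flush them to the queue in batches. A rough sketch under that assumption (the encoding of events and the flush threshold are placeholders):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.squareup.tape2.QueueFile;

/** Buffers encoded events in memory and writes them to a QueueFile in batches. */
final class BufferedEventWriter {
  private final QueueFile queueFile;
  private final List<byte[]> buffer = new ArrayList<>();
  private final int flushThreshold;

  BufferedEventWriter(QueueFile queueFile, int flushThreshold) {
    this.queueFile = queueFile;
    this.flushThreshold = flushThreshold;
  }

  /** Queues an encoded event; flushes to disk when the buffer is full. */
  synchronized void write(byte[] encodedEvent) throws IOException {
    buffer.add(encodedEvent);
    if (buffer.size() >= flushThreshold) {
      flush();
    }
  }

  /** Writes all buffered events to disk. */
  synchronized void flush() throws IOException {
    for (byte[] event : buffer) {
      queueFile.add(event);
    }
    buffer.clear();
  }
}

The trade-off is durability: anything still sitting in the in-memory buffer is lost if the process dies before flush() runs.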

Add QueueFile#remove(int) and FileObjectQueue#remove(int)

Proposing this as the counterpart to #61, which lets us read an arbitrary number of items from QueueFile. Once those elements are processed, we could remove them all at once from the queue:

List<Foo> list = fileObjectQueue.peek(max);
process(list);
fileObjectQueue.remove(list.size());

Read/Write speed

Using the following simple test case:


    public static void main(String ...strings) throws Exception {

        File file = new File("test.queue");
        QueueFile queue = new QueueFile(file);      
        Date start = new Date();
        for (int i=0; i < 1000; i++) {
            queue.add(("message " + i ).getBytes("utf8"));
        }
        System.out.println("completed ADDS in : " + (new Date().getTime()-start.getTime()));
        start = new Date();
        for (int i=0; i < 1000; i++) {
            queue.remove();
        }
        System.out.println("completed REMOVES in : " + (new Date().getTime()-start.getTime()));
    }

gives results:

completed Adds in : 28146
completed removes in : 9132

That's almost 40 seconds for 1000 adds/removes. Am I missing something, or is this expected? iostat shows disk usage at 95-100% during that time. (Running recent Ubuntu, non-SSD disk.)

About data file corruption

I read this from README.md:

underlying file is structured to survive process and even system crashes

But after I had a look at the code, I found that appending a message to the queue is not atomic, because there are two write operations: first you write the data, and then you update the header. I wonder, is it possible that the OS I/O scheduler reorders the two write operations? If yes, file corruption may occur when the system crashes.

Thanks & Best Regards!

Add,Remove at index x

I know this would require a large change/overhaul, as the current implementation is based on a queue, not a list.

But as a future improvement it would be nice to be able to insert Tasks at certain points in the queue. Or at the very least, a kind of RunNext where we can prepend a Task to run before it moves on to the remaining queue.

Return removed item in onRemove()

In ObjectQueue, onAdd(ObjectQueue<Channel> queue, Channel channel) passes the added item, but onRemove(ObjectQueue<Channel> queue) does not. Likewise, and contrary to many Java collection classes, remove() does not return the removed item either.

I understand this is an optimization for speed, but what's the sense of onRemove() if it doesn't tell me which item has been removed? Normally, I would want to use the listener to sync the queue with the UI, or similar. As you have it now, I need to do that manually for remove, which is not very clean.

You may consider this change for 1.3.0.
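
Until the listener API changes, one workaround is to capture the head element yourself before removing it, since peek() already returns it. A minimal sketch, written against the tape2-style ObjectQueue used earlier in this README (the checked IOException may not apply to tape 1.x); the onRemoved callback is a placeholder for whatever UI sync is needed:

import java.io.IOException;
import java.util.function.Consumer;

/** Removes the head element and hands it to a callback, e.g. to keep the UI in sync. */
static <T> void removeAndNotify(ObjectQueue<T> queue, Consumer<T> onRemoved) throws IOException {
  T head = queue.peek();   // capture the element before it is gone
  if (head == null) {
    return;                // nothing to remove
  }
  queue.remove();
  onRemoved.accept(head);  // the information onRemove() does not currently provide
}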

tape:1.3.0 not released

The change log is updated for version 1.3.0 and is dated 2015-01-06.

Releases only show up to 1.2.3

compile 'com.squareup:tape:1.3.0' is not working

StackOverflowError

I need your suggestions.

Queue Info

FileObjectQueue >> FileObjectQueue{queueFile=QueueFile[fileLength=524288, size=94, first=Element[position = 16, length = 3198], last=Element[position = 280547, length = 271], element lengths=[3198, 2750, 2951, 3006, 2453, 2994, 2685, 2868, 2585, 2638, 2589, 2510, 2461, 3178, 2722, 3018, 2720, 2872, 2983, 3436, 3100, 3198, 2643, 2552, 2757, 3643, 3663, 3494, 3455, 3113, 3550, 3513, 3354, 3421, 3131, 2738, 2894, 3097, 2512, 2493, 2512, 2484, 2508, 2492, 1436, 2770, 2845, 2900, 2725, 3130, 2793, 3035, 2743, 2754, 2901, 3451, 3137, 3437, 3070, 3164, 3093, 3270, 3021, 3142, 3516, 3458, 3404, 3454, 3435, 3469, 3567, 1359, 3122, 2800, 2746, 1090, 2872, 2892, 3564, 3087, 3506, 3511, 3359, 3491, 3469, 3502, 3527, 3528, 3477, 3496, 3572, 3475, 2660, 271]]}

Error Log

java.lang.StackOverflowError
            at java.io.BufferedInputStream.read(BufferedInputStream.java:252)
            at libcore.io.Streams.readFully(Streams.java:81)
            at java.io.DataInputStream.readShort(DataInputStream.java:152)
            at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:165)
            at java.io.DataInputStream.readUTF(DataInputStream.java:169)
            at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:1707)
            at java.io.ObjectInputStream.readNewClassDesc(ObjectInputStream.java:1637)
            at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:658)
            at java.io.ObjectInputStream.readNewArray(ObjectInputStream.java:1421)
            at java.io.ObjectInputStream.readNonPrimitiveContent(ObjectInputStream.java:760)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:1988)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:1945)
            at java.io.ObjectInputStream.readFieldValues(ObjectInputStream.java:1116)
            at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:455)
            at java.io.ObjectInputStream.readObjectForClass(ObjectInputStream.java:1348)
            at java.io.ObjectInputStream.readHierarchy(ObjectInputStream.java:1245)
            at java.io.ObjectInputStream.readNewObject(ObjectInputStream.java:1840)
            at java.io.ObjectInputStream.readNonPrimitiveContent(ObjectInputStream.java:762)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:1988)
            at java.io.ObjectInputStream.readUnshared(ObjectInputStream.java:1960)
            at com.squareup.tape.SerializedConverter.deserialize(SerializedConverter.java:27)
            at com.squareup.tape.SerializedConverter.from(SerializedConverter.java:38)
            at com.squareup.tape.SerializedConverter.from(SerializedConverter.java:20)
            at com.squareup.tape.FileObjectQueue.peek(FileObjectQueue.java:61)
            at com.squareup.tape.TaskQueue.peek(TaskQueue.java:29)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:49)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)
         at PerformanceRequestService.executeNext(PerformanceRequestService.java:52)
         at PerformanceRequestService.onSuccess(PerformanceRequestService.java:62)
         at PerformanceRequestTask.execute(PerformanceRequestTask.java:83)

Concurrent issue for TaskQueue

Hi, we implemented a task queue that inherits from TaskQueue, using FileObjectQueue as the delegate. In our project, two Android services manipulate this TaskQueue, and the two processes may run at the same time. I saw the comment in TaskQueue that says "Not safe for concurrent use." So my question is: since we would manipulate the TaskQueue concurrently, will our delegate file be corrupted, or will other concurrency issues occur? Should we add any protection around TaskQueue manipulation?

QueueFile is throwing IOException: Underlying input stream returned zero bytes

Pardon me if I am not using this correctly, but I can't seem to get this to work. I'm trying to use Guava's CharStreams.toString to simplify reading:

public class TapeTestApp {

    public static void main(String[] args) throws Exception {
        QueueFile q = new QueueFile(new File("test.txt"));

        q.add("One\n".getBytes());
        q.add("Two\n".getBytes());

        ElementReader reader = new ElementReader() {

            @Override
            public void read(InputStream in, int length) throws IOException {

                String stringFromStream = CharStreams.toString(new InputStreamReader(in, "UTF-8"));
                System.out.println(stringFromStream);
            }
        };

        q.peek(reader);
    }
}

Enable clear() on ObjectQueue

It would be useful if I could clear and remove all outstanding tasks from the TaskQueue and reset the QueueFile. Could we add clear() to the ObjectQueue interface and its *Queue implementations?

Unit test improvement

It would be good to also check the final block "5" for QueueFileTest.testFileExpansionCorrectlyMovesElements

i.e. byte[] expectedBlockNumbers = {2, 3, 4, 6, 7, 8, 5};

Add ability to close underlying file in FileObjectQueue

I use FileObjectQueue in a Fragment, and when I close the Activity I'm running into this:

A resource was acquired at attached stack trace but never released. See java.io.Closeable for information on avoiding resource leaks.
java.lang.Throwable: Explicit termination method 'close' not called
at dalvik.system.CloseGuard.open(CloseGuard.java:184)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:128)
at com.squareup.tape.QueueFile.open(QueueFile.java:215)
at com.squareup.tape.QueueFile.<init>(QueueFile.java:116)
at com.squareup.tape.FileObjectQueue.<init>(FileObjectQueue.java:35)

Expose asList() on ObjectQueue

The API currently only exposes the latest task. Having an ability to query the queue to determine which operations are currently queued would be extremely helpful.

Suggestion: Support deque-like operations?

Hello, and thanks for this great library.

Given it is implemented as a circular buffer, it would be (reasonably) easy to support deque-like operations, e.g. addFront/addBack, peekFront/peekBack, removeFront/removeBack, iterator/descendingIterator.

From what I can tell, doing it would require changes to the file format. E.g., we could store the length of each element twice, both before and after the payload, so that the list can be traversed in both directions.

QueueFile hangs on a 'corrupt' file

Still researching how we got here, but we have a file filled with zeros, even the header. This causes QueueFile.expandIfNecessary to never complete; it basically keeps adding fileLength each time through.

I propose we add an additional check on fileLength when we read it in readHeader. If zero (or less than zero? Java ints are signed, after all), it would throw an IOException, failing fast.
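
For illustration, a minimal sketch of the kind of fail-fast check proposed; the field layout (a 4-byte big-endian file length as the first header field) follows the description in the earlier issue above, not the tape source, and readInt is a local helper:

import java.io.IOException;

// Hypothetical fail-fast check while parsing the header of a possibly corrupt file.
static void checkHeader(byte[] header) throws IOException {
  int fileLength = readInt(header, 0);  // first header field: total file length
  if (fileLength <= 0) {
    throw new IOException("File is corrupt; header claims file length " + fileLength);
  }
}

/** Reads a 4-byte big-endian int starting at offset. */
static int readInt(byte[] buffer, int offset) {
  return ((buffer[offset] & 0xff) << 24)
      | ((buffer[offset + 1] & 0xff) << 16)
      | ((buffer[offset + 2] & 0xff) << 8)
      | (buffer[offset + 3] & 0xff);
}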

Update maven?

The package on Maven is 2 years old (v1.2.3).
There has been a lot of development since; can we have a new version on Maven?
(Or is the current version considered unstable?)

Peek throws com.google.gson.JsonSyntaxException

Hey !

Sometimes, in production, something goes wrong and an object gets stuck. The queue then throws a com.google.gson.JsonSyntaxException each time this object is peeked.

Do you have any idea why?

For now, all I can do is ask the user to clear the data cache, but some data is lost each time.

Thanks !

`QueueFile` Logic Error

Contains the following:

fileLength = INITIAL_LENGTH;
if (fileLength > INITIAL_LENGTH) setLength(INITIAL_LENGTH);

The comparison can never be true, because fileLength has just been assigned INITIAL_LENGTH, so the setLength call is dead code.

Support for large queues

Right now, queue file sizes are limited to 2G max and element counts < Integer.MAX_VALUE. It would be nice to raise this limit and support larger file sizes. The queue files wouldn't be compatible with older files since the pointers would all have to be longs of course.

FileException shouldn't be unchecked

In various places an IOException is promoted to a FileException. This means that when there's an issue in the underlying file for whatever reason, tape will throw an unchecked exception. Unless the caller is careful to remember to wrap a try/catch around calls into tape functions, tape may unexpectedly kill the app.

(While it's true that there's little tape can do to recover from IOExceptions, it'd be nice to encourage the app to decide how to recover from these "unrecoverable" sorts of errors -- for example, the app could inform the user that it just lost some of their data but is attempting to recover. When the alternative is just to crash, that doesn't seem so bad.)

PS: I'm relatively new to Java, so feel free to just close this if I'm reopening some age-old debate that you've already decided upon. But it seems at least "Effective Java" says that runtime exceptions should only be used for programming errors.

Drop Serializable from Task

We shouldn't mandate Serializable. Note: for downstream classes this is probably an incompatible change; if they were depending on serialization, they will need to implement Serializable themselves.

You can deserialize non-serializable classes, but I need to do a refresher on whether this changes semantics of the process.

Track execution of the task

I briefly investigated the tape framework and didn't find any way to track execution of the tasks published to a queue. So my question: is it possible to achieve this?

Maybe this is out of scope for this library and such functionality is up to each particular application that uses the tape framework, but I would like to have this possibility.

I would solve it myself if tape is missing this feature, and would be happy if it became part of the framework.

  1. For instance, add an additional field which indicates the state of the task (but this would require further updates to the file)
  2. Store an additional file (something like {task_id}.inprogress) while the task is running and remove it at the end of execution (a sketch of this option follows below)

This information will be helpful if somebody wants to check whether a task has already started or not.
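
A rough sketch of option 2, using only plain java.io and assuming nothing about tape itself; the taskId and the directory are placeholders:

import java.io.File;
import java.io.IOException;

/** Marks a task as in-progress with a sentinel file, removed when execution finishes. */
static void executeWithMarker(File dir, String taskId, Runnable task) throws IOException {
  File marker = new File(dir, taskId + ".inprogress");
  if (!marker.createNewFile()) {
    // Marker already exists: a previous run started this task and may not have finished.
  }
  try {
    task.run();
  } finally {
    marker.delete();
  }
}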

Implementing Proguard with SquareTape

I'm trying to do a sample prototype of this project with the implementation of Proguard. However, I'm running into issues with Dagger in release mode (debug mode builds work fine):

No injectable members on com.example.squaretapetest.d. Do you want to add an injectable constructor? required by com.example.squaretapetest.d com.example.squaretapetest.SampleActivity.a
No injectable members on com.b.a.b. Do you want to add an injectable constructor? required by com.b.a.b com.example.squaretapetest.SampleActivity.b
at dagger.internal.ThrowingErrorHandler.handleErrors(ThrowingErrorHandler.java:34)
at dagger.internal.Linker.linkRequested(Linker.java:136)
at dagger.ObjectGraph.getEntryPointBinding(ObjectGraph.java:264)
at dagger.ObjectGraph.inject(ObjectGraph.java:238)

Would anyone happen to have suggestions? Thank you!

java.io.EOFException creating a FileObjectQueue

I feel like I am doing something wrong. I get the following exception when trying to create a FileObjectQueue:

java.io.EOFException
        at java.io.RandomAccessFile.readFully(Unknown Source) ~[?:1.8.0_92]
        at java.io.RandomAccessFile.readFully(Unknown Source) ~[?:1.8.0_92]
        at com.squareup.tape.QueueFile.readHeader(QueueFile.java:160) ~[tape-1.2.3.jar!/:?]
        at com.squareup.tape.QueueFile.<init>(QueueFile.java:117) ~[tape-1.2.3.jar!/:?]
        at com.squareup.tape.FileObjectQueue.<init>(FileObjectQueue.java:35) ~[tape-1.2.3.jar!/:?]

I'm trying to use a temporary file for the queue:

File file = File.createTempFile(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());

And then I create the queue:

ObjectQueue<Message> queue = new FileObjectQueue<Message>(file, new GsonConverter<Message>(gson, Message.class));

I changed to the InMemoryObjectQueue and it works fine. I'm using Java 8 and am currently on a Windows VM. I'm using version 1.2.3. Any ideas what I could be doing wrong? Thanks

Clarification on transactionality

I generally understand how transactionality is preserved, by writing ("committing") the header after writing the data. However, I'm not clear on how it's implemented, or on the disclaimer about atomic segment writes.

According to @crazybob (https://www.parleys.com/tutorial/5148922b0364bc17fc56c97b/chapter15) there are two fsyncs:

  1. write data
  2. fsync
  3. write header
  4. fsync

But I don't see any explicit call to sync. Is this because opening the file in "rwd" mode is equivalent to an explicit sync? According to the RandomAccessFile javadocs, "rwd" mode is

" If the file resides on a local storage device then when an invocation of a method of this class returns it is guaranteed that all changes made to the file by that invocation will have been written to that device. "

If that is the case, what is the need for this disclaimer?

"The current implementation is built for file systems that support atomic segment writes (like YAFFS)."

Or is it the case that even with "rwd", the write is not atomic, in which case the javadoc is wrong?

Feels like I'm missing something. Thanks.
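
For reference, the two ways the durability half of this is usually expressed in plain Java. Which one tape actually uses, and whether either makes a multi-byte write atomic on a given file system, is exactly the question being asked here, so treat this only as background:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Option 1: "rwd" asks that updates to the file's content be written to the
// underlying device before each write() returns (file metadata excluded).
static void writeDurablyRwd(File file, byte[] data) throws IOException {
  try (RandomAccessFile raf = new RandomAccessFile(file, "rwd")) {
    raf.write(data);
  }
}

// Option 2: plain "rw" plus an explicit sync of the file descriptor (an fsync-style call).
static void writeDurablyExplicitSync(File file, byte[] data) throws IOException {
  try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
    raf.write(data);
    raf.getFD().sync();
  }
}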

EROFS (Read-only file system) errors

In a FooService that uploads changes to the server using Tape, I get quite a few exceptions from user installations that look like this:

Caused by: java.io.FileNotFoundException: /data/data/com.example.foo/files/some_queue: open failed: EROFS (Read-only file system)
at libcore.io.IoBridge.open(IoBridge.java:409)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:118)
at com.squareup.tape.QueueFile.open(QueueFile.java:206)
at com.squareup.tape.QueueFile.<init>(QueueFile.java:109)
at com.squareup.tape.FileObjectQueue.<init>(FileObjectQueue.java:35)
at com.example.foo.FooService.onCreate(FooService.java:123)
... X more
Caused by: libcore.io.ErrnoException: open failed: EROFS (Read-only file system)
at libcore.io.Posix.open(Native Method)
at libcore.io.BlockGuardOs.open(BlockGuardOs.java:110)
at libcore.io.IoBridge.open(IoBridge.java:393)

The queue is initialized like this:

FileObjectQueue.Converter<Payload> converter = new GsonConverter<Payload>(gson, Payload.class);
File queueFile = new File(getFilesDir(), "some_queue");
queue = new FileObjectQueue<Payload>(queueFile, converter);

In addition, we read and write often from the files subdir using context.openFileOutput / openFileInput and don't see these errors, which puzzles me.

I'm assuming that the FS is in a read-only state for some random reason, and that the app won't work and there isn't much to do about it regardless of Tape, but I'm wondering if anyone else has seen this with Tape.

Create New Sample Application

The current one relies on Imgur, whose API is not that stable, and our API key might no longer be working. Let's come up with a good, small, clear sample that newcomers can learn from easily.

Thread safety using RX-Java

Hey guys,

I am using tape in an IntentService. To provide a thread-safe interface, I thought of using RxJava with a single-threaded scheduler and performing all the operations on the same thread by wrapping all the calls with subscribeOn. An example of the API:

// Trying to synchronize QueueFile by using single threaded scheduler.
// It is currently very rough.
public final class ObservableQueueFile<T> {

    @NonNull private final Scheduler objectQueueScheduler =
            Schedulers.from(Executors.newSingleThreadExecutor());

    @NonNull final private ObjectQueue<T> objectQueue;

    public ObservableQueueFile(@NonNull final ObjectQueue<T> objectQueue) {
        this.objectQueue = objectQueue;
    }

    public Observable<Integer> size() {
        return Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(objectQueue.size());
            }
        }).subscribeOn(objectQueueScheduler);
    }

    public Observable<Void> add(final T entry) {
        return Observable.fromCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                objectQueue.add(entry);
                return null;
            }
        }).subscribeOn(objectQueueScheduler);
    }

    public Observable<T> peek() {
        return Observable.defer(new Func0<Observable<T>>() {
            @Override
            public Observable<T> call() {
                return Observable.just(objectQueue.peek());
            }
        }).subscribeOn(objectQueueScheduler);
    }

    public Observable<Void> remove() {
        return Observable.fromCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                objectQueue.remove();
                return null;
            }
        }).subscribeOn(objectQueueScheduler);
    }

    public Observable<Void> setListener(final Listener<T> listener) {
        return Observable.fromCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                objectQueue.setListener(listener);
                return null;
            }
        }).subscribeOn(objectQueueScheduler);
    }
}

Just wanted to get feedback on the API and whether I am missing something. Also, is it useful enough to be part of tape, with an optional dependency on RxJava? I would be happy to contribute.

Retrieve All Tasks

Expose an API that allows clients to retrieve all elements in the queue, perform an operation, and clear the queue in one atomic operation.

A use case would be when the server can accept a batched operation, and it's more network-efficient to perform one large POST request over numerous smaller ones.

This would simply add the ability to see all tasks in the queue and clear the queue. Essentially combining #25 and #26.
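
With the current tape2 API this can only be approximated non-atomically (which is what this issue asks to improve): read everything with asList() or peek(n), send the batch, and clear only on success. A sketch under that assumption; send() is a placeholder for the batched network call:

import java.io.IOException;
import java.util.List;

// Hypothetical non-atomic batch drain: if the process dies after send() succeeds but
// before clear(), the same batch may be uploaded again, so send() should be idempotent.
static <T> void drain(ObjectQueue<T> queue) throws IOException {
  List<T> batch = queue.asList();  // snapshot of everything currently queued
  if (batch.isEmpty()) {
    return;
  }
  send(batch);                     // placeholder for the batched upload
  queue.clear();                   // clear only after the batch was accepted
}

static <T> void send(List<T> batch) {
  // Upload the whole batch in one request (application-specific).
}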

Prevent concurrent modification while iterating with forEach.

It conflicts with #remove


    public static void main(String[] args) throws IOException, InterruptedException {
        File file = new File("temp");
        file.delete();

        final QueueFile queueFile = new QueueFile(file);
        System.out.println("adding");
        int i = 0;
        while (i < 50) {
            try {
                queueFile.add(String.valueOf(++i).getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("reading");

        final AtomicLong atomicLong = new AtomicLong();
        read(queueFile, atomicLong);
        read(queueFile, atomicLong);
        read(queueFile, atomicLong);
        read(queueFile, atomicLong);
    }

    protected static void read(final QueueFile queueFile, final AtomicLong atomicLong) throws IOException {
        queueFile.forEach(new QueueFile.ElementVisitor() {
            @Override
            public boolean read(InputStream in, int length) throws IOException {
                byte[] bytes = new byte[length];
                in.read(bytes);
                // System.out.println(new String(bytes));

                queueFile.remove();
                atomicLong.incrementAndGet();
                return true;
            }
        });
        System.out.println("total read " + atomicLong.get());
    }

NPE occurs after picking an image

java.lang.NullPointerException
at org.apache.harmony.luni.platform.OSFileSystem.open(OSFileSystem.java:150)
at java.io.FileInputStream.<init>(FileInputStream.java:82)
at com.squareup.tape.sample.HttpRequest.part(HttpRequest.java:2630)
at com.squareup.tape.sample.HttpRequest.part(HttpRequest.java:2612)
at com.squareup.tape.sample.HttpRequest.part(HttpRequest.java:2598)
at com.squareup.tape.sample.ImageUploadTask$1.run(ImageUploadTask.java:44)
at java.lang.Thread.run(Thread.java:1096)

Better support for other filesystems

From the Javadoc in QueueFile.java:

The current implementation is built for file systems that support atomic segment writes (like YAFFS). Most conventional file systems don't support this; if the power goes out while writing a segment, the segment will contain garbage and the file will be corrupt. We'll add journaling support so this class can be used with more file systems later.

How to leverage Tape as disk based BlockingQueue ?

I would like to leverage Tape as a BlockingQueue for an Executor (for Java, not Android).

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#ThreadPoolExecutor(int,%20int,%20long,%20java.util.concurrent.TimeUnit,%20java.util.concurrent.BlockingQueue)

Can we leverage Tape for this? As far as I can see, Tape has a proprietary API. Now that you have begun Tape2, could matching the BlockingQueue API be explored?

I can see someone has already implemented a disk-based BlockingQueue, but its implementation is messy and not maintained, unlike Tape.

https://github.com/Vijay2win/FilebackedBlockingQueue
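
There's no BlockingQueue implementation in tape, and QueueFile's API doesn't block, so an adapter has to add the blocking behaviour itself. A minimal sketch (not a full java.util.concurrent.BlockingQueue, just blocking put/take over tape2's QueueFile, assuming single-process use):

import java.io.IOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.squareup.tape2.QueueFile;

/** Blocking put/take on top of a QueueFile. Not a java.util.concurrent.BlockingQueue. */
final class BlockingQueueFile {
  private final QueueFile queueFile;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();

  BlockingQueueFile(QueueFile queueFile) {
    this.queueFile = queueFile;
  }

  void put(byte[] element) throws IOException {
    lock.lock();
    try {
      queueFile.add(element);
      notEmpty.signal();          // wake one waiting consumer
    } finally {
      lock.unlock();
    }
  }

  byte[] take() throws IOException, InterruptedException {
    lock.lock();
    try {
      while (queueFile.isEmpty()) {
        notEmpty.await();         // block until a producer adds an element
      }
      byte[] element = queueFile.peek();
      queueFile.remove();
      return element;
    } finally {
      lock.unlock();
    }
  }
}

Plugging something like this into ThreadPoolExecutor would additionally require implementing the full BlockingQueue<Runnable> interface and a way to serialize Runnables, which is the harder part.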

What is the equivalent of a poll or dequeue method?

What to do if one needs to consume the queue as well? I'm trying to do pub-sub using this.
I know we can easily write one using queue.remove(). Still, I believe it would be a quick task that adds meaning to the lib.
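
For what it's worth, a poll-style helper can be layered on top of the existing API. A sketch against the tape2 ObjectQueue shown earlier in this README, returning null when the queue is empty, mirroring java.util.Queue#poll:

import java.io.IOException;

// Hypothetical poll(): return and remove the head element, or null if the queue is empty.
static <T> T poll(ObjectQueue<T> queue) throws IOException {
  T head = queue.peek();  // null when the queue is empty
  if (head != null) {
    queue.remove();
  }
  return head;
}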
