
Comments (11)

groue commented on September 1, 2024

This issue is about RxGRDB v0.7.0, and contains outdated information. Please refer to the Scheduling documentation chapter for fresh information about RxGRDB scheduling.

Original comments below


Hello @mayurdhaka-suryasoftware

I get a crash that reads 'Database was not used on the correct thread'. From what I understand, I am using the db object on a separate thread than the one it was initiated on (dbQueue).

You understand well. GRDB absolutely prevents database connections from being used outside of their dedicated dispatch queue, and raises a fatal error when this happens. There's no escape from this rule, which is the golden rule of GRDB thread-safety.
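To picture the rule, here is a minimal sketch that uses plain Dispatch only. It is not GRDB's implementation: `ConfinedResource`, `isOnQueue`, `sync` and `use` are hypothetical names, and the sketch only assumes that a resource can tag its dedicated serial queue and check for that tag before each use.

```swift
import Dispatch

// Hypothetical stand-in for a database connection confined to one serial queue.
// This is an illustration, not GRDB code.
final class ConfinedResource {
    private static let key = DispatchSpecificKey<ObjectIdentifier>()
    private let queue = DispatchQueue(label: "confined.resource")

    init() {
        // Tag the dedicated queue so that code can tell whether it runs on it.
        queue.setSpecific(key: ConfinedResource.key, value: ObjectIdentifier(self))
    }

    /// True when the caller is currently running on the dedicated queue.
    var isOnQueue: Bool {
        return DispatchQueue.getSpecific(key: ConfinedResource.key) == ObjectIdentifier(self)
    }

    /// Runs `body` on the dedicated queue, blocking the caller until it returns.
    func sync<T>(_ body: () throws -> T) rethrows -> T {
        return try queue.sync(execute: body)
    }

    /// Traps when called from anywhere but the dedicated queue.
    func use() {
        precondition(isOnQueue, "Resource was not used on the correct queue")
    }
}

let resource = ConfinedResource()
let outside = resource.isOnQueue                 // checked from the calling thread
let inside = resource.sync { resource.isOnQueue } // checked on the dedicated queue
resource.sync { resource.use() }                  // passes the precondition
```

Calling `resource.use()` outside of `sync` would trap, which mirrors the crash described in the question.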

But I wanted to know if there is a way to achieve what I'm trying to do because the work I'm doing in my map block is something I'd like to get off the main thread.

That's a very interesting question, one that deserves a clear explanation.

Let's first "fix" your sample code by fetching from the dedicated dispatch queue:

dbQueue.rx
  .changes(in: [Project.all()])
  .map { db in
    let foo = try Row.fetchAll(db, mySQLQuery)
    ...
  }

Let's try to find out on which thread the fetch happens.

dbQueue.rx.changes is documented to notify of changes on the "database writer dispatch queue". You are using a DatabaseQueue, so this dispatch queue is a unique serial dispatch queue dedicated to this database.

We thus know on which dispatch queue we are. But we don't know yet if the main thread is blocked, or not. And we remember that you want to avoid blocking the main thread. So let's keep on investigating.

dbQueue.rx.changes emits its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread:

// From the main thread, synchronously execute a database transaction
try dbQueue.inTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbQueue.rx.changes
}

// Still on the main thread. Now the transaction has been committed, and
// the observable has fetched its results.

On the other side, if the transaction is executed from some other thread, then the main thread is not blocked at all:

// On some thread which is not the main thread
try dbQueue.inTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbQueue.rx.changes
}

// Still on some thread, which was blocked until the observable would
// fetch its results. The main thread did not see anything.

We now understand that the main thread may, or may not be blocked, depending on the threads that use the dbQueue.

And that's the absolute best you can achieve with DatabaseQueue.

So how do you absolutely prevent the main thread from being blocked?

Replace DatabaseQueue with DatabasePool.

A database pool is dedicated to efficient multi-threading, unlike a database queue which is designed to be as simple as possible. A database pool is a little more involved than a database queue, so please have a look at the reading list at the bottom of this answer. But a database pool opens new multi-threading horizons. This is precisely its job.

This is how a database pool solves your issue:

let processingQueue = DispatchQueue(label: "processing", qos: .userInitiated)
processingQueue.async { // or sync, you choose
    dbPool.rx
        .changeTokens(in: [Project.all()])
        .mapFetch(resultQueue: processingQueue) { db in
            // fetch
            return try Row.fetchAll(db, mySQLQuery)
        }
        .subscribe(onNext: { foo in
            // process fetched results
            ...
        })
}

Yes that's a big rewrite (and by writing it, I realize that I may have to improve that - suggestions are welcome). But please bear with me:

First, the main thread no longer has to wait for the fetch to complete after it has performed a transaction:

// From the main thread, synchronously execute a database transaction
try dbPool.writeInTransaction { db in
  try Project(...).insert(db)
  try ...
  return .commit // Triggers dbPool.rx.changeTokens
}

// Still on the main thread. Now the transaction has been committed, and
// the observable concurrently fetches and processes its results off
// the main thread.

Second, the main thread is not blocked when it reads from the database, even if the observable is fetching its own results:

// From the main thread: not blocked by the eventual fetch performed by the observable
dbPool.read { db in
  // fetch something
}

Third, you are now guaranteed that your processing of the fetched data happens off the main thread (on the "processing" queue).

For this to work well, you must think harder about dispatch queues. RxGRDB may improve in the future, but right now, this is how you avoid blocking the main thread:

// This serial dispatch queue is required for RxGRDB to guarantee the correct
// ordering of change notifications off the main thread.
let processingQueue = DispatchQueue(label: "processing", qos: .userInitiated)

// Observable must be subscribed on the processing queue:
processingQueue.async { // or sync, you choose
    
    dbPool.rx
        
        // Use `changeTokens` (not `changes`)
        .changeTokens(in: [Project.all()])
        
        // Use `mapFetch` (not `map`), and ask to get results
        // on the processing queue.
        .mapFetch(resultQueue: processingQueue) { db in
            // Fetching itself does not happen on the processing queue, but
            // on a database reading dispatch queue. You can process fetched
            // results here, but you may be better off returning the fetched
            // results for later processing on the processing queue.
            return try Row.fetchAll(db, mySQLQuery)
        }
        
        .subscribe(onNext: { foo in
            // Now we're on the processing queue: you can process fetched results
            ...
        })
}

If you want a reading list:

from rxgrdb.

groue commented on September 1, 2024

It is worth noting that although database pools allow a very precise and efficient use of threads, most applications do not need that.

A good general rule is that before fixing a performance problem, one first has to experience it, and run benchmarks in order to precisely locate the bottleneck.

So: although my answer above does provide a robust solution to your question, I suggest you keep on using a database queue. Yes, the main thread will be blocked sometimes. But for how long? A couple of milliseconds is not a big deal. SQLite is very fast, and GRDB is fast. Also, remember that the main queue is not blocked if a database queue is used on some background thread (look for "On the other side..." in my previous comment).

I'm here if you have any other questions.


groue commented on September 1, 2024

Here is a third answer. This will help me write a documentation chapter about the main thread eventually 😉

There's more to say about plain simple database queues.

After RxGRDB notifies changes, one generally fetches and processes results.

Sometimes one can fetch, and then process. Sometimes fetching and processing have to happen together. But what is slow and should be done off the main thread? Is it the fetch? Is it the processing? Is it both?

Often the fetch is quite fast. When fetching and processing can be separated, you can accept that fetches may briefly block the main thread, and perform the slow processing of the fetched results off the main thread:

// Example 1
request.rx
    .fetchAll(in: dbQueue)
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

// Example 2
dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db in return <fetched results> }
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

// General pattern
<Observable of fetched values>
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process off the main thread
        ...
    }

Finally, I suggest that you read the RxGRDB documentation again, and focus on the synchronizedStart (default true) and resultQueue (default main queue) parameters.

That's all for now :-)


mayurdhaka-suryasoftware commented on September 1, 2024

Hey @groue

Thanks as always for taking the time to answer what's going on in great detail.

I get the general picture of what's going on but I'm also going through the reading material you've provided to refresh my memory and get a better understanding of the way GRDB works.

Meanwhile, I've been facing a related problem with something you mentioned in your answers. You wrote:

dbQueue.rx.changes emit its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread

I've observed this happening in my app, and it led to a couple of bugs. Here's what I did:

  1. At some point in my app, I subscribed to a list of projects like so:
dbQueue.rx.changes(in: [Project.all()])
   .map {
     // Do my mapping work
   }.subscribe(onNext: { foo in
     // Update list of projects with foo
   })
  2. From another part of my app, I have a piece of code like so:
dbQueue.inTransaction { db in
  try Project(...).insert(db)
  return .commit // Triggers dbQueue.rx.changes
}

As you said, if I write to the database with my code in point 2. on the main thread, the observable also triggers on the main thread.
But, what I found was, if I write to the database on a background thread, the observable is triggered on the background thread too? So if I change the code in point 2. to be:

DispatchQueue(label: "Foo", qos: .userInitiated).async {
    dbQueue.inTransaction { db in
        try Project(...).insert(db)
         return .commit // Triggers dbQueue.rx.changes
    }
}

then the observable is also triggered on the DispatchQueue identified as Foo.

This doesn't make sense to me because I'm using dbQueue to subscribe to the observable, and I'm using dbQueue to write to the database too. So why does the queue I write to the database from also end up being the queue on which the observable is triggered? And what is the sequence of steps GRDB/RxGRDB executes to ensure this thread consistency?
My best guess is that dbQueue isn't really a regular DispatchQueue? Clearly I'm missing something here. It would be very helpful if you could elaborate on why this happens.
(PS: I got around this problem by explicitly subscribing on the main queue with observeOn.)


groue commented on September 1, 2024

Hey @groue

Thanks as always for taking the time to answer what's going on in great detail.

You're welcome! I'm glad you have opened this issue. It is very useful. It will help improve both the ergonomics and the documentation of the library.

I get the general picture of what's going on but I'm also going through the reading material you've provided to refresh my memory and get a better understanding of the way GRDB works.

Appreciated :-) Scheduling is a touchy subject indeed.

Meanwhile, I've been facing a related problem with something you mentioned in your answers. You wrote:

dbQueue.rx.changes emit its elements as soon as a transaction has committed relevant database changes. If that database transaction has been performed from the main thread, then the fetch indeed blocks the main thread

I've observed this happening in my app, and it led to a couple of bugs.

Bugs? OK, I read carefully:

Here's what I did:

  1. At some point in my app, I subscribed to a list of projects like so:
dbQueue.rx.changes(in: [Project.all()])
   .map {
     // Do my mapping work
   }.subscribe(onNext: { foo in
     // Update list of projects with foo
   })
  2. From another part of my app, I have a piece of code like so:
dbQueue.inTransaction { db in
  try Project(...).insert(db)
  return .commit // Triggers dbQueue.rx.changes
}

As you said, if I write to the database with my code in point 2. on the main thread, the observable also triggers on the main thread.

Yes.

But, what I found was, if I write to the database on a background thread, the observable is triggered on the background thread too? So if I change the code in point 2. to be:

DispatchQueue(label: "Foo", qos: .userInitiated).async {
    dbQueue.inTransaction { db in
        try Project(...).insert(db)
         return .commit // Triggers dbQueue.rx.changes
    }
}

then the observable is also triggered on the DispatchQueue identified as Foo.

Yes. The dbQueue.rx.changes observable is triggered on transaction commits, synchronously.

Quoting the documentation of this method:

All elements are emitted on the database writer dispatch queue, serialized with all database updates.

We'll see later that this definition makes changes an observable that may not be the one you need. But let's explain more first.

What is this "database writer dispatch queue"? To understand, let's quote database queue documentation:

A database queue can be used from any thread. The inDatabase and inTransaction methods are synchronous, and block the current thread until your database statements are executed in a protected dispatch queue. They safely serialize the database accesses.

The "database writer dispatch queue" is the same queue as the "protected dispatch queue" of the above paragraph.

Let's analyse your sample code:

DispatchQueue(label: "Foo", qos: .userInitiated).async {
    // We're in "Foo" here.
    dbQueue.inTransaction { db in
        // We're in the database "protected dispatch queue" here
        ...
        return .commit // (1)
    }
}

On (1), the dbQueue.rx.changes observable is triggered, from the database protected dispatch queue. Meanwhile the Foo queue is blocked, and waits for the whole dbQueue.inTransaction method to complete.
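The sequence above can be sketched with plain Dispatch. This is a toy model, not GRDB or RxGRDB code: `ToyDatabaseQueue`, `onCommit`, `currentQueueLabel` and `runDemo` are hypothetical names, and the commit hook merely stands in for the observable being triggered.

```swift
import Dispatch

// Toy stand-in for a database queue (not GRDB): a serial queue plus a hook
// that fires synchronously when a "transaction" ends.
final class ToyDatabaseQueue {
    static let labelKey = DispatchSpecificKey<String>()
    private let queue = DispatchQueue(label: "toy.database")
    var onCommit: (() -> Void)?

    init() {
        queue.setSpecific(key: ToyDatabaseQueue.labelKey, value: "database")
    }

    /// Blocks the caller while `body` and the commit hook run on the protected queue.
    func inTransaction(_ body: () -> Void) {
        queue.sync {
            body()
            self.onCommit?()
        }
    }
}

/// Label of the queue the caller is currently running on, if it was tagged.
func currentQueueLabel() -> String {
    return DispatchQueue.getSpecific(key: ToyDatabaseQueue.labelKey) ?? "untagged"
}

func runDemo() -> [String] {
    let dbQueue = ToyDatabaseQueue()
    var log: [String] = []
    dbQueue.onCommit = { log.append("commit on \(currentQueueLabel())") }

    let foo = DispatchQueue(label: "Foo", qos: .userInitiated)
    foo.setSpecific(key: ToyDatabaseQueue.labelKey, value: "Foo")

    // `sync` (rather than `async`) is used only so that the demo can return its log.
    foo.sync {
        log.append("before on \(currentQueueLabel())")
        dbQueue.inTransaction {
            log.append("transaction on \(currentQueueLabel())")
        }
        log.append("after on \(currentQueueLabel())")
    }
    return log
}
```

In this model, the transaction body and the commit hook both report the "database" queue, while the lines before and after the transaction report "Foo": Foo simply waits in between.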

This doesn't make sense to me because I'm using dbQueue to subscribe to the observable, and I'm using dbQueue to write to the database too. So why does the queue I write to the database from also end up being the queue on which the observable is triggered? And what is the sequence of steps GRDB/RxGRDB executes to ensure this thread consistency?

I hope that it is clearer now. There is no bug: everything behaves exactly as expected.

But this is not the behavior you expect. I thus suggest you keep on reading.

My best guess is dbQueue isn't really a regular DispatchQueue? Clearly I'm missing something here. It would be very helpful if you could elaborate why this happens?
(PS: I got around this problem by explicitly subscribing on the main queue with observeOn.)

You guessed right.

Now, maybe dbQueue.rx.changes(in:).map() is not the reactive method your app needs. Instead, I suggest you look at an observable of fetched values. There are specific ones, such as request.rx.fetchAll(in:). And there is the most general of all: dbQueue.rx.changeTokens(in:).mapFetch().

All observables of fetched values emit their values on a single dispatch queue (which defaults to the main queue).

Project.all().rx
    .fetchAll(in: dbQueue)
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

All observables of fetched values can eventually be reduced to the general dbQueue.rx.changeTokens(in:).mapFetch(), which makes it clearer how the various dispatch queues are involved:

dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db -> [Project] in
        // In the database "protected dispatch queue":
        // Do your mapping work. For example:
        let projects = try Project.fetchAll(db)
        return projects
    }
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

By using observables of fetched values, you enjoy observables that easily emit in the dispatch queue you need. RxGRDB does all the hard scheduling job for you.

To be clear, you simply cannot reproduce the same behavior with the changes method:

// Almost the same, but not quite:
dbQueue.rx
    .changes(in: [Project.all()])
    .map { db -> [Project] in
        // In the database "protected dispatch queue":
        // Do your mapping work. For example:
        let projects = try Project.fetchAll(db)
        return projects
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { (projects: [Project]) in
        // In the main queue: use projects here
    })

There are two differences:

  1. With changes.map, the first element is emitted asynchronously on subscription. Fetching observables emit their first element synchronously on subscription.
  2. changes.map prevents RxGRDB from performing efficient scheduling when one uses a database pool (you can ignore this one since you are using a database queue).

The first point can be very important for many applications. For example, apps often want to synchronously update a view when it is shown, not after.

I hope I have convinced you that observables of fetched values are a neat tool. If you still think that you need to use changes, then please tell me more about what you want to achieve, so that I can adjust my advice.

As a finishing note, I must briefly mention PR #18, which will ship shortly. We've seen above that the fetching observables of RxGRDB emit their fetched values on the main queue by default. This won't change. What will change is the way one builds fetching observables that emit their fetched values on a different queue.


groue commented on September 1, 2024

A last finishing note. I believe you'd be happy with:

// Only track projects
Project.all().rx
    .fetchAll(in: dbQueue)
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { projects in
        // process projects off the main thread
        ...
    }

// Most general form
dbQueue.rx
    .changeTokens(in: [Project.all()])
    .mapFetch { db in return <fetched results> }
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .map { fetchedResults in
        // process fetched results off the main thread
        ...
    }

If not, I need more information.


groue commented on September 1, 2024

@mayurdhaka-suryasoftware

RxGRDB v0.8.0 is out (release notes). It has a much more robust way to schedule database notifications.

And it especially ships with a brand new Scheduling documentation chapter. It is the definitive guide about RxGRDB scheduling.

I am closing this issue, which mostly contains outdated information now. Please open a new issue if you have any more questions!


mayurdzk commented on September 1, 2024

Hey @groue once again, thanks a bunch for the replies!

Bugs? OK, I read carefully:

Sorry, I should've been more clear that the bugs were in my code, due to an unclear understanding of how my code was working--not with RxGRDB. 😃

As for the dbQueue queue switching issue, I've been reading through some of the documentation lately and I found this piece right in the documentation of dbQueue:

The inDatabase and inTransaction methods are synchronous, and block the current thread until your database statements are executed in a protected dispatch queue.

As it says, the current thread is blocked, and the closure is executed. Since I now understand that the observable fires the instant my inTransaction method is called, it follows that I will be on the same queue as where I initiated the write on. I understand this clearly now.

I suppose the confusing bit for me was the name dbQueue: I was confusing it with a regular DispatchQueue that I have access to, and I assumed that the queue I write to the database from would never have any effect on the observables.

As for my issue of getting the observable to execute the map/mapFetch bit off the main queue, I found a way to perform those changes using the subscribe(on:) operator. This way, all the mapping of the observable, into a new type (from Data to ViewData, say) happens off the main thread I use and the observation of the final ViewData objects happens on the main thread--which is exactly what I needed. 🎉

Also, we have switched over to using dbPools following our discussions, and the concurrent reads they give us are a huge gain in the snappiness of the app.

Thanks a lot for taking the time to answer these questions. Can't wait to read the new documentation.
Cheers! 😃🎉


groue commented on September 1, 2024

Glad to hear that! Database Pool is the jewel of GRDB.

It requires somewhat more database skills than database queues. I especially want to stress that most database pool writes should happen through writeInTransaction. When one uses plain dbPool.write { ... }, one can inadvertently expose inconsistent database states to readers. The classic example is:

// SAFE CONCURRENCY
try dbPool.writeInTransaction { db in
    try Credit(destinationAccount, amount).insert(db)
    try Debit(sourceAccount, amount).insert(db)
    return .commit
}

// UNSAFE CONCURRENCY
try dbPool.write { db in
    try Credit(destinationAccount, amount).insert(db)
    try Debit(sourceAccount, amount).insert(db)
}

More info & details at the GRDB Concurrency Guide. And happy RxSwift/GRDB/RxGRDB combo!
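To see why the second form is risky, here is a toy model in plain Swift. It is not GRDB: `ToyLedger` and its members are hypothetical, and the model only assumes that concurrent readers see the last committed state, and that each statement outside a transaction is committed on its own.

```swift
// Toy model (not GRDB): readers only ever see the last committed snapshot.
struct ToyLedger {
    private(set) var committed: [String: Int]   // what concurrent readers see
    private var working: [String: Int]          // the writer's uncommitted view

    init(balances: [String: Int]) {
        committed = balances
        working = balances
    }

    /// Applies one statement. With `autocommit`, it becomes visible immediately.
    mutating func add(_ amount: Int, to account: String, autocommit: Bool) {
        working[account, default: 0] += amount
        if autocommit { committed = working }
    }

    /// Publishes all pending statements at once.
    mutating func commit() {
        committed = working
    }

    /// Sum of all balances, as a concurrent reader would see them.
    var visibleTotal: Int {
        return committed.values.reduce(0, +)
    }
}

// Transfer 10 from "source" to "destination". A consistent total is 100.

// Transaction-like: nothing is visible until the commit.
var transactional = ToyLedger(balances: ["source": 100, "destination": 0])
transactional.add(10, to: "destination", autocommit: false)
let transactionalMidTotal = transactional.visibleTotal   // a read between statements
transactional.add(-10, to: "source", autocommit: false)
transactional.commit()

// Statement-by-statement: the credit is visible before the debit.
var autocommitted = ToyLedger(balances: ["source": 100, "destination": 0])
autocommitted.add(10, to: "destination", autocommit: true)
let autocommittedMidTotal = autocommitted.visibleTotal   // a read between statements
autocommitted.add(-10, to: "source", autocommit: true)
```

In this model, a read that lands between the two statements sees a consistent total only in the transactional case. Both ledgers agree again once the second statement is applied.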


mayurdzk commented on September 1, 2024

@groue Yep, we're using writeInTransaction everywhere. Although I'm curious to know why GRDB supports write at all, given that a) it might be unsafe and b) writeInTransaction exists?


groue commented on September 1, 2024

I sometimes ask myself the very same question 😅 And I thus guess you're right. write has to turn into unsafeWrite, and join the unsafe methods of DatabaseWriter.
