derive's People

Contributors

rwillians, venkatd

derive's Issues

Best practice in OTP for calling GenServer.call on a list of processes and awaiting their results

@TurtleAI/eds

I've currently got an implementation for broadcasting a GenServer.cast to the subscribers as you see here:

@doc """
Broadcast a message to all subscribers
"""
def broadcast(server, message),
do: GenServer.cast(server, {:broadcast, message})

I'd also like to implement a version that does a GenServer.call in parallel to each subscriber, awaits all of the calls, then returns a list of tuples of the form {subscriber, result}.

Like
results = Broadcaster.broadcall(server, {:fetch_data, 23})

Is there a best practice for this? I've looked at the Task module, but I'm not sure of the best way to handle this.
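
One possible shape for this, as a minimal sketch rather than an established best practice: start one Task per subscriber and collect the replies with Task.await_many/2. The names BroadcallSketch and EchoServer are hypothetical, and the sketch takes the subscriber pids directly instead of asking a broadcaster process for them.

```elixir
defmodule EchoServer do
  # Hypothetical subscriber used only to make the sketch runnable.
  use GenServer

  @impl true
  def init(name), do: {:ok, name}

  @impl true
  def handle_call(message, _from, name), do: {:reply, {name, message}, name}
end

defmodule BroadcallSketch do
  # Calls every subscriber in parallel and pairs each pid with its reply.
  # Assumption: `subscribers` is a plain list of pids.
  def broadcall(subscribers, message, timeout \\ 5_000) do
    tasks =
      Enum.map(subscribers, fn pid ->
        Task.async(fn -> GenServer.call(pid, message, timeout) end)
      end)

    # Task.await_many/2 returns the results in the same order as the tasks.
    Enum.zip(subscribers, Task.await_many(tasks, timeout))
  end
end
```

Because Task.async/1 links the tasks to the caller, a subscriber that times out or crashes takes the caller down with it. If partial results are preferred, Task.yield_many/2 could be used instead.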

Accept a query struct as a selector on Derive.Ecto.Selector

Derive.Ecto.Selector should accept a %Query{} (ideally, any query-able supported by Ecto) as selector.
https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/selector.ex#L23-L31

A common use case is changing multiple rows with a single query. For example:

def handle_event(%InvoiceIssued{} = event) do
  [
    [ids: event.line_item_ids]
    |> Query.invoice_line_items() # returns a %Query{} struct
    |> update(%{status: :invoiced})
  ]
end

A possible implementation (in https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/selector.ex#L23-L31) is:

def selector_query(%Query{} = query), do: query

Doing multiple operations instead likely has a performance impact, although I haven't measured it.

Batch processing (sort of)

Most services (and perhaps some reducers, when they process the same event "instance") need to await an event getting reduced before doing what they need to do. In some cases, we need to await all events that were generated by a command, not just the one event we're handling.

Therefore, we need a simple API we can call from within the service's/reducer's event handler so that we can await the entire "batch" of events related to the one we have at hand -- wait until they are handled and their changes are persisted.

At first, it could be something "simpler" (or less difficult, at least) like awaiting all events that have the key command_id with value X. But, while planning a future infra/arch need, I realized that in some cases we want to persist events directly to the event store, skipping commands. For example, when we persist events from within migrations.

I really don't want to clutter the command_id key with trash data that can't be correlated to a command log or a request log, so I suggest we add a key to events named something like batch_id. For commands, we can make them populate this batch_id with the same value as the command_id, no problem. Or we could just generate a new id and set it as the batch_id of all events produced by that command execution. Either way would do.

Since Derive seems to be non-opinionated with regard to the structure of events, with the exception of the id field, this new await_processed/2 signature might need to accept a tuple specifying both the key and the value the user (dev) wants to match.

Current API:

await_processed(list_of_events, :reducers)

Suggestion:

await_processed(list_of_events, :reducers)
await_processed({:batch_id, "123"}, :reducers)
# clear distinction between signatures -- pattern matching by list or tuple.

It's likely that derive will need some sort of registry/index of all events currently being processed and which pid/supervisor (might be many) is processing them. If that's the case, it should also be possible -- and I'd say expected -- to have the following signature as well:

await_processed(list_of_event_ids, :reducers)
# You'd have to peek at the type of the first element of the list in order to make a distinction
# between this signature and the one that takes a list of event structs.
# I won't suggest removing the signature that takes the list of events' structs in favor of this
# one because I guess we get performance benefits from awaiting on a struct -- I imagine it's
# easier for you to locate the processes you need to await based on the workers that handle
# that event struct. But, if the performance gain of that approach is negligible, then perhaps
# it'd be better to just replace it in favor of list of ids.
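
To illustrate how the three suggested signatures could be told apart by pattern matching, here is a minimal sketch. The module AwaitSelectorSketch and the {:by_field, ...} / {:by_ids, ...} return shapes are hypothetical and not part of Derive; the sketch assumes events are maps or structs with an :id field and that ids are strings.

```elixir
defmodule AwaitSelectorSketch do
  # Normalizes the first argument of a hypothetical await_processed/2
  # into one of two internal selectors.

  # {:batch_id, "123"}: match events by an arbitrary key/value pair.
  def selector({key, value}) when is_atom(key), do: {:by_field, key, value}

  # Nothing to await.
  def selector([]), do: {:by_ids, []}

  # List of event structs (or maps): peek at the first element.
  def selector([%{id: _} | _] = events), do: {:by_ids, Enum.map(events, & &1.id)}

  # List of event ids.
  def selector([id | _] = ids) when is_binary(id), do: {:by_ids, ids}
end
```

Here both list forms collapse to the same id-based selector, which is the trade-off discussed in the comment above: it only makes sense if locating processes by id is about as cheap as locating them by struct.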

Filtering out no-ops returned by event handlers

Related to #16

In order to keep the code simpler and more readable, in some cases I personally prefer creating no-ops (or is it null-ops? -- in a reducer, it means producing an operation, like an update, that has no side effect on the db). This avoids conditional creation of ops and overkill extractions with conditions and/or pattern matching.

The most common case, for me, is having a pattern match that might return nil instead of an op. But there has been one case where I used an update that changed no fields (issue #16) -- this one was in Hatch, I don't remember if there were any in TurtleOS.

Of course we could just wrap the returned list of ops with a function that filters no-ops (like rejecting nil values or even filtering out Derive operation structs based on some logic), so this isn't a blocker. But I think this is something we could do on Derive's side to keep userland code cleaner.

The pseudo-code for what I'm suggesting would look like:

def foo(reducer_mod, event) do
  apply(reducer_mod, :handle_event, [event])
  |> List.wrap() # in case a single op (not a list) is returned
  |> List.flatten() # in case there are nested lists, useful when you extract
                    # the conditional logic for generating subsets of events.
  |> Enum.reject(&no_op?/1)
  |> invoke_commit(reducer_mod)
end

# `nil` must be matched literally; a bare `null` would be a variable that matches anything.
defp no_op?(nil), do: true
defp no_op?(%Update{fields: fields}) when map_size(fields) == 0, do: true
# ...
defp no_op?(_), do: false
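
A self-contained version of the filtering step, as a sketch only: NoOpSketch and its nested Update struct are hypothetical stand-ins for Derive's real operation structs, and the commit step is left out.

```elixir
defmodule NoOpSketch do
  defmodule Update do
    # Hypothetical stand-in for Derive's update operation struct.
    defstruct selector: nil, fields: %{}
  end

  # Turns whatever a handler returned (nil, a single op, or nested lists of
  # ops) into a flat list with the no-ops removed.
  def normalize(result) do
    result
    |> List.wrap()
    |> List.flatten()
    |> Enum.reject(&no_op?/1)
  end

  defp no_op?(nil), do: true
  defp no_op?(%Update{fields: fields}) when map_size(fields) == 0, do: true
  defp no_op?(_op), do: false
end
```

For example, NoOpSketch.normalize([nil, [op], [nil]]) keeps only op, provided op is not an Update with an empty fields map.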

A use case example:

def handle_event(%Foo{} = event) do
  [
    maybe_delete_bar(event.baz)
  ]
end

defp maybe_delete_bar([]), do: nil
defp maybe_delete_bar([_ | _] = bar) do
  [bla: bar]
  |> Query.bars()
  |> delete()
end

Another one:

def handle_event(%Foo{} = event) do
  [
    maybe_update_bar(event)
  ]
end

defp maybe_update_bar(event) do
  transaction(fn repo ->
    current_bar = load_bar(repo, event.bar_id)
    new_bar = Map.merge(current_bar, %{a: event.bar_a, b: event.bar_b})
    changed_fields = diff(current_bar, new_bar)
    
    update({Bar, event.bar_id}, changed_fields) # there might be no fields changed and we actually have 
                                                # a use case like that. In fact, the query fails at
                                                # runtime if no fields were changed (that's issue 16).
  end)
end

It also makes it easier to do this:

def handle_event(%Foo{} = event) do
  [
    if (not is_nil(event.bar)) do
      insert(%Bar{id: event.bar_id, a: event.a, b: event.b})
    end
    # the `if` block returns nil when the condition evaluates to false.
  ]
end

Edit: updated the pseudo-code for filtering out no-ops, which now allows for this use case:

def handle_event(%Foo{} = event) do
  [
    maybe_delete_x(event.foo),
    maybe_create_y(event.bar),
    maybe_create_z(event.baz)
    # Each `maybe_*` function returns a list of 0 or more events.
    # That's why that `List.flatten/1` I added is important.
  ]
end

Dialyzer specs for behaviors

Hi @TurtleAI/eds

In several places throughout the code we pass around modules that implement behaviors.

For example here:

@spec mark_for_rebuild(Derive.Reducer.t()) :: :ok
def mark_for_rebuild(reducer) do

The Derive.Reducer.t() typespec is just an alias for module().

However, what I really want is to accept only modules that implement a behavior, so I can get vscode to give me warnings if I mistype a function or pass in a module that doesn't have the expected functions.

Is this possible?

No clause matching in `Derive.PartitionDispatcher.handle_info/2`

02:01:37.726 [error] GenServer {:"Elixir.Hatch.Invoices.Service.supervisor", {Hatch.Invoices.Service, "30975810790621495645780525318144"}} terminating
** (FunctionClauseError) no function clause matching in Derive.PartitionDispatcher.handle_info/2
    (derive 0.1.0) lib/derive/partition_dispatcher.ex:80: Derive.PartitionDispatcher.handle_info({[:alias | #Reference<0.2816401368.3219980290.154724>], :ok}, %Derive.PartitionDispatcher{awaiters: [{{#PID<0.1092.0>, [:alias | #Reference<0.2816401368.3219980290.155007>]}, "30975811344411199482614974382080"}], options: %Derive.Options{batch_size: 100, logger: &Derive.Logger.IOLogger.log/1, mode: :catchup, name: Hatch.Invoices.Service, reducer: Hatch.Invoices.Service, show_progress: nil, source: :"Elixir.Hatch.Invoices.Service.source", validate_version: nil}, partition: %Derive.Partition{cursor: "30975810790842856574665039937536", error: nil, id: "30975810790621495645780525318144", status: :ok}, timeout: 30000})
    (stdlib 4.0.1) gen_server.erl:1120: :gen_server.try_dispatch/4
    (stdlib 4.0.1) gen_server.erl:1197: :gen_server.handle_msg/6
    (stdlib 4.0.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {[:alias | #Reference<0.2816401368.3219980290.154724>], :ok}

After a couple of hours of trial and error, I somehow managed to get it "fixed" locally by adding this:

# PartitionDispatcher at line 88
def handle_info({_from, :ok}, %S{timeout: timeout} = state) do
  {:noreply, state, timeout}
end

I'm not sure what I did there -- brain fried after debugging most of the code base kkkry -- I just copied and pasted a bunch of things until I got it working.

The failure was triggered when awaiting an event inside a service (the weird thing is that we didn't catch that error before):

# Hatch.Invoice.Service
def handle_event(%TimeEntryCreated{} = e) do
  # commenting out this `await_processed/2` is enough to not get the error on derive but,
  # of course, the service will fail if the time entry doesn't exist in the db yet.
  await_processed([e], :reducers)

  with_time_entry(e.time_entry_id, &maybe_create_invoice_line_item/1)
end

Edit: ok, maybe that wasn't a "fix" cuz now I'm getting these errors all over the place (although by some coincidence most tests are passing):

02:27:33.012 [error] GenServer :"Elixir.Hatch.Contracts.Service.source" terminating
** (FunctionClauseError) no function clause matching in anonymous fn/1 in Turtle.EventLog.handle_cast/2
    (turtle 0.0.1) lib/event_log.ex:102: anonymous fn(nil) in Turtle.EventLog.handle_cast/2
    (elixir 1.13.4) lib/enum.ex:4034: Enum.filter_list/2
    (turtle 0.0.1) lib/event_log.ex:102: Turtle.EventLog.handle_cast/2
    (stdlib 4.0.1) gen_server.erl:1120: :gen_server.try_dispatch/4
    (stdlib 4.0.1) gen_server.erl:1197: :gen_server.handle_msg/6
    (stdlib 4.0.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:new_events, [nil, nil]}}
#  seems to repeat for all domains' service/reducer

The test that seems to produce the most errors is api/test/features/hatch__invoices_test.exs, at scenario "Invoices Only seller-admin in a contract can issue invoice".

Rewrite table names in an `Ecto.Query`

Hi @TurtleAI/eds

If I want to update the table for an Ecto.Schema, I can do something like this:

source = Ecto.get_meta(record, :source)
record
|> Ecto.put_meta(source: "#{dynamic_prefix}_#{source}")

Is there something similar that can be done to an Ecto.Query?

Something like

query
|> update_sources(fn source -> "#{dynamic_prefix}_#{source}" end)

Thanks!

Including a Registry inside of a DynamicSupervisor

Hi @TurtleAI/eds

I've implemented a MapSupervisor
https://github.com/TurtleAI/derive/blob/master/lib/derive/map_supervisor.ex#L24-L44

It's used to start or lookup processes for a given key.

However, if you notice, I'm passing in a registry as the 2nd argument. I'd rather encapsulate the Registry within the MapSupervisor so I don't have to think about declaring a registry elsewhere each time.

I guess I could set up a Supervisor as I'm seeing at the link below:
https://elixir-lang.org/getting-started/mix-otp/dynamic-supervisor.html#supervision-trees

But it seems like a lot of boilerplate just for bundling a registry.

Would you recommend going this route, or are there better ways of doing this?
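
For a sense of how much boilerplate the Supervisor route involves, here is a minimal sketch that bundles a Registry and a DynamicSupervisor under one Supervisor. It is not the actual Derive.MapSupervisor: the module name, the derived child names, and the use of an Agent as the per-key process are all assumptions made to keep the example runnable.

```elixir
defmodule MapSupervisorSketch do
  use Supervisor

  def start_link(name), do: Supervisor.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name) do
    children = [
      {Registry, keys: :unique, name: registry(name)},
      {DynamicSupervisor, strategy: :one_for_one, name: dynamic(name)}
    ]

    # :one_for_all restarts both children together, so the registry and the
    # dynamic supervisor cannot get out of sync after a crash.
    Supervisor.init(children, strategy: :one_for_all)
  end

  # Returns the pid for `key`, starting an Agent holding `initial_state`
  # if no process is registered under that key yet.
  def lookup_or_start(name, key, initial_state) do
    via = {:via, Registry, {registry(name), key}}

    spec = %{
      id: key,
      start: {Agent, :start_link, [fn -> initial_state end, [name: via]]},
      restart: :temporary
    }

    case DynamicSupervisor.start_child(dynamic(name), spec) do
      {:ok, pid} -> pid
      {:error, {:already_started, pid}} -> pid
    end
  end

  # The registry and dynamic supervisor names are derived from the given
  # name, so callers never declare a registry themselves.
  defp registry(name), do: Module.concat(name, Registry)
  defp dynamic(name), do: Module.concat(name, DynamicSupervisor)
end
```

With this shape, callers only pass the top-level name, e.g. MapSupervisorSketch.lookup_or_start(MyMap, "a", 1) after MapSupervisorSketch.start_link(MyMap).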

More robust implementation for a large number of events

Hi @TurtleAI/eds

I'm satisfied with the public api of the library, so want to start getting feedback again :)

A Dispatcher will start from the last processed event and process events until there are no more events to process. This is fine if there is a small number of events; however, it's possible (for example, if we are rebuilding the state from scratch) that there would be on the order of 50,000 events.

Right now I recursively call a catchup function and process 100 events at a time, but I wonder if there are better ways to handle it? Should I somehow have the process to send a message to itself to process more events?

Here's how, on boot, the process picks up where it left off in the event log to start processing events:

def handle_cast(:catchup_on_boot, %S{mode: mode} = state) do
  new_state = catchup(state)

  case mode do
    :catchup -> {:noreply, new_state}
    :rebuild -> {:stop, :normal, new_state}
  end
end

Implementation of catchup:

defp catchup(
       %S{
         reducer: reducer,
         source: source,
         partition: %Derive.Partition{version: version} = partition,
         batch_size: batch_size,
         lookup_or_start: lookup_or_start
       } = state
     ) do
  case Derive.EventLog.fetch(source, {version, batch_size}) do
    {[], _} ->
      # done processing so return the state as is
      state

    {events, new_version} ->
      events
      |> events_by_partition_dispatcher(reducer, lookup_or_start)
      |> Enum.map(fn {partition_dispatcher, events} ->
        PartitionDispatcher.dispatch_events(partition_dispatcher, events)
        {partition_dispatcher, events}
      end)
      |> Enum.each(fn {partition_dispatcher, events} ->
        PartitionDispatcher.await(partition_dispatcher, events)
      end)

      new_partition = %{partition | version: new_version}
      reducer.set_partition(new_partition)

      # we have more events left to process, so we recursively call catchup
      %{state | partition: new_partition}
      |> catchup()
  end
end
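
As for the process sending a message to itself: one way that could look is sketched below, under simplifying assumptions. CatchupSketch is hypothetical, the "event log" is just an in-memory list, and "processing" only counts events. The point is the control flow: each batch is handled in its own callback, and the process schedules the next batch with send/2, so other messages in the mailbox get a turn between batches.

```elixir
defmodule CatchupSketch do
  use GenServer

  def start_link(events, batch_size),
    do: GenServer.start_link(__MODULE__, {events, batch_size})

  # Number of events processed so far; answered between batches.
  def processed(pid), do: GenServer.call(pid, :processed)

  @impl true
  def init({events, batch_size}) do
    state = %{pending: events, batch_size: batch_size, processed: 0}
    # handle_continue runs right after init, before any other message.
    {:ok, state, {:continue, :catchup}}
  end

  @impl true
  def handle_continue(:catchup, state), do: {:noreply, process_batch(state)}

  @impl true
  def handle_info(:catchup, state), do: {:noreply, process_batch(state)}

  @impl true
  def handle_call(:processed, _from, state), do: {:reply, state.processed, state}

  # Nothing left: stop scheduling further batches.
  defp process_batch(%{pending: []} = state), do: state

  defp process_batch(state) do
    {batch, rest} = Enum.split(state.pending, state.batch_size)
    # Queue the next batch behind whatever is already in the mailbox.
    send(self(), :catchup)
    %{state | pending: rest, processed: state.processed + length(batch)}
  end
end
```

Compared with the recursive catchup, the process stays responsive to calls and system messages during a long rebuild, at the cost of the catchup no longer being a single uninterrupted function call.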

Events produced by services aren't populated `source_event_id`

Example of an event that was produced by a service and didn't get its :source_event_id populated:

{
    "id": "30870348649259176310057341550594",
    "time": "2022-10-18T15:00:00.578187Z",
    "__type__": "Elixir.Hatch.TimeTracker.Events.TimeEntriesInvoiced",
    "command_id": null,
    "invoice_id": "30870348649203836077836212895744",
    "request_id": null,
    "time_entry_ids": [
        "30870348648779560964140893208576"
    ],
    "source_event_id": null
}

@venkatd do we need some sort of custom commit callback for our services?

Ensure `merge` doesn't try to set missing fields to nil

The following code:

    merge({Turtle.Payments.Models.PaymentIntent, e.payment_intent_id},
      status: :processing,
      amount: e.amount,
      payment_method_id: e.payment_method_id,
      payment_method_type: String.to_atom(e.payment_method_type),
      payment_method_brand: e.payment_method_brand,
      payment_method_last4: e.payment_method_last4
    )

Fails in my codebase because it's not passing along the created_at field. It's trying to set created_at to null, which probably means there's a bug in which fields missing from the struct are mistakenly set to nil.

I need to write a test case to reproduce this behavior.

`update` operation fails when no fields needs to be changed

20:20:54.630 [error] ** (Ecto.QueryError) `update_all` requires at least one field to be updated in query:

from t0 in Hatch.TimeTracker.Models.TimeEntry,
  where: t0.id == ^"30806491569410975171616041336832",
  update: [set: []]

    (ecto 3.8.3) lib/ecto/repo/queryable.ex:205: Ecto.Repo.Queryable.execute/4
    (ecto 3.8.3) lib/ecto/multi.ex:742: Ecto.Multi.apply_operation/4
    (ecto 3.8.3) lib/ecto/multi.ex:723: Ecto.Multi.apply_operation/5
    (elixir 1.13.4) lib/enum.ex:2396: Enum."-reduce/3-lists^foldl/2-0-"/3
    (ecto 3.8.3) lib/ecto/multi.ex:697: anonymous fn/5 in Ecto.Multi.apply_operations/5
    (ecto_sql 3.8.1) lib/ecto/adapters/sql.ex:1221: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection 2.4.2) lib/db_connection.ex:865: DBConnection.transaction/3
    (ecto 3.8.3) lib/ecto/repo/transaction.ex:18: Ecto.Repo.Transaction.transaction/4
    (derive 0.1.0) lib/derive/ecto/operation/update.ex:16: anonymous fn/6 in Derive.Ecto.DbOp.Derive.Ecto.Operation.Update.to_multi/2
    (ecto 3.8.3) lib/ecto/multi.ex:723: Ecto.Multi.apply_operation/5
    (elixir 1.13.4) lib/enum.ex:2396: Enum."-reduce/3-lists^foldl/2-0-"/3
    (ecto 3.8.3) lib/ecto/multi.ex:697: anonymous fn/5 in Ecto.Multi.apply_operations/5
    (ecto_sql 3.8.1) lib/ecto/adapters/sql.ex:1221: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection 2.4.2) lib/db_connection.ex:1562: DBConnection.run_transaction/4

The problem is on https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/operation/update.ex#L21-L25

We need another pattern match so that, if the map of fields is empty (map_size(fields) == 0), we perform a no-op -- just skip and do nothing.

Detect key of operation when `Ecto.Multi` fails on an `Ecto.QueryError`

Hi @TurtleAI/eds

I'm finding that when Ecto.Repo.transaction fails due to something like a constraint error, I'm unable to find the key of the operation that failed.

I'd like to show a more helpful message that correlates an event to the Ecto operation that triggered it and getting the key would help here.

I have a try ... rescue here:

MultiOp.commit_failed(op, error)

And I rescue an error of this form:

%Ecto.QueryError{message: "deps/ecto/lib/ecto/repo/queryable.ex:166: field `missing_field` in `update` does not exist in schema DeriveEctoTest.User in query:\n\nfrom u0 in DeriveEctoTest.User,\n  where: u0.id == ^\"99\",\n  update: [set: [missing_field: ^\"stuff\"]]\n"}

Is there a way to get the name of the Ecto.Multi operation that triggered this error?
Or if this isn't possible, maybe there's a way to validate these individual operations before executing them?

Thanks!

Weird error on boot: Turtle.Activity.Reducer needs rebuild

I'm using the test db in dev env at the moment, so that I don't have to create some records I want to access via graphiql (for some testing).

Whenever I run some tests to create the records and then boot the app, this error occurs:
[Screenshot 2023-01-05 at 00 23 20]

I rebuilt using auto, but there don't seem to be any events pending for rebuild on the activity reducer:
[Screenshot 2023-01-05 at 00 23 47]

After running rebuild auto, I'm able to boot the app.

Feedback on derive API design

Hi @TurtleAI/eds

A few years back, I wrote a hacky event-sourcing implementation for our Turtle backend. It has worked for us so far, but I'd like to re-implement some core components as a standalone library.

Keeping it as a separate library will make it easier to test, improve, and get help on from folks like yourself :)

Goals of the library

  • Keep derived state in sync with a durable stream of events according to the rules specified in Derive.Reducer
  • Allow rebuilding the derived state from scratch if the rules change (equivalent to running a migration)
  • Account for scenarios such as server restarts (it should pick up where it left off)

Desired characteristics

Scalability

As our number of users grows, we'd like to be able to handle the load. In the old implementation, I'm doing some things inefficiently, such as redundantly querying the event source for events.

We'd also like to eventually run this in a multi-node environment. Our code in Turtle only works in a single-node environment.

Flexibility

We'd like to have flexibility on:

  • Where events come from: in Turtle it's hardcoded to a single table
  • Where the state is stored: in Turtle we only support Ecto, but we also want to experiment with in-memory data structures
  • Error handling: we have different error handling needs depending on the domain. Some areas (like chat messages), we'd want to log the error and keep processing. Other areas we may want to shut it down and prevent anything else from getting processed.

Good tooling

Once we've built this library, we will want to add some tooling to improve our experience. For example:

  • A liveview UI to view newly produced events, which reducers processed them, and the state changes they produced
  • Good error reporting if processing an event fails

Next steps

Before implementing too much, I'd love to get help iterating on the public API.
Please take a look at the tests in derive_ecto_test.exs and the documentation in Derive.Reducer

When we integrate this library into our Turtle backend, we would essentially define a bunch of reducers that implement the Derive.Reducer behavior. Then, we'd like some derived state (postgres tables) to be kept up to date with an event log (postgres table called events)

Do you have any questions/feedback on the API? Any undefined behavior we should better define?

Thanks!
