turtleai / derive
An Event Sourcing and CQRS solution.
@TurtleAI/eds
I've currently got an implementation for broadcasting a GenServer.cast to the subscribers, as you can see here:
derive/lib/derive/broadcaster.ex
Lines 30 to 34 in f2ddeee
I'd also like to implement a version that does a GenServer.call to each subscriber in parallel, awaits all of the calls, then returns a keyword list of the results in the form {subscriber, result}.
Like:
results = Broadcaster.broadcall(server, {:fetch_data, 23})
Is there a best practice for this? I've looked at the Task module, but I'm not sure of the best way to handle this.
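For what it's worth, Task.async_stream/3 handles the fan-out/await/collect cycle and preserves input order, so the results pair up naturally. A sketch, under the assumption that the broadcaster can hand back its subscriber list (the subscribers/1 helper and the :subscribers call are hypothetical, not the current Broadcaster API):

```elixir
defmodule Broadcaster do
  @doc """
  Calls every subscriber in parallel and returns `[{subscriber, result}]`.
  """
  def broadcall(server, message, timeout \\ 5_000) do
    server
    |> subscribers()
    |> Task.async_stream(
      fn sub -> {sub, GenServer.call(sub, message, timeout)} end,
      timeout: timeout
    )
    |> Enum.map(fn {:ok, pair} -> pair end)
  end

  # Hypothetical: assumes the broadcaster replies to a :subscribers call
  # with its current list of subscriber pids.
  defp subscribers(server), do: GenServer.call(server, :subscribers)
end
```

One thing to decide is how to handle a subscriber that times out or crashes; as written, a timeout in any single call brings the whole broadcall down.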
Derive.Ecto.Selector should accept a %Query{} (ideally, any queryable supported by Ecto) as a selector.
https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/selector.ex#L23-L31
A common use case is changing multiple rows with a single query. For example:
def handle_event(%InvoiceIssued{} = event) do
  [
    [ids: event.line_item_ids]
    |> Query.invoice_line_items() # returns a %Query{} struct
    |> update(%{status: :invoiced})
  ]
end
A possible implementation (in https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/selector.ex#L23-L31) is:
def selector_query(%Query{} = query), do: query
Doing multiple single-row operations instead likely has a performance impact -- although I didn't measure it.
Most services (and perhaps some reducers, when they process the same event "instance") require awaiting for an event to get reduced before they can do what they need to do. In some cases, we need to await all the events that were generated by a command, not just the one event we're handling.
Therefore, we need a simple API we can call from within the service's/reducer's event handler so that we can await the entire "batch" of events related to the one we have at hand -- wait until they get handled and their changes get persisted.
At first, it could be something "simpler" (or less difficult, at least), like awaiting all events that have the key command_id with value X. But, while planning a future infra/arch need, I realized that in some cases we want to persist events directly to the event store, skipping commands. For example, when we persist events from within migrations.
I really don't want to clutter the command_id kv with trash data -- data that can't be correlated to a command log nor a request log -- so I suggest we add a key to events named something like batch_id. For commands, we can make them populate this batch_id with the same value as the command_id, no problem. Or we could just generate a new id and set that value as the batch_id of all events produced by that command execution. Either way would do.
Since Derive seems to be non-opinionated regarding the structure of events, with the exception of the id field, this new await_processed/2 signature might need to accept a tuple informing both the key and the value the user (dev) wants to match.
Current API:
await_processed(list_of_events, :reducers)
Suggestion:
await_processed(list_of_events, :reducers)
await_processed({:batch_id, "123"}, :reducers)
# clear distinction between signatures -- pattern matching by list or tuple.
It's likely that Derive will need some sort of registry/index of all events currently being processed and which pid/supervisor (there might be many) is processing them. If that's the case, it should also be possible -- and I'd say expected -- to have the following signature as well:
await_processed(list_of_event_ids, :reducers)
# You'd have to peek into the type of the first element of the list in order to make a distinction
# between this signature and the one that takes a list of event structs.
# I won't suggest removing the signature that takes the list of event structs in favor of this
# one because I guess we get performance benefits from awaiting on a struct -- I imagine it's
# easier for you to locate the processes you need to await based on the workers that handle
# that event struct. But, if the performance gain of that approach is negligible, then perhaps
# it'd be better to just replace it with the list-of-ids version.
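All three signatures can be told apart with ordinary multi-clause pattern matching; a sketch (the module name, clause bodies, and the do_await/2 helper are assumptions for illustration, not Derive's actual code):

```elixir
defmodule Derive.Await do
  # Current API: a list of event structs (structs match `%{}`).
  def await_processed([%{} | _] = events, targets),
    do: do_await({:events, events}, targets)

  # Proposed: a {key, value} match, e.g. {:batch_id, "123"}.
  def await_processed({key, value}, targets) when is_atom(key),
    do: do_await({:match, key, value}, targets)

  # Proposed: a list of event ids.
  def await_processed([id | _] = ids, targets) when is_binary(id),
    do: do_await({:ids, ids}, targets)

  # Placeholder: would look up the workers handling the matching
  # events and block until they report them processed.
  defp do_await(spec, targets), do: {spec, targets}
end
```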
Related to #16
In order to keep the code simpler and more readable, in some cases I personally prefer creating no-ops (or is it null-ops? -- in a reducer, it means producing an operation, like an update, that has no side effect on the db). This avoids conditional creation of ops and overkill extractions with conditions and/or pattern matching.
The most common case, for me, is having a pattern match that might return nil instead of an op. But there has been one case where I used an update that changed no fields (issue #16) -- this one was in Hatch; I don't remember if there were any in TurtleOS.
Of course, we could just wrap the returned list of ops with a function that filters no-ops (like rejecting nil values, or even filtering out Derive operation structs based on some logic), so this isn't a blocker. But I think this is something we could do on Derive's side to keep userland code cleaner.
The pseudo-code for what I'm suggesting would look like:
def foo(a, b, c) do
  apply(reducer_mod, :handle_event, [event])
  |> List.wrap()    # in case a single op (not a list) is returned
  |> List.flatten() # in case there are nested lists; useful when you extract
                    # the conditional logic for generating subsets of ops
  |> Enum.reject(&no_op?/1)
  |> invoke_commit(reducer_mod)
end

defp no_op?(nil), do: true
defp no_op?(%Update{fields: fields}) when map_size(fields) == 0, do: true
# ...
defp no_op?(_), do: false
A use case example:
def handle_event(%Foo{} = event) do
  [
    maybe_delete_bar(event.baz)
  ]
end

defp maybe_delete_bar([]), do: nil

defp maybe_delete_bar([_ | _] = bar) do
  [bla: bar]
  |> Query.bars()
  |> delete()
end
Another one:
def handle_event(%Foo{} = event) do
  [
    maybe_update_bar(event)
  ]
end

defp maybe_update_bar(event) do
  transaction(fn repo ->
    current_bar = load_bar(repo, event.bar_id)
    new_bar = Map.merge(current_bar, %{a: event.bar_a, b: event.bar_b})
    changed_fields = diff(current_bar, new_bar)

    # There might be no fields changed, and we actually have a use case like
    # that. In fact, the query fails at runtime if no fields were changed
    # (that's issue #16).
    update({Bar, event.bar_id}, changed_fields)
  end)
end
It also makes it easier to do this:
def handle_event(%Foo{} = event) do
  [
    # the `if` block returns nil when the condition evaluates to false
    if not is_nil(event.bar) do
      insert(%Bar{id: event.bar_id, a: event.a, b: event.b})
    end
  ]
end
Edit: updated the pseudo-code for filtering out no-ops; it now allows for this use case:
def handle_event(%Foo{} = event) do
  [
    # Each `maybe_*` function returns a list of 0 or more ops.
    # That's why the `List.flatten/1` I added is important.
    maybe_delete_x(event.foo),
    maybe_create_y(event.bar),
    maybe_create_z(event.baz)
  ]
end
Hi @TurtleAI/eds
In several places throughout the code we pass around modules that implement behaviors.
For example here:
derive/lib/derive/migration.ex
Lines 9 to 10 in b8d5670
The Derive.Reducer.t() typespec is just an alias for module().
However, what I really want is to accept only modules that implement a given behaviour, so I can get VS Code to warn me if I mistype a function or pass in a module that doesn't have the desired functions.
Is this possible?
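As far as I know, Dialyzer can't express "a module that implements behaviour X" -- module() is as precise as typespecs get, so editor warnings won't come from the spec alone. A runtime check is possible, though, since @behaviour declarations end up in the module's attributes. A sketch (the module and function names are hypothetical, not part of Derive):

```elixir
defmodule Derive.BehaviourCheck do
  @doc "Returns true if `mod` declares `@behaviour behaviour`."
  @spec implements?(module(), module()) :: boolean()
  def implements?(mod, behaviour) do
    mod.module_info(:attributes)
    |> Keyword.get_values(:behaviour)
    |> List.flatten()
    |> Enum.member?(behaviour)
  end
end
```

This only checks the declaration, of course; it won't catch a mistyped function the way a static check would.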
Upsert relies on the Insert operation under the hood. Unlike the insert operation, though, upsert shouldn't take an on_conflict argument, because an upsert already expects the record to be replaced (updated) if there's a conflict.
https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/operation.ex#L46
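In Ecto terms, that means the upsert operation could hard-code its conflict policy instead of exposing it; a sketch of the options the underlying Repo.insert/2 would receive (the conflict_target and the list of fields excluded from replacement are assumptions -- they depend on the table):

```elixir
# Hypothetical fixed options for an upsert built on insert; not
# Derive's actual implementation.
upsert_opts = [
  on_conflict: {:replace_all_except, [:id, :inserted_at]},
  conflict_target: [:id]
]
```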
It's hard to reproduce -- I don't know yet how; it's the first time it's happening.
Here's the error and the service code https://gist.github.com/rwillians/95ec29d14bf5b5e656a9267ed9f06b7d
02:01:37.726 [error] GenServer {:"Elixir.Hatch.Invoices.Service.supervisor", {Hatch.Invoices.Service, "30975810790621495645780525318144"}} terminating
** (FunctionClauseError) no function clause matching in Derive.PartitionDispatcher.handle_info/2
(derive 0.1.0) lib/derive/partition_dispatcher.ex:80: Derive.PartitionDispatcher.handle_info({[:alias | #Reference<0.2816401368.3219980290.154724>], :ok}, %Derive.PartitionDispatcher{awaiters: [{{#PID<0.1092.0>, [:alias | #Reference<0.2816401368.3219980290.155007>]}, "30975811344411199482614974382080"}], options: %Derive.Options{batch_size: 100, logger: &Derive.Logger.IOLogger.log/1, mode: :catchup, name: Hatch.Invoices.Service, reducer: Hatch.Invoices.Service, show_progress: nil, source: :"Elixir.Hatch.Invoices.Service.source", validate_version: nil}, partition: %Derive.Partition{cursor: "30975810790842856574665039937536", error: nil, id: "30975810790621495645780525318144", status: :ok}, timeout: 30000})
(stdlib 4.0.1) gen_server.erl:1120: :gen_server.try_dispatch/4
(stdlib 4.0.1) gen_server.erl:1197: :gen_server.handle_msg/6
(stdlib 4.0.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {[:alias | #Reference<0.2816401368.3219980290.154724>], :ok}
After a couple of hours of trial and error, somehow, I managed to get it "fixed" locally by adding this:
# PartitionDispatcher, at line 88
def handle_info({_from, :ok}, %S{timeout: timeout} = state) do
  {:noreply, state, timeout}
end
I'm not sure what I did there -- brain fried after debugging most of the code base -- I just copied and pasted a bunch of things until I got it working.
The failure was triggered when awaiting an event inside a service (the weird thing is that we didn't catch this error before):
# Hatch.Invoices.Service
def handle_event(%TimeEntryCreated{} = e) do
  # Commenting out this `await_processed/2` is enough to not get the error in derive,
  # but, of course, the service will fail if the time entry doesn't exist in the db yet.
  await_processed([e], :reducers)
  with_time_entry(e.time_entry_id, &maybe_create_invoice_line_item/1)
end
Edit: OK, maybe that wasn't a "fix", because now I'm getting these errors all over the place (although, by some coincidence, most tests are passing):
02:27:33.012 [error] GenServer :"Elixir.Hatch.Contracts.Service.source" terminating
** (FunctionClauseError) no function clause matching in anonymous fn/1 in Turtle.EventLog.handle_cast/2
(turtle 0.0.1) lib/event_log.ex:102: anonymous fn(nil) in Turtle.EventLog.handle_cast/2
(elixir 1.13.4) lib/enum.ex:4034: Enum.filter_list/2
(turtle 0.0.1) lib/event_log.ex:102: Turtle.EventLog.handle_cast/2
(stdlib 4.0.1) gen_server.erl:1120: :gen_server.try_dispatch/4
(stdlib 4.0.1) gen_server.erl:1197: :gen_server.handle_msg/6
(stdlib 4.0.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:new_events, [nil, nil]}}
# seems to repeat for all domains' services/reducers
The test that seems to produce the largest number of errors is api/test/features/hatch__invoices_test.exs, at scenario "Invoices Only seller-admin in a contract can issue invoice".
Hi @TurtleAI/eds
If I want to update the table for an Ecto.Schema, I can do something like this:
source = Ecto.get_meta(record, :source)
record
|> Ecto.put_meta(source: "#{dynamic_prefix}_#{source}")
Is there something similar that can be done to an Ecto.Query?
Something like:
query
|> update_sources(fn source -> "#{dynamic_prefix}_#{source}" end)
Thanks!
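I don't know of a public API for renaming sources on a query, but if reaching into Ecto's internals is acceptable, the from source is stored as a {table, schema} tuple on the query struct. A sketch of the hypothetical update_sources/2 (this relies on private struct layout, may break across Ecto versions, and only touches the from, not joins):

```elixir
# Hypothetical helper; relies on Ecto.Query internals.
def update_sources(%Ecto.Query{from: %{source: {table, schema}} = from} = query, fun)
    when is_function(fun, 1) do
  %{query | from: %{from | source: {fun.(table), schema}}}
end
```

If the end goal is multi-tenancy by schema prefix rather than renamed tables, Ecto.Query.put_query_prefix/2 might be the supported route instead.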
Hi @TurtleAI/eds
I've implemented a MapSupervisor:
https://github.com/TurtleAI/derive/blob/master/lib/derive/map_supervisor.ex#L24-L44
It's used to start or look up processes for a given key.
However, as you may notice, I'm passing in a registry as the 2nd argument. I'd rather encapsulate the Registry within the MapSupervisor so I don't have to think about declaring a registry elsewhere each time.
I guess I could set up a Supervisor as shown at the link below:
https://elixir-lang.org/getting-started/mix-otp/dynamic-supervisor.html#supervision-trees
But it seems like a lot of boilerplate just for bundling a registry.
Would you recommend going this route, or are there better ways of doing this?
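For what it's worth, the boilerplate can stay fairly small: a module-based Supervisor that derives its children's names from its own name, and starts the Registry alongside a DynamicSupervisor. A sketch (the module layout and naming scheme are illustrative, not Derive's actual code):

```elixir
defmodule Derive.MapSupervisor do
  use Supervisor

  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    Supervisor.start_link(__MODULE__, name, name: name)
  end

  @impl true
  def init(name) do
    children = [
      # The registry lives inside this supervision tree, so callers
      # never have to declare one themselves.
      {Registry, keys: :unique, name: registry_name(name)},
      {DynamicSupervisor, strategy: :one_for_one, name: dynamic_name(name)}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  # Child names derived from the supervisor's own name.
  def registry_name(name), do: Module.concat(name, Registry)
  def dynamic_name(name), do: Module.concat(name, DynamicSupervisor)
end
```

Callers then only ever pass the supervisor's name; the registry name is an implementation detail.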
Hi @TurtleAI/eds
I'm satisfied with the public API of the library, so I want to start getting feedback again :)
A Dispatcher will start from the last processed event and process events until there are no more events to process. This is fine if there is a small number of events; however, it's possible (for example, if we are rebuilding the state from scratch) that there would be on the order of 50,000 events.
Right now I recursively call a catchup function and process 100 events at a time, but I wonder if there are better ways to handle it? Should I somehow have the process send a message to itself to process more events?
Here's how, on boot, the process picks up where it left off in the event log to start processing events:
derive/lib/derive/dispatcher.ex
Lines 111 to 118 in 518e9cc
Implementation of catchup:
derive/lib/derive/dispatcher.ex
Lines 129 to 161 in 518e9cc
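The send-a-message-to-itself pattern is a common answer here: processing one batch per handle_info round keeps the GenServer responsive, because other messages can interleave between batches. A self-contained sketch with a fake in-memory event list (all names are illustrative, not Derive's actual dispatcher):

```elixir
defmodule CatchupDemo do
  use GenServer

  @batch_size 100

  def start_link(events), do: GenServer.start_link(__MODULE__, events)

  @impl true
  def init(events) do
    # Kick off catchup right after boot without blocking init.
    {:ok, %{pending: events, processed: 0}, {:continue, :catchup}}
  end

  @impl true
  def handle_continue(:catchup, state) do
    send(self(), :catchup)
    {:noreply, state}
  end

  @impl true
  def handle_info(:catchup, %{pending: pending} = state) do
    {batch, rest} = Enum.split(pending, @batch_size)
    state = %{state | pending: rest, processed: state.processed + length(batch)}

    # Re-schedule only if the batch was full; returning between batches
    # lets calls/casts interleave with the catchup work.
    if length(batch) == @batch_size, do: send(self(), :catchup)
    {:noreply, state}
  end

  @impl true
  def handle_call(:processed, _from, state), do: {:reply, state.processed, state}
end
```

The recursion is still there, but it runs through the mailbox rather than the stack, so the process stays reachable throughout a 50,000-event rebuild.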
Example of an event that was produced by a service and didn't get its :source_event_id populated:
{
  "id": "30870348649259176310057341550594",
  "time": "2022-10-18T15:00:00.578187Z",
  "__type__": "Elixir.Hatch.TimeTracker.Events.TimeEntriesInvoiced",
  "command_id": null,
  "invoice_id": "30870348649203836077836212895744",
  "request_id": null,
  "time_entry_ids": [
    "30870348648779560964140893208576"
  ],
  "source_event_id": null
}
@venkatd do we need some sort of custom commit callback for our services?
The following code:
merge({Turtle.Payments.Models.PaymentIntent, e.payment_intent_id},
  status: :processing,
  amount: e.amount,
  payment_method_id: e.payment_method_id,
  payment_method_type: String.to_atom(e.payment_method_type),
  payment_method_brand: e.payment_method_brand,
  payment_method_last4: e.payment_method_last4
)
fails in my codebase because it's not passing along the created_at field. It's trying to set created_at to null, which probably means there's a bug where missing fields in the struct are mistakenly set.
I need to write a test case to reproduce this behavior.
20:20:54.630 [error] ** (Ecto.QueryError) `update_all` requires at least one field to be updated in query:
from t0 in Hatch.TimeTracker.Models.TimeEntry,
where: t0.id == ^"30806491569410975171616041336832",
update: [set: []]
(ecto 3.8.3) lib/ecto/repo/queryable.ex:205: Ecto.Repo.Queryable.execute/4
(ecto 3.8.3) lib/ecto/multi.ex:742: Ecto.Multi.apply_operation/4
(ecto 3.8.3) lib/ecto/multi.ex:723: Ecto.Multi.apply_operation/5
(elixir 1.13.4) lib/enum.ex:2396: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto 3.8.3) lib/ecto/multi.ex:697: anonymous fn/5 in Ecto.Multi.apply_operations/5
(ecto_sql 3.8.1) lib/ecto/adapters/sql.ex:1221: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
(db_connection 2.4.2) lib/db_connection.ex:865: DBConnection.transaction/3
(ecto 3.8.3) lib/ecto/repo/transaction.ex:18: Ecto.Repo.Transaction.transaction/4
(derive 0.1.0) lib/derive/ecto/operation/update.ex:16: anonymous fn/6 in Derive.Ecto.DbOp.Derive.Ecto.Operation.Update.to_multi/2
(ecto 3.8.3) lib/ecto/multi.ex:723: Ecto.Multi.apply_operation/5
(elixir 1.13.4) lib/enum.ex:2396: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto 3.8.3) lib/ecto/multi.ex:697: anonymous fn/5 in Ecto.Multi.apply_operations/5
(ecto_sql 3.8.1) lib/ecto/adapters/sql.ex:1221: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
(db_connection 2.4.2) lib/db_connection.ex:1562: DBConnection.run_transaction/4
The problem is in https://github.com/TurtleAI/derive/blob/master/lib/derive/ecto/operation/update.ex#L21-L25
We need another pattern match where, if the map of fields is empty (map_size(fields) == 0), we perform a no-op -- just skip and do nothing.
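A sketch of what that extra clause could look like, assuming the update operation is converted by a to_multi/2-style function as the stack trace above suggests (the exact function name and second argument are assumptions):

```elixir
# Hypothetical extra clause: an update with no changed fields becomes an
# empty Multi, so `update_all` is never invoked with `set: []`.
def to_multi(%Update{fields: fields}, _name) when map_size(fields) == 0,
  do: Ecto.Multi.new()
```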
Hi @TurtleAI/eds
I'm finding that when Ecto.Repo.transaction fails due to something like a constraint error, I'm unable to find the key of the operation that failed.
I'd like to show a more helpful message that correlates an event to the Ecto operation that triggered it, and getting the key would help here.
I have a try ... rescue here:
derive/lib/derive/state/ecto.ex
Line 39 in b430a9a
And I rescue an error of this form:
%Ecto.QueryError{message: "deps/ecto/lib/ecto/repo/queryable.ex:166: field `missing_field` in `update` does not exist in schema DeriveEctoTest.User in query:\n\nfrom u0 in DeriveEctoTest.User,\n where: u0.id == ^\"99\",\n update: [set: [missing_field: ^\"stuff\"]]\n"}
Is there a way to get the name of the Ecto.Multi operation that this error was triggered for?
Or, if this isn't possible, maybe there's a way to validate these individual operations before executing them?
Thanks!
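One partial answer, for what it's worth: for failures Ecto reports as values (a changeset failing a constraint check, for example), Repo.transaction/1 on an Ecto.Multi already returns the failed operation's name in a four-element error tuple; query errors like the QueryError above raise instead, so they would still need the rescue. A sketch:

```elixir
# `failed_name` is the key of the Multi operation that failed, and
# `failed_value` is whatever that operation returned (e.g. a changeset).
case Repo.transaction(multi) do
  {:ok, results} ->
    {:ok, results}

  {:error, failed_name, failed_value, _changes_so_far} ->
    {:error, {failed_name, failed_value}}
end
```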
I'm using the test db in the dev env at the moment, so that I don't have to create some records I want to access via GraphiQL (for some testing).
Whenever I run some tests in order to create the records, the error occurs when I boot the app.
I rebuild using auto, but there don't seem to be any events pending for rebuild on the activity reducer.
After running rebuild auto, I'm able to boot the app.
Hi @TurtleAI/eds
A few years back, I wrote a hacky event-sourcing implementation for our Turtle backend. It has worked for us so far, but I'd like to re-implement some core components as a standalone library.
Keeping it as a separate library will make it easier to test, improve, and get help on from folks like yourself :)
Derive.Reducer
As our number of users grows, we'd like to be able to handle the load. In the old implementation, I'm doing some things inefficiently, such as redundantly querying the event source for events.
We'd also like to eventually run this in a multi-node environment. Our code in Turtle only works in a single-node environment.
We'd like to have flexibility on:
Once we've built this library, we will want to add some tooling to improve our experience. For example:
Before implementing too much, I'd love to get help iterating on the public API.
Please take a look at the tests in derive_ecto_test.exs and the documentation in Derive.Reducer.
When we integrate this library into our Turtle backend, we would essentially define a bunch of reducers that implement the Derive.Reducer behaviour. Then, we'd like some derived state (Postgres tables) to be kept up to date with an event log (a Postgres table called events).
Do you have any questions/feedback on the API? Any undefined behavior we should better define?
Thanks!