jroper commented on July 29, 2024

And finally, we can implement all of the above today without changing any of the language support libraries; it's all done in the proxy. So this solution is state-model agnostic, implemented 100% in the Cloudstate proxy, and agnostic to the actual projection mechanism, whether that's remote, SQL, another state model, or manual.

from cloudstate.

jroper commented on July 29, 2024

One thing about the projection stream is that it is polymorphic. Let's first consider the case where events are in protobuf format. In the shopping cart app, we have two protobuf events, ItemAdded and ItemRemoved. There are no interfaces in protobuf, so we can't define a ShoppingCartEvent interface and then say that our event consumer consumes ShoppingCartEvent. So, we could support this:

service MyReadSideProcessor {
  rpc HandleItemAdded(ItemAdded) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_log: "shopping-cart"
      };
    };
  };
  rpc HandleItemRemoved(ItemRemoved) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_log: "shopping-cart"
      };
    };
  };
};

We could define the semantics such that two consumers for the same event stream on the same entity/service represent a single virtual consumer, whose events are partitioned (by type) between the two actual consumers. I think this will actually work well, especially for Java. But perhaps you'd rather define one method and use it to receive events of all types (or all types that don't have their own method), pattern matching on the types. For that, we can treat google.protobuf.Any specially:

service MyReadSideProcessor {
  rpc HandleEvent(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_log: "shopping-cart"
      };
    };
  };
};

So, all messages go to that one method. The support libraries would also see that the parameter is a google.protobuf.Any and deserialize it. In JavaScript, since the methods aren't typed anyway, there wouldn't be much to do; we'd just pass the message on as is. In Java, where methods are typed, users could declare the method to accept java.lang.Object. Alternatively, they could define a ShoppingCartEvent interface, declare in the Java options in the descriptor for ItemAdded and ItemRemoved that these messages implement ShoppingCartEvent, and then accept ShoppingCartEvent in their method.

This would of course also work for any eventing support, not just the event log.
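The following Python sketch shows the "single virtual consumer" semantics described above. It assumes events arrive as already-deserialized objects. Handlers registered for the same event log are grouped together, and each event goes to the handler for its exact type. Anything without a dedicated handler falls through to a catch-all, which plays the role of the google.protobuf.Any method. The class names and the dataclass stand-ins for the generated ItemAdded/ItemRemoved messages are hypothetical; this is not the Cloudstate API.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


# Hypothetical stand-ins for the protobuf-generated ItemAdded / ItemRemoved messages.
@dataclass
class ItemAdded:
    product_id: str
    quantity: int


@dataclass
class ItemRemoved:
    product_id: str


class VirtualConsumer:
    """All handlers subscribed to one event log, acting as a single consumer."""

    def __init__(self) -> None:
        self._handlers: Dict[type, Callable[[Any], None]] = {}
        self._fallback: Optional[Callable[[Any], None]] = None

    def on(self, event_type: type, handler: Callable[[Any], None]) -> None:
        # Like declaring HandleItemAdded(ItemAdded): one handler per concrete type.
        self._handlers[event_type] = handler

    def on_any(self, handler: Callable[[Any], None]) -> None:
        # Like declaring HandleEvent(google.protobuf.Any): catch-all for the rest.
        self._fallback = handler

    def deliver(self, event: Any) -> None:
        handler = self._handlers.get(type(event), self._fallback)
        if handler is None:
            raise LookupError(f"no handler for {type(event).__name__}")
        handler(event)


added, others = [], []
consumer = VirtualConsumer()
consumer.on(ItemAdded, added.append)
consumer.on_any(others.append)
for event in [ItemAdded("apple", 2), ItemRemoved("apple")]:
    consumer.deliver(event)
```

Routing on the exact type keeps the dedicated methods authoritative. The catch-all only sees what nobody else claimed.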

Moving on to non-protobuf formats: our user libraries support persisting JSON, which is a subjective choice that we don't want to preclude. Also, for broader eventing support, we can be quite sure that users will want to consume event streams from other systems that contain JSON, or even plain text. For this, I'm again proposing that users use google.protobuf.Any in the descriptor.

Now, if it's an event log that's been persisted by Cloudstate, the type_url of a JSON event is going to be json.cloudstate.io/<typename>. In JavaScript, <typename> is read from the type property of the event; in Java, <typename> is the FQCN of the class. So the support libraries can likewise treat google.protobuf.Any specially here, and use the type_url to know how to deserialize it.

If, however, the event has come from another system, we won't know what type to deserialize it as. In our JavaScript support library, when you serialize an object to JSON that doesn't have a type property, the type URL is json.cloudstate.io/object. I propose we reuse that. If we know that the incoming event is JSON, we create a google.protobuf.Any using the rules for JSON defined here, with a type_url of json.cloudstate.io/object. JavaScript would of course just deserialize the JSON as is. Java can use the parameter type of the method accepting the events to know what type to deserialize into.
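The following Python sketch shows how a support library might use type_url to pick a deserializer. It makes three assumptions:

- The Any is modeled as a plain (type_url, value) pair.
- value holds raw UTF-8 JSON; the real wire encoding may differ.
- registry is a hypothetical map from <typename> to a factory.

Untyped foreign JSON falls back to the consuming method's declared type if one is given (roughly the Java case). Otherwise it stays a plain value (roughly the JavaScript case).

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

JSON_PREFIX = "json.cloudstate.io/"
UNTYPED_JSON = JSON_PREFIX + "object"


@dataclass
class AnyMessage:
    """Stand-in for google.protobuf.Any: a type URL plus an opaque payload."""
    type_url: str
    value: bytes


def decode_event(
    msg: AnyMessage,
    registry: Dict[str, Callable[[Any], Any]],
    target: Optional[Callable[[Any], Any]] = None,
) -> Any:
    if not msg.type_url.startswith(JSON_PREFIX):
        # Protobuf payloads would go through generated classes; left untouched here.
        return msg
    data = json.loads(msg.value.decode("utf-8"))
    if msg.type_url == UNTYPED_JSON:
        # Foreign JSON has no type name: use the handler's declared type if any
        # (the Java case), otherwise hand over the plain value (the JavaScript case).
        return target(data) if target is not None else data
    type_name = msg.type_url[len(JSON_PREFIX):]
    if type_name not in registry:
        raise KeyError(f"no deserializer registered for {type_name}")
    return registry[type_name](data)
```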

sleipnir commented on July 29, 2024

@ralphlaude
See
https://github.com/cloudstateio/cloudstate/blob/master/proxy/core/src/main/scala/io/cloudstate/proxy/eventing/EventingManager.scala
And
https://github.com/cloudstateio/cloudstate/blob/master/samples/java-pingpong/src/main/protos/pingpong/pingpong.proto

I think it's a good base to start from.

ralphlaude commented on July 29, 2024

@sleipnir thanks for the link regarding siddhi.

@viktorklang, @sleipnir,
I think we will find out where to put those configurations after playing around with some use cases. I think the proto descriptor is a good place to start, because we have both compile-time and runtime support.

viktorklang commented on July 29, 2024

@sleipnir Ah, OK, I probably missed that part then—thanks for clarifying again :)

viktorklang commented on July 29, 2024

Figuring out Evented Services would be a nice thing to do after the doc sprint.

ralphlaude commented on July 29, 2024

Hi @jroper,
Hi @viktorklang,

I like the thoughts regarding CQRS support for event sourced entities, and I understand that you don't want to reinvent the wheel.
You want to provide some kind of abstract dataset for CQRS, to avoid needing a distinct API for each language support and state model (as mentioned on the Cloudstate Gitter by @viktorklang):

We're currently thinking about projections, and there's been quite a few conversations between @jroper and I. The challenge is this:

  • Don't want to require distinct APIs for every single language support
  • Don't want to require distinct APIs for every single state model
  • Don't want to reinvent wheels (designing new query languages)

What would be nice is to have a unified approach to generating an intermediate dataset, which can then be exported to a set of supported query models (think RDBMS, ElasticSearch, …), and then you can field queries in the native query language of the query models.

I still have questions regarding the gRPC protocol between Cloudstate and the User Function.
If there is support for both local and remote read side processors, should the User Function be able to define which kind of read side processor it wants? If not, who should define that?
The gRPC protocol should contain commands for initiating the consumption of the event log (transforming the event log into datasets) and for consuming the read side or projection using some kind of gRPC streams. In my opinion, the User Function should implement that gRPC protocol; is that correct?
Furthermore, who should define the schema of the projection, and where the transformed events from the log are exported to?

I'd like more insight here so I can come up with a well-defined gRPC protocol for CQRS.

@viktorklang - I hope it's fine that I quoted you here.

Thanks for any support.

viktorklang commented on July 29, 2024

@ralphlaude I think it is important not to treat event sourcing as the defining source of information for projections. We will probably end up with a handful of different state models, and if you wanted to join data from different state models, the solution needs to be able to accommodate that.

So, we need to look at this in the generic sense of domain data => aggregated information, and aggregated information + query => view.

So, for the simple case, let's say that you have Customer as a domain, and you want to create a view of that domain, where you want to be able to search for customers based on partial match on their name, and you want to be able to sort that result according to lexicographical info, and you want to be able to paginate the result. So effectively you want to be able to do the following (pseudocode):

Aggregation: UPSERT CustomersByName WHERE customerid = ? SET firstname = ?, surname = ?
View: SELECT customerid, firstname, surname FROM CustomersByName WHERE firstname LIKE ? OR surname LIKE ? ORDER BY ? ? OFFSET ? LIMIT ?
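The two pseudocode statements above can be made concrete with SQLite from Python's standard library, as a rough sketch. A few adjustments are needed:

- The pseudocode UPSERT form is not real SQL, so the aggregation uses INSERT OR REPLACE keyed on customerid. This is equivalent here because every column is rewritten.
- ORDER BY cannot take bound parameters, so the sort column is checked against a whitelist.
- SQLite expects LIMIT before OFFSET.

The function names are illustrative only.

```python
import sqlite3
from typing import List, Tuple

SORTABLE = {"customerid", "firstname", "surname"}


def open_view() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE CustomersByName ("
        "customerid TEXT PRIMARY KEY, firstname TEXT, surname TEXT)"
    )
    return conn


def aggregate(conn: sqlite3.Connection, customer_id: str, firstname: str, surname: str) -> None:
    # Aggregation: insert the customer, or overwrite the existing row with the same key.
    conn.execute(
        "INSERT OR REPLACE INTO CustomersByName (customerid, firstname, surname) "
        "VALUES (?, ?, ?)",
        (customer_id, firstname, surname),
    )


def view(
    conn: sqlite3.Connection,
    name_fragment: str,
    order_by: str = "surname",
    descending: bool = False,
    offset: int = 0,
    limit: int = 10,
) -> List[Tuple[str, str, str]]:
    # View: partial match on either name, sorted, paginated.
    if order_by not in SORTABLE:
        raise ValueError(f"cannot sort by {order_by!r}")
    direction = "DESC" if descending else "ASC"
    pattern = f"%{name_fragment}%"
    return conn.execute(
        "SELECT customerid, firstname, surname FROM CustomersByName "
        "WHERE firstname LIKE ? OR surname LIKE ? "
        f"ORDER BY {order_by} {direction} LIMIT ? OFFSET ?",
        (pattern, pattern, limit, offset),
    ).fetchall()
```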

ralphlaude commented on July 29, 2024

@viktorklang thanks for the insights and the explanation. Good to know event sourcing is not the defining source of information to projections.

marcellanz commented on July 29, 2024

As discussed on the contributors call on 2020-03-10: since we need aggregation and views for the different state models, and potentially for every language support, I have the impression that the Cloudstate protocol itself could perhaps lead us to first principles for aggregation/transformation and querying. I don't know how yet :-)

jroper commented on July 29, 2024

Let's take a step back. The biggest mistake we made when doing projections in Lagom was jumping straight to projections. @viktorklang, talk to the Akka team: it's a mistake they're now trying to fix with the Akka Projections library. We're jumping straight to projections here, and talking about SQL statements and the like is jumping way too far ahead.

The first piece to get right in CQRS is the event stream. The event stream is the bridge between the write side and the read side; it is the mechanism by which updates are passed from one to the other. As long as a state model can produce an event stream, that state model can have its command and query sides segregated, and it can be used as part of a CQRS design. So the event stream is how we ensure that a particular projection mechanism isn't tied to a particular state model. As long as we design it so that a projection is just an event stream consumer, it doesn't matter which state model the stream came from; the projection will be the same.
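Here is a small Python sketch of the point that a projection is just an event stream consumer. All names are hypothetical. An event-sourced entity replays its journal as events, while a key-value entity synthesizes one StateChanged event per write. The same projection function consumes either stream without knowing which state model produced it.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Tuple


@dataclass
class Event:
    entity_id: str
    kind: str
    payload: dict


def event_sourced_stream(journal: Iterable[Tuple[str, str, dict]]) -> Iterator[Event]:
    # Event-sourced entity: the journal already is an event stream.
    for entity_id, kind, payload in journal:
        yield Event(entity_id, kind, payload)


def key_value_stream(writes: Iterable[Tuple[str, dict]]) -> Iterator[Event]:
    # Key-value entity: only states exist, so synthesize one event per write.
    for key, new_state in writes:
        yield Event(key, "StateChanged", new_state)


def users_by_email(events: Iterable[Event]) -> Dict[str, str]:
    # The projection sees only Event; it cannot tell which state model produced it.
    view: Dict[str, str] = {}
    for event in events:
        email = event.payload.get("email")
        if email is not None:
            view[email] = event.entity_id
    return view
```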

Incidentally, if there is no event stream for a state model - you can't do CQRS with it, and there's no point in even starting to talk about projections for that state model. So for example, synthesising a useful event stream for CRDTs - not sure if it's going to be possible. But we don't have to worry about that now, it's a problem that we can research and solve later, we just need to get the mechanism that Cloudstate offers for consuming event streams right, and then if we can get a CRDT event stream, then we can have CRDT projections.

So, in general, there are typically two types of projection - local, and remote. In a local projection, you consume the event stream directly, updating the projection datastore as you go. In a remote projection, you publish the event stream to a message broker, and then from a different service, consume the event stream from the message broker, and the remote service updates the datastore as it goes. Note that a remote projection might not even be implemented in Cloudstate. It could be an AWS Lambda function for example.
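The local/remote distinction can be sketched in Python with a deque standing in for the message broker topic; no real broker client is used. Both styles share one mechanism, which is consuming the event stream. They differ only in whether the consumer updates the datastore itself or publishes to a topic that some other service drains. Event shapes and function names are hypothetical.

```python
from collections import deque
from typing import Callable, Deque, Dict, Iterable, Tuple

CartEvent = Tuple[str, int]  # hypothetical: (cart_id, quantity delta)
Handler = Callable[[CartEvent], None]


def consume(stream: Iterable[CartEvent], handler: Handler) -> None:
    # The shared mechanism: every projection style is just an event stream consumer.
    for event in stream:
        handler(event)


def local_projection(store: Dict[str, int]) -> Handler:
    # Local: update the projection datastore directly as events arrive.
    def handle(event: CartEvent) -> None:
        cart_id, delta = event
        store[cart_id] = store.get(cart_id, 0) + delta
    return handle


def publish_to(topic: Deque[CartEvent]) -> Handler:
    # Remote, write side: publish each event to the broker topic.
    return topic.append


def drain(topic: Deque[CartEvent], handler: Handler) -> None:
    # Remote, read side: a different service (maybe not Cloudstate) reads the topic.
    while topic:
        handler(topic.popleft())
```

Here, the remote service reuses the same local_projection handler, so both paths produce the same view.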

This is where we went wrong in Lagom. The first thing we did was create an API for defining local read side projections. Then later, we added a different API for publishing the event stream to a message broker. Because these weren't designed together, the mechanisms we had for each were completely different. They looked nothing like each other. And that was a mistake. Let's not make that mistake in Cloudstate.

And this is why we shouldn't be jumping straight to projections. The projection itself may or may not be in the Cloudstate service. It could be local, in which case it is, or it could be remote, in which case it's not. The commonality between the local and the remote is not the projection, it's the event stream, and until we have got that right, we shouldn't be talking about whatever Cloudstate is going to do to provide projection support. As a first step, we need to have that event stream publishing correct. Once we have that, you can do remote projections using something that isn't Cloudstate. You can also do projections in Cloudstate with no specific projection support, eg with a stateless service that talks directly to a database. That's not the end goal that we want to be at, but it is the first step.

jroper commented on July 29, 2024

So, with that in mind, here's what I'd like to propose for publishing projections. First, let's show the remote read side:

service MyReadSidePublisher {
  rpc HandleEvent(stream MyEvent) returns (stream MyPublishedEvent) {
    option (cloudstate.eventing) = {
      in: {
        event_stream: "my-event-sourced-persistence-id"
      };
      out: {
        topic: "my-outbound-message-broker-topic"
      };
    };
  };
};

Now this service will be implemented by a stateless service, which transforms the stream of MyEvent to the stream of MyPublishedEvent - it may do some filtering, or some folding/unfolding of events along the way, it doesn't have to be 1:1.
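Here is a Python sketch of such a stateless transformation, assuming hypothetical MyEvent and MyPublishedEvent shapes. Internal-only events are filtered out. One inbound event carrying several items is unfolded into several published events, so the mapping is not 1:1.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Tuple


@dataclass(frozen=True)
class MyEvent:  # hypothetical internal event
    cart_id: str
    kind: str
    items: Tuple[str, ...]


@dataclass(frozen=True)
class MyPublishedEvent:  # hypothetical public event
    cart_id: str
    item: str


def handle_event(events: Iterable[MyEvent]) -> Iterator[MyPublishedEvent]:
    for event in events:
        if event.kind != "ItemsAdded":
            continue  # filtering: internal-only events are never published
        for item in event.items:
            yield MyPublishedEvent(event.cart_id, item)  # unfolding: one in, many out
```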

Now for a local read side, with no projection support in Cloudstate:

service MyReadSideProcessor {
  rpc HandleEvent(MyEvent) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_stream: "my-event-sourced-persistence-id"
      };
    };
  };
};

Pretty simple, it just consumes the events, and emits nothing. Again, this would be implemented as a stateless service, and in this case, the service would be responsible for talking directly to the datastore itself. Alternatively, the service call could forward the handling to another state model, eg, a key value service.

And just as a preview of what I think might work well in future for, say, SQL support:

service MyReadSideProcessor {
  rpc HandleEvent(MyEvent) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_stream: "my-event-sourced-persistence-id"
      };
      out: {
        sql_projection: {
          query: "select myevent.entity_id from myevent where myevent.type = \"email_updated\" group by myevent.data.email"
          table: "users_by_email"
        };
      };
    };
  };
};

That's just one way it could be done; a lot more thought would be needed, though. Also note that the above would also work for implementing the remote end of a remote projection; the in would just be a topic.

viktorklang commented on July 29, 2024

ralphlaude commented on July 29, 2024

@jroper, great! The event stream as common interface between different state models seems to be the right abstraction. The proposal is clean and straightforward to implement.

marcellanz commented on July 29, 2024

I like that a lot.

sleipnir commented on July 29, 2024

Clean and transparent. I also liked it!

ralphlaude commented on July 29, 2024

@jroper, I would like to understand a bit more about how to implement the MyReadSideProcessor service only in the proxy, without changing any language support libraries. Thanks

jroper commented on July 29, 2024

The service itself will still be implemented by users in the user function. It will be responsible for either mapping events to an external form to be published, putting them into the Cloudstate key-value store, or talking to a datastore directly. But it will be done using the existing APIs offered by the language support libraries, with no changes (or very little change) needed to them. That's good, because that's where so much of the effort is, since there are (or will be) so many of them. The simpler they are, the better off we are.

ralphlaude commented on July 29, 2024

thanks and understood :).

viktorklang commented on July 29, 2024

@jroper @ralphlaude @marcellanz @sleipnir I think we need to answer the following questions:

  • How will this work beyond event sourced entities?
  • How will aggregations be made? (we don't want to call databases directly from user code, for all the same reasons why the Cloudstate Proxy handles the data access)
  • How will views be served?
  • How will cross-domain aggregation work?

One way of trying to answer all those questions would be to define a canonical example, like the Shopping Cart served when building the stateful function support.

Thoughts?

ralphlaude commented on July 29, 2024

@viktorklang all those questions are interesting and they will be answered. I think (perhaps I am wrong) the focus right now is to get event publishing for event sourced entities right. Once we get event publishing right, we can go a step further and start looking for answers to those questions, which are part of projections.

sleipnir commented on July 29, 2024

Hi @ralphlaude, I think we are not far from getting event publishing right. We already have some work done on that, and I believe it won't take long to get there. I think @viktorklang is thinking about all the 'TODOs' ahead of us.

@viktorklang Using the shopping cart as an example seems to be the natural way, since we could complete the entire event-sourced cycle, and we already have a TCK to support it.

ralphlaude commented on July 29, 2024

@sleipnir, I was not aware of the progress on publishing events. Good to know a lot of progress has been made there.

ralphlaude commented on July 29, 2024

@viktorklang,

I will try to provide some thoughts on some of the questions:

* How will this work beyond event sourced entities?

Event sourced entities use Akka Persistence, which provides a way of accessing the recent events of an entity.
For non event sourced entities, we need a way to access the latest state changes in the datastore, as Akka Persistence does. That way we have an event stream which can be transformed by the read side processor. To access the latest state changes, we have to save those changes separately; then we can query them to form the event stream.
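Here is a minimal Python sketch of that idea, assuming a hypothetical in-memory key-value store. Every put also appends an offset-numbered record to a change log, and a read side processor resumes from the last offset it processed. In a real datastore, the state write and the change record would need to be committed atomically; that is not modeled here.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Change = Tuple[int, str, dict]  # (offset, key, new state)


@dataclass
class KeyValueStoreWithChangeLog:
    state: Dict[str, dict] = field(default_factory=dict)
    change_log: List[Change] = field(default_factory=list)

    def put(self, key: str, value: dict) -> None:
        # Save the current state and, separately, a record of the change.
        self.state[key] = value
        self.change_log.append((len(self.change_log), key, dict(value)))

    def changes_since(self, offset: int) -> List[Change]:
        # Querying the saved changes from an offset forms the event stream.
        return self.change_log[offset:]
```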

* How will aggregations be made? (we don't want to call databases directly from user code, for all the same reasons why the Cloudstate Proxy handles the data access)
* How will views be served?

The read side processor knows where to store the aggregated data. The proxy can provide an endpoint like (.../servicename/entitytype/projectionname) for serving the views. The endpoint will accept a query as the request payload. The proxy can use the read side's information about where the aggregated data is stored to access it.

* How will cross-domain aggregation work?

viktorklang commented on July 29, 2024

@ralphlaude What I want to avoid is the User Language Support or the user code having to connect to the database directly, because that requires every language to have DB drivers for every DB we want to support. Supporting DBs in every language seems like a big ask.

ralphlaude commented on July 29, 2024

@viktorklang, @jroper, @marcellanz, @sleipnir,

@viktorklang, I understand very well that you want to avoid the User Language Support or the User Code having access to the database.

I think one way to solve the problem would be to use Eventing or Pub/Sub to mediate the database access. The User Function or Projection will publish events/messages through Eventing or Pub/Sub, and Cloudstate will consume those events/messages and do something with them depending on the use case. In the case of a Projection, it will just feed the database. The User Function or Projection should be able to define which database type it wants to use for a projection.

Cloudstate could provide support to consume Eventing or Pub/Sub for feeding all supported databases. The most important question here is how.

The local projection will then publish transformed events through Eventing or Pub/Sub and those events will be consumed by Cloudstate to feed the projection in the database.
The local projection interface will look like this:

service MyLocalReadSideProcessor {
  rpc HandleEvent(MyEvent) returns (google.protobuf.Empty) {
    option (cloudstate.eventing) = {
      in: {
        event_log: "my-event-sourced-persistence-id"
      };
      out: {
        topic: "my-cloudstate-inbound-message-projection-topic"
        projection: {
          databasetype: "cassandra"
          table: "some_table"
        };
      };
    };
  };
};
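One way to read this proposal is sketched below in Python. It makes two assumptions: queue.Queue stands in for the projection topic, and a dict of dicts stands in for the configured database. All names are hypothetical. The sketch illustrates the split of responsibilities: user code only transforms and publishes, and only the proxy-side writer performs the upsert.

```python
import queue
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ProjectionUpdate:
    table: str
    key: str
    row: Dict[str, str] = field(default_factory=dict)


def publish_projection_update(event: Dict[str, str], topic: queue.Queue) -> None:
    # User function: transform the event and publish it; no database driver involved.
    topic.put(ProjectionUpdate("some_table", event["cart_id"], {"status": event["status"]}))


class ProxyProjectionWriter:
    """Proxy side: drains the topic and upserts rows into the configured database,
    modeled here as nested dicts rather than a real Cassandra (or other) driver."""

    def __init__(self) -> None:
        self.tables: Dict[str, Dict[str, Dict[str, str]]] = {}

    def drain(self, topic: queue.Queue) -> None:
        while True:
            try:
                update = topic.get_nowait()
            except queue.Empty:
                return
            # Upsert: the latest update for a key replaces the previous row.
            self.tables.setdefault(update.table, {})[update.key] = dict(update.row)
```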

I think we can use the same mechanism to address Projection for cross-domain aggregation.

I think this can be tested against @jroper's work, if we think it's fine.

sleipnir commented on July 29, 2024

@ralphlaude LGTM
But... the definition of the projection should be in the StatefulService deployment descriptor and not in the gRPC options. That's me thinking about the implementation. There should be only one reference:

projection: some-projection

Any information, query, or configuration would then be defined in the service YAML, under the name 'some-projection'. What do you think?

viktorklang commented on July 29, 2024

@sleipnir Yes and no. Imagine a projection which creates a composite view from two domains: how would it know where to obtain the data from?

sleipnir commented on July 29, 2024

Couldn't we declare multiple sources in the projection?

viktorklang commented on July 29, 2024

@sleipnir Sure, but how do you know the names of those sources if it is only available in their own deployment descriptors?

sleipnir commented on July 29, 2024

@viktorklang I understand the question, but everything in the descriptor is available to the application. It's a question of how to access it, not of whether access is possible. Still, my opinion is that if we don't start from the simplest possible use case, we will have no idea how to proceed to the more complex cases. Whether we use options or descriptors, we need a simpler use case to get started and a base implementation that is as simple as possible. Even if we change all of that later, it would support more ideas.

viktorklang commented on July 29, 2024

@sleipnir Agreed. However, I'd like to keep all external interfaces in the proto descriptor, so there's a one-stop-shop for knowing where to connect to. The YAML/deployment descriptor makes sense to keep everything which is service-internal. Does that make sense?

sleipnir commented on July 29, 2024

No problem, that's a good arrangement. For me, the projection was much more internal than external, but your argument makes sense.
Another point I wanted to mention is that maybe we should take a look around and see if anyone has tackled this problem before, or any similar problem. See the siddhi query used in the WSO2 zip product https://siddhi.io/en/v5.0/docs/query-guide/ or on sparkql or similar

viktorklang commented on July 29, 2024

@sleipnir I understand. The rationale I use is that the projection stream (change events) needs to be observable by other functions in order to build composite projections, so it is not a service-internal thing.

sleipnir commented on July 29, 2024

@viktorklang perfect! Take a look at Siddhi. There's nothing new in it (it's many years old already), but it can inspire a good interface.

ralphlaude commented on July 29, 2024

@viktorklang, is using Eventing or Pub/Sub a good way to avoid accessing the database from the User Function? Is there something else we should address here?

viktorklang commented on July 29, 2024

@sleipnir @ralphlaude To me, what is important is that the solution is well-known so people can get started quickly and with confidence. The question is if we can get there with a minimal subset of SQL, or similar.

sleipnir commented on July 29, 2024

If I'm not mistaken, Akka Persistence already has an API that supports something like SQL. It's nothing like what I showed with Siddhi, and much simpler from what I saw (sorry, I'm not a specialist in Akka Persistence). Creating an interface that meets us halfway, with the events persisted correctly in the journal, should be a base to start from. Am I wrong?

viktorklang commented on July 29, 2024

@ralphlaude Not sure if Eventing is the right step, the aggregator needs a feed of events, but the result of the aggregation should be possible to be UPSERT:ed by the proxy or otherwise.

sleipnir commented on July 29, 2024

This would also work regardless of the eventing, on the read side.

sleipnir commented on July 29, 2024

Why don't we use eventing? In this case, state changes from event-sourced events would be directed to a topic defined for ingestion. We would add an aggregation step (filters, windows, batches, simple operators) that reads from this source and updates a supported journal, and in a first phase we would use Akka Persistence for queries. We evolve from there!
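Here is a rough Python sketch of the aggregation step proposed here: filtering followed by tumbling-window batches. It assumes a hypothetical (timestamp, key, amount) event shape and in-order arrival. Writing the batches to a journal and querying them through Akka Persistence is not shown.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, Optional, Tuple

Reading = Tuple[float, str, int]  # hypothetical: (timestamp in seconds, key, amount)


def filter_events(events: Iterable[Reading], keep: Callable[[Reading], bool]) -> Iterator[Reading]:
    return (e for e in events if keep(e))


def tumbling_window_sums(
    events: Iterable[Reading], window_seconds: float
) -> Iterator[Tuple[float, Dict[str, int]]]:
    # Assumes events arrive in timestamp order; emits one batch per non-empty window.
    window_start: Optional[float] = None
    sums: Dict[str, int] = defaultdict(int)
    for ts, key, amount in events:
        start = ts - (ts % window_seconds)
        if window_start is not None and start != window_start:
            yield window_start, dict(sums)
            sums = defaultdict(int)
        window_start = start
        sums[key] += amount
    if window_start is not None:
        yield window_start, dict(sums)
```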

viktorklang commented on July 29, 2024

@sleipnir The issue is not with eventing; the issue is who executes the updates into the data store for the projection, and how queries are fielded.

sleipnir commented on July 29, 2024

Sorry @viktorklang, I thought I had addressed this in my reply. The proxy would consume the flow of events; even if that isn't the problem, it would be part of the solution. It would handle the persistence as I said (applying filters, etc.) and persist the resulting state via Akka Persistence. On the read side, it would use Akka Persistence's own query capabilities to deliver the information.

ralphlaude commented on July 29, 2024

@jroper when could we see something regarding the event stream for projection of the event sourced entity? thanks
