Propulsion

While the bulk of this code is in production across various Walmart systems, it's unfortunately pre-documentation at the moment (ideally there'd be a nice summary of various projection patterns, but also much broader information discussing the tradeoffs implied in an event-centric system as a whole).

If you're looking for a good discussion forum on these kinds of topics, look no further than the DDD-CQRS-ES Slack's #equinox channel (invite link).

Components

The components within this repository are delivered as a multi-targeted NuGet package targeting net461 (F# 3.1+) and netstandard2.0 (F# 4.5+) profiles.

  • Propulsion: Implements core functionality in a channel-independent fashion, including ParallelProjector and StreamsProjector. Depends on MathNet.Numerics, Serilog
  • Propulsion.Cosmos: Provides bindings to Azure CosmosDb: a) writing to Equinox.Cosmos (CosmosSink); b) reading from CosmosDb's changefeed by wrapping the dotnet-changefeedprocessor library (CosmosSource). Depends on Equinox.Cosmos, Microsoft.Azure.DocumentDB.ChangeFeedProcessor, Serilog
  • Propulsion.EventStore: Provides bindings to EventStore, writing via Propulsion.EventStore.EventStoreSink. Depends on Equinox.EventStore, Serilog
  • Propulsion.Kafka: Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan. Implements a KafkaMonitor that can log status information based on Burrow. Depends on FsKafka v = 1.3.0, Serilog
  • Propulsion.Kafka0: Same functionality/purpose as Propulsion.Kafka, but targets an older Confluent.Kafka/librdkafka version for interoperability with systems that have a hard dependency on that. Depends on Confluent.Kafka [0.11.3], librdkafka.redist [0.11.4], Serilog

The ubiquitous Serilog dependency is solely on the core module, not any sinks, i.e. you can configure it to emit to NLog etc.

dotnet tool provisioning / projections test tool

  • Propulsion.Tool: Tool used to initialize a Change Feed Processor aux container for Propulsion.Cosmos and demonstrate basic projection, including to Kafka. (Install via: dotnet tool install Propulsion.Tool -g)

Related repos

  • See the Jet dotnet new templates repo for examples using the packages herein:

    • proProjector template for example CosmosSource logic consuming from a CosmosDb ChangeFeedProcessor.
    • proProjector template (in -k mode) for example producer logic using StreamsProducer, StreamsProjector and ParallelProducer.
    • proConsumer template for example consumer logic using ParallelConsumer and StreamsConsumer.
    • proSync template for examples of binding a CosmosSource or EventStoreSource to a CosmosSink or EventStoreSink.
  • See the FsKafka repo for BatchedProducer and BatchedConsumer implementations (together with the KafkaConsumerConfig and KafkaProducerConfig used in the Parallel and Streams wrappers in Propulsion.Kafka)

  • See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.

QuickStart

1. Use propulsion tool to run a CosmosDb ChangeFeedProcessor

dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g

propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
# `-V` for verbose ChangeFeedProcessor logging
# `-g projector1` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `stats` specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector1 stats cosmos

2. Use propulsion tool to run a CosmosDb ChangeFeedProcessor, emitting to a Kafka topic

$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b

# `-V` for verbose logging
# `-g projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V project -g projector3 -l 5 kafka temp-topic cosmos

Projectors

See this medium post regarding some patterns used at Jet in this space for a broad overview of the reasons one might consider employing a projection system.

Propulsion.Cosmos Projection facilities

An integral part of the Equinox.Cosmos feature set is the ability to project events based on the Azure DocumentDb ChangeFeed mechanism. Key elements involved in realizing this are:

In CosmosDb, every document lives within a logical partition, which is then hosted by a variable number of processor instances entitled physical partitions (Equinox.Cosmos documents pertaining to an individual stream bear the same partition key in order to ensure correct ordering guarantees for the purposes of projection). Each front end processor has responsibility for a particular subset range of the partition key space.

The ChangeFeed's real world manifestation is as a long running Processor per frontend processor that repeatedly tails a query across the set of documents being managed by a given partition host (subject to topology changes - new processors can come and go, with the assigned ranges shuffling to balance the load per processor). e.g. if you allocate 30K RU/s to a container and/or store >20GB of data, it will have at least 3 processors, each handling 1/3 of the partition key space, and running a change feed from that is a matter of maintaining 3 continuous queries, with a continuation token each being held/leased/controlled by a given Change Feed Processor.
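As a rough illustration of the arithmetic in that example, the sketch below estimates the minimum number of physical partition ranges, and hence the number of continuous queries each Change Feed Processor has to maintain. The 10K RU/s and 10GB per-range ceilings are assumptions chosen to match the figures quoted above rather than values taken from this repo, and `min_physical_ranges` is a hypothetical helper.

```python
import math

# Assumed per-range ceilings (illustrative only; check current CosmosDb limits)
ASSUMED_MAX_RU_PER_RANGE = 10_000
ASSUMED_MAX_GB_PER_RANGE = 10


def min_physical_ranges(provisioned_ru: int, stored_gb: float) -> int:
    """Lower bound on the number of physical partition ranges for a container."""
    by_throughput = math.ceil(provisioned_ru / ASSUMED_MAX_RU_PER_RANGE)
    by_storage = math.ceil(stored_gb / ASSUMED_MAX_GB_PER_RANGE)
    return max(1, by_throughput, by_storage)


# 30K RU/s with little data, or a little over 20GB with modest throughput:
# either way the estimate is driven by whichever constraint is larger
print(min_physical_ranges(30_000, 1))
print(min_physical_ranges(400, 20.5))
```

Whichever of the two constraints yields the larger count determines how many continuation tokens a given consumer group ends up tracking.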

Effect of ChangeFeed on Request Charges

It should be noted that the ChangeFeed is not special-cased by CosmosDb itself in any meaningful way: something somewhere is going to be calling DocumentDb API queries and paying Request Charges for the privilege (even a tail request based on a continuation token yielding zero documents incurs a charge). It's important to consider that every event written by Equinox.Cosmos into the CosmosDb container will induce an approximately equivalent cost, because a freshly inserted document will be included in the next batch propagated by the Processor (each update of a document also 'moves' that document from its present position in the change order past the notional tail of the ChangeFeed). Thus each insert/update also induces an (unavoidable) request charge based on the fact that the document will be included in the aggregate set of touched documents being surfaced per batch transferred from the ChangeFeed (charging is per KiB or part thereof). The effect of this cost is multiplied by the number of ChangeFeedProcessor instances one is running.
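To make the multiplication effect concrete, here is a minimal sketch of the read-side charge for a single written document. The rate `ASSUMED_RU_PER_KIB` is a placeholder, not a published CosmosDb figure, and `changefeed_read_charge` is a hypothetical helper; only the "per KiB or part thereof" rounding and the per-processor multiplier come from the description above.

```python
import math

# Hypothetical figure: RU charged per KiB (or part thereof) read via the feed
ASSUMED_RU_PER_KIB = 1.0


def changefeed_read_charge(doc_bytes: int, processors: int,
                           ru_per_kib: float = ASSUMED_RU_PER_KIB) -> float:
    """Estimated RU spent surfacing one written document to every processor."""
    kib_charged = math.ceil(doc_bytes / 1024)  # "per KiB or part thereof"
    return kib_charged * ru_per_kib * processors


# A 2.5 KiB document is charged as 3 KiB, once per ChangeFeedProcessor instance
print(changefeed_read_charge(2560, processors=1))
print(changefeed_read_charge(2560, processors=4))
```

Under these assumptions, going from one processor to four quadruples the read-side charge for the same write.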

Change Feed Processors

The CosmosDb ChangeFeed's real world manifestation is a continuous query per DocumentDb Physical Partition node processor.

For .NET, this is wrapped in a set of APIs presented within the standard Microsoft.Azure.DocumentDb[.Core] API set (for example, the Sagan library is built based on this, but there be dragons; implementing a correct one you can trust, with tests, reliability and good performance is no trivial undertaking).

A ChangeFeed Processor consists of (per CosmosDb processor/range)

  • a host process running somewhere that will run the query and then do something with the results before marking off progress
  • a continuous query across the set of documents that fall within the partition key range hosted by a given physical partition host
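The two parts above can be sketched as a loop over an in-memory stand-in for a partition range. This is not how Microsoft's ChangeFeedProcessor library is implemented; `FakeRange`, `run_processor` and the integer continuation tokens are all hypothetical, and a real processor would keep tailing rather than stop when a range is drained.

```python
from typing import Callable, Dict, List, Tuple

Document = dict


class FakeRange:
    """In-memory stand-in for the documents hosted by one physical partition range."""

    def __init__(self, docs: List[Document]):
        self._docs = docs

    def read_after(self, token: int, max_items: int) -> Tuple[List[Document], int]:
        """Return the next batch after `token`, plus the new continuation token."""
        batch = self._docs[token:token + max_items]
        return batch, token + len(batch)


def run_processor(ranges: Dict[str, FakeRange],
                  leases: Dict[str, int],
                  handle: Callable[[str, List[Document]], None],
                  max_items: int = 2) -> None:
    """Drain each range once: query, handle the batch, then mark progress."""
    for range_id, source in ranges.items():
        while True:
            token = leases.get(range_id, 0)
            batch, next_token = source.read_after(token, max_items)
            if not batch:
                break  # a real processor would wait and tail again
            handle(range_id, batch)
            leases[range_id] = next_token  # checkpoint only after handling succeeds


ranges = {"0": FakeRange([{"id": i} for i in range(3)]),
          "1": FakeRange([{"id": i} for i in range(10, 12)])}
leases: Dict[str, int] = {}
seen: List[int] = []
run_processor(ranges, leases, lambda _range_id, batch: seen.extend(d["id"] for d in batch))
print(seen, leases)
```

Keeping the leases in a separate store per consumer group is what lets multiple independent projections read the same container without interfering with each other's progress.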

The implementation in this repo uses Microsoft's .NET ChangeFeedProcessor implementation, which is a proven component used for diverse purposes including as the underlying substrate for various Azure Functions wiring (though NOT bug free at the present time).

See the PR that added the initial support for CosmosDb Projections and the QuickStart for instructions.

Feeding to Kafka

While Kafka is not for Event Sourcing, if you have the scale to run and automate the care and feeding of Kafka infrastructure, it can be a great tool for the job of Replicating events and/or Rich Events in a scalable manner.

The Apache Kafka intro docs provide a clear terse overview of the design and attendant benefits this brings to bear.

As noted in the Effect of ChangeFeed on Request Charges section, it can make sense to replicate a subset of the ChangeFeed to a Kafka topic (both for projections being consumed within a Bounded Context and for cases where you are generating a Published Notification Event) purely from the point of view of optimising request charges (and not needing to consider projections when considering how to scale up provisioning for load). Other benefits are mechanical sympathy based: Kafka can be the right tool for the job in scaling out traversal of events for a variety of use cases, given one has sufficient traffic to warrant the complexity.
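A minimal sketch of what "replicating a subset" might look like on the projector side, before anything is handed to a Kafka producer. It assumes stream names follow a "Category-id" convention; `StreamEvent`, `category_of` and `select_for_topic` are hypothetical names for illustration and are not part of the Propulsion API.

```python
from typing import Iterable, Iterator, NamedTuple, Set


class StreamEvent(NamedTuple):
    stream: str      # assumed to follow a "Category-id" naming convention
    event_type: str
    data: bytes


def category_of(stream: str) -> str:
    """Take the text before the first '-' as the category (an assumed convention)."""
    return stream.split("-", 1)[0]


def select_for_topic(events: Iterable[StreamEvent],
                     categories: Set[str]) -> Iterator[StreamEvent]:
    """Yield only the events whose stream category should be replicated."""
    return (e for e in events if category_of(e.stream) in categories)


batch = [
    StreamEvent("Cart-123", "ItemAdded", b"{}"),
    StreamEvent("Audit-9", "Viewed", b"{}"),
    StreamEvent("Cart-456", "CheckedOut", b"{}"),
]
to_publish = list(select_for_topic(batch, {"Cart"}))
print([e.stream for e in to_publish])
```

The single projector still pays to read every document once, but downstream consumers then read only the filtered topic instead of each running their own queries against the container.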

See the PR that added the initial support for CosmosDb Projections and the QuickStart for instructions.

CONTRIBUTING

Please raise GitHub issues for any questions so others can benefit from the discussion.

This is an Open Source project for many reasons, with some central goals:

  • quality reference code (the code should be clean and easy to read; where it makes sense, it should remain possible to clone it locally and use it in tweaked form)
  • optimal resilience and performance (getting performance right can add huge value for some systems, i.e., making it prettier but disrupting the performance would be bad)
  • this code underpins non-trivial production systems (having good tests is not optional for reasons far deeper than having impressive coverage stats)

We'll do our best to be accommodating to PRs and issues, but please appreciate that we emphasize decisiveness for the greater good of the project and its users; new features start with -100 points.

Within those constraints, contributions of all kinds are welcome:

  • raising Issues is always welcome (but we'll aim to be decisive in the interests of keeping the list navigable).
  • bugfixes with good test coverage are always welcome; in general we'll seek to move them to NuGet prerelease and then NuGet release packages with relatively short timelines (there's unfortunately not presently a MyGet feed for CI packages rigged).
  • improvements / tweaks, subject to filing a GitHub issue outlining the work first to see if it fits a general enough need to warrant adding code to the implementation and to make sure work is not wasted or duplicated

TEMPLATES

The best place to start, sample-wise, is with the QuickStart, which walks you through sample code, tuned for approachability, from dotnet new templates stored in a dedicated repo.

BUILDING

Please note the QuickStart is probably the best way to gain an overview, and the templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB The Propulsion.Kafka.Integration tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run blindly writes to a guid-named topic and trusts the broker will accept the write without any initialization step).

build, including tests on net461 and netcoreapp2.1

dotnet build build.proj -v n

FAQ

Why do you employ Kafka as an additional layer, when downstream processes could simply subscribe directly and individually to the relevant Cosmos db change feed(s)? Is it to accommodate other messages besides those emitted from events and snapshot updates? 🙏 @Roland Andrag

Well, Kafka is definitely not a critical component or a panacea.

You're correct that the bulk of things that can be achieved using Kafka can be accomplished via usage of the changefeed. One thing to point out is that in the context of enterprise systems, having a well maintained Kafka cluster does have less incremental cost than it might if you're building a smaller system from nothing.

Some of the negatives of consuming from the CF direct:

  • each CFP reader imposes RU charges (it's a set of continuous queries against each and every physical range of which the cosmos store is composed)
  • you can't apply a server-side filter, so you pay for everything you see
  • you're very prone to falling into coupling issues
  • (as you alluded to), if there's some logic or work involved in the production of events you'd emit to Kafka, each consumer would need to duplicate that

While many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple containers in order that each consumer will intrinsically be interested in a large proportion of the data it will observe (potentially using database level RU allocations), the write amplification effects of having multiple consumers will always be more significant when reading directly than when using Kafka, the design of which is well suited to running lots of concurrent readers.

Splitting event categories into containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given container is to balance the concerns of:

  • ensuring that datasets for which you want to ringfence availability / RU allocations don't share with containers/databases that are running hot (potentially experiencing significant levels of rate limiting, but with high overall throughput in aggregate as a result of using a high percentage of the allocated capacity)
  • avoiding prematurely splitting data prior to it being required by the constraints of CosmosDB (i.e. you want to let splitting primarily be driven by reaching the [10GB] physical partition range)
  • not having logical partition hotspots that lead to a small number of physical partitions having significantly above average RU consumption
  • having relatively consistent document sizes
  • economies of scale - if each container (or database if you provision at that level) needs to be individually managed (with a degree of headroom to ensure availability for load spikes etc), you'll tend to require higher aggregate RU assignment for a given overall workload based on a topology that has more containers

What's the deal with the history of this repo?

This repo is derived from FsKafka; the history has been edited to focus only on edits to the Propulsion libraries.

Your question here

  • Please feel free to log question-issues; they'll get answered here

Contributors

bartelink, adamralph, enricosada, jorgef
