Giter Club home page Giter Club logo

node-cqrs-eventdenormalizer's Introduction

⚠️ IMPORTANT NEWS! πŸ“°

I’ve been dealing with CQRS, event-sourcing and DDD long enough now that I don’t need working with it anymore unfortunately, so at least for now this my formal farewell!

I want to thank everyone who has contributed in one way or another. Especially...

  • Jan, who introduced me to this topic.
  • Dimitar, one of the last bigger contributors and maintainer.
  • My last employer, who gave me the possibility to use all these CQRS modules in a big Cloud-System.
  • My family and friends, who very often came up short.

Finally, I would like to thank Golo Roden, who was there very early at the beginning of my CQRS/ES/DDD journey and is now here again to take over these modules.

Golo Roden is the founder, CTO and managing director of the native web, a company specializing in native web technologies. Among other things, he also teaches CQRS/ES/DDD etc. and based on his vast knowledge, he brought wolkenkit to life. wolkenkit is a CQRS and event-sourcing framework based on Node.js. It empowers you to build and run scalable distributed web and cloud services that process and store streams of domain events.

With this step, I can focus more on i18next, locize and localistars. I'm happy about that. 😊

So, there is no end, but the start of a new phase for my CQRS modules. πŸ˜‰

I wish you all good luck on your journey.

Who knows, maybe we'll meet again in a github issue or PR at i18next πŸ˜‰

Adriano Raiano


Introduction

travis npm

Node-cqrs-eventdenormalizer is a node.js module that implements the cqrs pattern. It can be very useful as eventdenormalizer component if you work with (d)ddd, cqrs, domain, host, etc.

Table of Contents

Installation

npm install cqrs-eventdenormalizer

Usage

var denormalizer = require('cqrs-eventdenormalizer')({
  // the path to the "working directory"
  // can be structured like
  // [set 1](https://github.com/adrai/node-cqrs-eventdenormalizer/tree/master/test/integration/fixture/set1) or
  // [set 2](https://github.com/adrai/node-cqrs-eventdenormalizer/tree/master/test/integration/fixture/set2)
  denormalizerPath: '/path/to/my/files',

  // optional, default is 'commandRejected'
  // will be used to catch AggregateDestroyedError from cqrs-domain
  commandRejectedEventName: 'rejectedCommand',

  // optional, default is 800
  // if using in scaled systems, this module tries to catch the concurrency issues and
  // retries to handle the event after a timeout between 0 and the defined value
  retryOnConcurrencyTimeout: 1000,

  // optional, default is in-memory
  // currently supports: mongodb, redis, tingodb, couchdb, azuretable, dynamodb and inmemory
  // hint: [viewmodel](https://github.com/adrai/node-viewmodel#connecting-to-any-repository-mongodb-in-the-example--modewrite)
  // hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
  repository: {
    type: 'mongodb',
    host: 'localhost',                          // optional
    port: 27017,                                // optional
    dbName: 'readmodel',                        // optional
    timeout: 10000                              // optional
  // authSource: 'authedicationDatabase',        // optional
    // username: 'technicalDbUser',                // optional
    // password: 'secret'                          // optional
  // url: 'mongodb://user:pass@host:port/db?opts // optional
  },

  // optional, default is in-memory
  // currently supports: mongodb, redis, tingodb, dynamodb and inmemory
  // hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
  revisionGuard: {
    queueTimeout: 1000,                         // optional, timeout for non-handled events in the internal in-memory queue
    queueTimeoutMaxLoops: 3,                    // optional, maximal loop count for non-handled event in the internal in-memory queue
    startRevisionNumber: 1,			// optional, if defined the denormaizer waits for an event with that revision to be used as first event

    type: 'redis',
    host: 'localhost',                          // optional
    port: 6379,                                 // optional
    db: 0,                                      // optional
    prefix: 'readmodel_revision',               // optional
    timeout: 10000                              // optional
    // password: 'secret'                          // optional
  },
  skipExtendEvent: false,						// optional
  skipOnEventMissing: false,					// optional
  skipOnEvent: false,							// optional
  skipOnNotification: false,					// optional
});

Catch connect ad disconnect events

// repository
denormalizer.repository.on('connect', function() {
  console.log('repository connected');
});

denormalizer.repository.on('disconnect', function() {
  console.log('repository disconnected');
});

// revisionGuardStore
denormalizer.revisionGuardStore.on('connect', function() {
  console.log('revisionGuardStore connected');
});

denormalizer.revisionGuardStore.on('disconnect', function() {
  console.log('revisionGuardStore disconnected');
});


// anything (repository or revisionGuardStore)
denormalizer.on('connect', function() {
  console.log('something connected');
});

denormalizer.on('disconnect', function() {
  console.log('something disconnected');
});

Define the event structure

The values describes the path to that property in the event message.

denormalizer.defineEvent({
  // optional, default is 'correlationId'
  // will use the command id as correlationId, so you can match it in the sender
  // will be used to copy the correlationId to the notification
  correlationId: 'correlationId',

  // optional, default is 'id'
  id: 'id',

  // optional, default is 'name'
  name: 'name',

  // optional, default is 'aggregate.id'
  aggregateId: 'aggregate.id',

  // optional
  context: 'context.name',

  // optional
  aggregate: 'aggregate.name',

  // optional, default is 'payload'
  payload: 'payload',

  // optional, default is 'revision'
  // will represent the aggregate revision, can be used in next command
  revision: 'revision',

  // optional
  version: 'version',

  // optional, if defined the values of the command will be copied to the event (can be used to transport information like userId, etc..)
  meta: 'meta'
});

Define the notification structure

The values describes the path to that property in the notification message.

denormalizer.defineNotification({
  // optional, default is 'correlationId'
  // will use the command id as correlationId, so you can match it in the sender
  // will be used to copy the correlationId from the event
  correlationId: 'correlationId',

  // optional, default is 'id'
  id: 'id',

  // optional, default is 'name'
  action: 'name',

  // optional, default is 'collection'
  collection: 'collection',

  // optional, default is 'payload'
  payload: 'payload',

  // optional, will be copied from event
  aggregateId: 'meta.aggregate.id',

  // optional, will be copied from event
  context: 'meta.context.name',

  // optional, will be copied from event
  aggregate: 'meta.aggregate.name',

  // optional, will be copied from event
  // will represent the aggregate revision, can be used in next command
  revision: 'meta.aggregate.revision',

  // optional, will be copied from event
  eventId: 'meta.event.id',

  // optional, will be copied from event
  event: 'meta.event.name',

  // optional, if defined the values of the event will be copied to the notification (can be used to transport information like userId, etc..)
  meta: 'meta'
});

Define the id generator function [optional]

you can define a synchronous function

denormalizer.idGenerator(function () {
  var id = require('uuid').v4().toString();
  return id;
});

or you can define an asynchronous function

denormalizer.idGenerator(function (callback) {
  setTimeout(function () {
    var id = require('uuid').v4().toString();
    callback(null, id);
  }, 50);
});

Wire up events [optional]

you can define a synchronous function

// pass events to bus
denormalizer.onEvent(function (evt) {
  bus.emit('event', evt);
});

or you can define an asynchronous function

// pass events to bus
denormalizer.onEvent(function (evt, callback) {
  bus.emit('event', evt, function ack () {
    callback();
  });
});

skip onEvent if provided

You can skip onEvent from being called, by adding the `skipOnEvent` option to the denormalizer. Checkout the usage section for more information.

Wire up notifications [optional]

you can define a synchronous function

// pass notifications to bus
denormalizer.onNotification(function (noti) {
  bus.emit('event', evt);
});

or you can define an asynchronous function

// pass notifications to bus
denormalizer.onNotification(function (noti, callback) {
  bus.emit('notification', noti, function ack () {
    callback();
  });
});

skip onNotification if provided

You can skip onNotification from being called, by addding the `skipOnNotification` option to the denormalizer. Checkout the usage section for more information.

Wire up event missing [optional]

you can define a synchronous function

denormalizer.onEventMissing(function (info, evt) {
  console.log(info);
  console.log(evt);
});

skip onEventMissing if provided

You can skip onEventMissing from being called, by adding the `skipOnEventMissing` option to the denormalizer. Checkout the usage section more information.

Define default event extension [optional]

you can define a synchronous function

denormalizer.defaultEventExtension(function (evt) {
  evt.receiver = [evt.meta.userId];
  return evt;
});

or you can define an asynchronous function

denormalizer.defaultEventExtension(function (evt, callback) {
  evt.receiver = [evt.meta.userId];
  callback(null, evt);
});

skip default event extensions

You can skip all event extenders and the default extensions from being executed by adding the option `skipExtendEvent` to the denormalizer. Checkout the usage section for more information.

Using custom structure loader function

The built-in structure loader can be replaced with one adapted to your needs. To do that, you need to include a loading method in the options object passed to the domain constructor.

// options will contain denormalizerPath as well as the as well as a definition object containing all the constructors of the denormalizer components  ( Collection, ViewBuilder etc. )
function structureLoader(options) {
	const collection = new options.definitions.Collection({
		name: 'col'
	});
	collection.addViewBuilder(new options.definitions.ViewBuilder({
		name: 'evt',
		aggregate: 'agg',
		context: 'ctx'              
	}, function() {}));
	return {
		collections: [
			collection
		]
	};
	// or more probably
	return myExternalLoader(options.denormalizerPath, options.definitions);
}

require('cqrs-eventdenormalizer')({
		denormalizerPath: '/path/to/my/files',
		structureLoader: structureLoader
});

Initialization

denormalizer.init(function (err, warnings) {
  // this callback is called when all is ready...
  // warnings: if no warnings warnings is null, else it's an array containing errors during require of files
});

// or

denormalizer.init(); // callback is optional

Handling an event

denormalizer.handle({
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  correlationId: 'c80ada33-dd05-4340-8a8b-846eea6e151d',
  name: 'enteredNewPerson',
  aggregate: {
    id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
    name: 'person'
  },
  context: {
    name: 'hr'
  },
  payload: {
    firstname: 'Jack',
    lastname: 'Huston'
  },
  revision: 1,
  version: 0,
  meta: {
    userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
  }
}); // callback is optional

or

denormalizer.handle({
  id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
  correlationId: 'c80ada33-dd05-4340-8a8b-846eea6e151d',
  name: 'enteredNewPerson',
  aggregate: {
    id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
    name: 'person'
  },
  context: {
    name: 'hr'
  },
  payload: {
    firstname: 'Jack',
    lastname: 'Huston'
  },
  revision: 1,
  version: 0,
  meta: {
    userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
  }
}, function (errs, evt, notifications) {
  // this callback is called when event is handled successfully or unsuccessfully
  // errs can be of type:
  // - null
  // - Array of Errors
  //
  // evt: same as passed in 'onEvent' function
  //
  // notifications: Array of viewmodel changes
});

Request denormalizer information

After the initialization you can request the denormalizer information:

denorm.init(function (err) {
  denorm.getInfo();
  // ==>
  // {
  //   "collections": [
  //     {
  //       "name": "person",
  //       "viewBuilders": [
  //         {
  //           "name": "enteredNewPerson",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2,
  //           "priority": 223
  //         },
  //         {
  //           "name": "registeredEMailAddress",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2,
  //           "priority": 312
  //         }
  //       ],
  //       "eventExtenders": [
  //         {
  //           "name": "enteredNewPerson",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2
  //         }
  //       ],
  //       "preEventExtenders": [
  //         {
  //           "name": "enteredNewPerson",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2
  //         }
  //       ]
  //     },
  //     {
  //       "name": "personDetail",
  //       "viewBuilders": [
  //         {
  //           "name": "enteredNewPerson",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2,
  //           "priority": 110
  //         },
  //         {
  //           "name": "registeredEMailAddress",
  //           "aggregate": "person",
  //           "context": "hr",
  //           "version": 2,
  //           "priority": Infinity
  //         }
  //       ],
  //       "eventExtenders": [],
  //       "preEventExtenders": []
  //     }
  //   ],
  //   "generalEventExtenders": [
  //     {
  //       "name": "",
  //       "aggregate": null,
  //       "context": null,
  //       "version": -1
  //     }
  //   ],
  //   "generalPreEventExtenders": []
  // }
});

Components definition

Collection

module.exports = require('cqrs-eventdenormalizer').defineCollection({
  // optional, default is folder name
  name: 'personDetail',

  // optional, default ''
  defaultPayload: 'payload',

  // optional, default false
  noReplay: false,

  // indexes: [ // for mongodb
  //   'profileId',
  //   // or:
  //   { profileId: 1 },
  //   // or:
  //   { index: { profileId: 1 }, options: {} },
  // ],

  // repositorySettings: { // optional
  //   mongodb: { // for mongo db
  //     indexes: [ // same as above
  //       'profileId',
  //       // or:
  //       { profileId: 1 },
  //       // or:
  //       { index: { profileId: 1 }, options: {} },
  //     ],
  //   },
  //   elasticsearch6: { // for elasticsearch 5.x and 6.x ( elasticsearch6 type / implementation / driver )
  //     refresh: 'wait_for', // refresh behaviour on index, default is true ( ie. force index refresh ) https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
  //     waitForActiveShards: 2, // optional, defaults to 1 ( ie. wait only for primary ) https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#create-index-wait-for-active-shards
  //     index: { // optional applied on index create, https://www.elastic.co/guide/en/elasticsearch/reference/6.x/indices-create-index.html
  //       settings: { // will be merged with the default ones,
  //         number_of_shards: 3, // optional, otherwise taken from type settings, defaults to 1,
  //         number_of_replicas: 1, // optional otherwise taken from type settings, defaults to 0,
  //       },
  //       mappings: { // optiona will be merged with the default ones,
  //         properties: { // specific properties to not be handled by dynamic mapper
  //           title: {
  //             type: 'text',
  //           },
  //         },
  //       },
  //     },
  //   },
  // },
},

  // optionally, define some initialization data for new view models...
{
  emails: ['[email protected]'],
  phoneNumbers: [],
});

If you need an information from an other collection while denormalizing an event, you can require such a collection and make some lookups. for example

col.findViewModels({ my: 'value' }, function (err, vms) {});

or

col.loadViewModel('id', function (err, vm) {});

or

col.loadViewModelIfExists('id', function (err, vm) {});

But be careful with this!

ViewBuilder

Each viewBuilder is dedicated to a specific event. It reacts on an event and denormalizes that event in an appropriate collection.

Viewbuilders are structured by collection (not by context).

module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  version: 2,

  // optional, if not defined or not found it will generate a new viewmodel with new id
  id: 'aggregate.id',

  // optional, suppresses auto-creation of new view model if none matching the id can be found, default is true
  autoCreate: true,

  // optional, if not defined it will pass the whole event...
  payload: 'payload',

  // optional, default Infinity, all view-builders will be sorted by this value
  priority: 1
}, function (data, vm) { // instead of function you can define
                         // a string with default handling ('create', 'update', 'delete')
                         // or function that expects a callback (i.e. function (data, vm, callback) {})

  // if you have multiple concurrent events that targets the same vm, you can catch it like this:
  // during a replay the denormalization finishes and the retry does not happen
  if (vm.actionOnCommit === 'create') {
  	return this.retry(); // hint: do not use arrow function in this scope when using this.retry()
  	// or
  	//return this.retry(100); // retries to denormalize again in 0-100ms
  	// or
  	//return this.retry({ from: 500, to: 8000 }); // retries to denormalize again in 500-8000ms
  }

  vm.set('firstname', data.firstname);
  vm.set('lastname', data.lastname);
});

ViewBuilder for multiple viewmodels in a collection

Be careful with the query!

A lot of viewmodels can slow down the denormalization process!

module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  version: 2,

  // optional, if not defined or not found it will generate a new viewmodel with new id
  query: { group: 'admins' },

  // optional, if not defined it will pass the whole event...
  payload: 'payload',

  // optional, default Infinity, all view-builders will be sorted by this value
  priority: 1
}, function (data, vm) { // instead of function you can define
                         // a string with default handling ('create', 'update', 'delete')
                         // or function that expects a callback (i.e. function (data, vm, callback) {})handling ('create', 'update', 'delete')
  vm.set('firstname', data.firstname);
  vm.set('lastname', data.lastname);
  //this.remindMe({ that: 'important value' });
  //this.retry();
});
// optional define a function to that returns an id that will be used as viewmodel id when id not specified in options or found
//.useAsId(function (evt) {
//  return 'newId';
//});
// or
//.useAsId(function (evt, callback) {
//  callback(null, 'newId');
//});	
// optional define a function that returns a query that will be used as query to find the viewmodels (but do not define the query in the options)
//.useAsQuery(function (evt) {
//  return { my: evt.payload.my };
//});
// or async
//.useAsQuery(function (evt, callback) {
//  callback(null, { my: evt.payload.my });
//});
// optional define a function that returns a list of items, for each the viewbuilder will run.
//.executeForEach(function (evt) {
//  return [{ init: 'value1' }, { init: 'value2' }];
//});
// or async
//.executeForEach(function (evt, callback) {
//  callback(null, [{ init: 'value1' }, { init: 'value2' }]);
//});
//
// optional define a function that checks if an event should be handled ( before vm is loaded )
//.defineShouldHandleEvent(function (evt) {
//  return true;
//});
// or
//.defineShouldHandleEvent(function (evt, callback) {
//  callback(null, true');
//});
//
// optional define a function that checks if an event should be handled ( after vm is loaded )
//.defineShouldHandle(function (evt, vm) {
//  return true;
//});
// or
//.defineShouldHandle(function (evt, vm, callback) {
//  callback(null, true');
//});
//
// optional define a function that checks if an event should be handled
//.onAfterCommit(function (evt, vm) {
//  //var memories = this.getReminder();
//  //console.log(memories.that); // 'important value'
//  //doSomethingStrange()
//});
// or
//.onAfterCommit(function (evt, vm, callback) {
//  var memories = this.getReminder();
//  //console.log(memories.that); // 'important value'
//  // doSomethingStrange(callback)
//  callback(memories.that === 'important value' ? null : new Error('important value not set'));
//});

EventExtender

for a collection (in a collection folder)

module.exports = require('cqrs-eventdenormalizer').defineEventExtender({

// module.exports = require('cqrs-eventdenormalizer').definePreEventExtender({ // same api as normal EventExtenders but executed before viewBuilder so the extended event can be used // optional, default is file name without extension, // if name is '' it will handle all events that matches name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  // if set to -1, it will ignore the version
  version: 2//,

  // optional, if not defined it will pass the whole event...
  // payload: 'payload'
}, function (evt, col, callback) {
  // col.loadViewModel()... or from somewhere else... (col.findViewModels( /* see https://github.com/adrai/node-viewmodel#find */ ))
  evt.extended = true;
  callback(null, evt);
});

// or

module.exports = require('cqrs-eventdenormalizer').defineEventExtender({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  // if set to -1, it will ignore the version
  version: 2,

  // if defined it will load the viewmodel
  id: 'payload.id'//,

  // optional, if not defined it will pass the whole event...
  // payload: 'payload'
},
function (evt, vm) {
  evt.extended = vm.get('myValue');
  return evt;
});

// or

module.exports = require('cqrs-eventdenormalizer').defineEventExtender({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  // if set to -1, it will ignore the version
  version: 2,

  // if defined it will load the viewmodel
  id: 'payload.id'//,

  // optional, if not defined it will pass the whole event...
  // payload: 'payload'
},
function (evt, vm, callback) {
  evt.extended = vm.get('myValue');
  callback(null, evt);
});
// optional define a function to that returns an id that will be used as viewmodel id when id not specified in options or found
//.useAsId(function (evt) {
//  return 'newId';
//});
// or
//.useAsId(function (evt, callback) {
//  callback(null, 'newId');
//});	

not for a collection

module.exports = require('cqrs-eventdenormalizer').defineEventExtender({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  // if set to -1, it will ignore the version
  version: 2//,

  // optional, if not defined it will pass the whole event...
  // payload: 'payload'
}, function (evt) {
  evt.extended = true;
  return evt;
});

// or

module.exports = require('cqrs-eventdenormalizer').defineEventExtender({
  // optional, default is file name without extension,
  // if name is '' it will handle all events that matches
  name: 'enteredNewPerson',

  // optional
  aggregate: 'person',

  // optional
  context: 'hr',

  // optional, default is 0
  // if set to -1, it will ignore the version
  version: 2//,

  // optional, if not defined it will pass the whole event...
  // payload: 'payload'
}, function (evt, callback) {
  evt.extended = true;
  callback(null, evt);
});

Replay events

Replay whenever you want...

denormalizer.replay([/* ordered array of events */], function (err) {
  if (err) { console.log(err); }
});

or when catching some events:

denormalizer.onEventMissing(function (info, evt) {

  // grab the missing events, depending from info values...
  // info.aggregateId
  // info.aggregateRevision
  // info.aggregate
  // info.context
  // info.guardRevision
  // and call handle...
  denormalizer.handle(missingEvent, function (err) {
    if (err) { console.log(err); }
  });

});

you can skip onEventMissing from being called, if provided, by adding the option `skipOnEventMissing` to the denormalizer. Checkout the usage section for more information.

or depending on the last guarded event:

denormalizer.getLastEvent(function (err, evt) {

  if (event.occurredAt < Date.now()) {
  	// ...
  }

});

streamed

denormalizer.replayStreamed(function (replay, done) {

  replay(evt1);
  replay(evt2);
  replay(evt3);

  done(function (err) {
    if (err) { console.log(err); }
  });

});

if you want to clear the readModel before replaying...

denormalizer.clear(function (err) {
});

ES6 default exports

Importing ES6 style default exports is supported for all definitions where you also use module.exports:

module.exports = defineCollection({...});

works as well as

exports.default = defineCollection({...});

as well as (must be transpiled by babel or tsc to be runnable in node)

export default defineCollection({...});

Also:

exports.default = defineViewBuilder({...});
exports.default = defineEventExtender({...});
// etc...

Exports other than the default export are then ignored by this package's structure loader.

Release notes

License

Copyright (c) 2019 Adriano Raiano

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

node-cqrs-eventdenormalizer's People

Contributors

adrai avatar alemhnan avatar dependabot[bot] avatar emmkong avatar glockenbeat avatar goloroden avatar irt-fbachmann avatar jamuhl avatar marcbachmann avatar nanov avatar robinfehr avatar shazichuanshuo avatar tomkaltz avatar tommiii avatar wrobel avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

node-cqrs-eventdenormalizer's Issues

need more info on eventExtenders

:) need more understanding on below points related eventExtenders

1 : Though there is viewBuilder, what is actual purpose of eventExtenders.
2 : Do we have to use both or use eventExtenders when we want to extend event.
3 : What kind of things can fit inside eventExtenders.

How to use 'meta' in `defineViewBuilder`

I can set the 'meta' field in the `defineEvent' and I would like to subsequently use that value in a viewBuilder.

Is there a way to achieve that? The function being passed as only 'data', the viewModel and eventully the callback.

If I do not define explicitly the payload field I do get the whole event with the sub fields 'data' and 'meta' but it would be nice to access the 'meta' field directly.

viewbuilder for multiple events

ed.defineViewBuilder({
    name: '', // optional, default is file name without extension, if name is '' it will handle all events that matches
    aggregate: 'timesheet', // optional
    context: 'timesheet',
    id: 'aggregateId', // if not defined or not found it will generate a new viewmodel with new id
    //payload: 'payload' // optional,
    //if not defined it will pass the whole event...
    autoCreate: true

    // optional, if not defined it will pass the whole event...
    //payload: 'payload'
  },...

I have tried this but the denormalizer wouldn't fire for all aggregates named timesheet. Did I misinterpret "it will handle all events that matches"?

write to different repositories for the same definition of the viewbuilder

In the same node instance I have:

  • 1 view builders (vb)
  • 2 mongo repositories: repomongoCustomer1, repomongoCustomer2
  • 2 denormalizers:
    - denomCustomer1 (that writes to repomongoCustomer1),
    • denomCustomer2 -> (that writes to repomongoCustomer2),
  • 1 eventstore. Each event in the eventstore contains a property customer that describes what denormalizer is in charge of the denormalization.

When I instantiate, in order, denomCustomer1 and denomCustomer2. Happens that the event is only written to the last repository (repomongoCustomer2) .

Is it possible to have multiple repository associated to multiple denormalizers, all created from the same viewBuilder?

Thanks

[Feature] FullText index

I would like to implement fulltext search in my project. I want to use the simplicity of denormalizer to index documents from events.

Create a new storage for node-viewmodel based on search-index is it a good thing ?

I wan't your feeling about that.

ConcurrencyError on replay

Hallo @adrai ,

my denormalizer-application tries to get the last processed event on startup and then all newer events and replays them.

The code is basically like this:

const lastEvent = await denorm.getLastEvent();
const lastEventTimeStamp = new Date(lastEvent.occurredAt);
const eventsSinceLastEvent = await denorm.getEventsSince(lastEventTimeStamp, 0, 1000);
const eventsToReplay = eventsSinceLastEvent.filter(ev => ev.payload.occurredAt > lastEventTimeStamp));
await denorm.replay(eventsToReplay);

Now sometimes the replay fails with a ConcurrencyError. I digged through the code and saw that the exception occurs in your ViewModel-library: it performs a conditional update - only if the _hash is different. For me it seems as if the event has already been written to the DB and therefore there is no hash-change.

Until now my application catches exceptions at startup, logs them and then quits. Because of this, the app always stops and never begins to run properly.

Now my question: is this something that is an expected situation that I should ignore? That is, catch the ConcurrencyError and simply proceed? Or do I have something wrong with my replay-code?

Unfortunately at the moment I don't have a clue what is causing this behavior. I had the denormalizer application running in my debugger, then performed some changes and restarted it. But I took care that no events were being processed when I restarted it, the app was idle.

Thanks for your help,
Steven

[Question] What are Notification used for?

Hello,

I'm trying to figure out what are notifications used for? Can someone share is knowledge on the topic please? thanks for any info you could provide on this.

Alex

[Question] how to handle Binaries?

Dear All,

I have setuped a domain that requires to store binaries. For this I have applied the ClaimCheck pattern.

Meaning when I want to submit a command that should contains a binary, I replace the binaries on the command with the claim provided from the claimcheck service. I then process the command and the event applied has also the claim check id.

Now from the readmodel. When I receive the event with the claim check. I need to retrieve back the binary from the claim check in order to save the binary on the read model repository in eventually a dedicated collection. I was wondering if you have any recommendation on that scenario?
I could of course keep the binary on the claimcheck service and I have to in order to be able to replay all the events. But I would like to avoid to server the binaries for the readmodel from there. I mean... The read model should depends only on its own storage and not on the availability of another service.

I'm open to any clean and smart suggestion :)

Wrong ViewBuilder triggered

I am triggering the event 'A'.'B'.'C'. (context.aggregate.event) and the ViewBuilder 'A'.'B'.'C' get correctly triggered.

But also the ViewBuilder D.E.C. ('C' is the same string but context and aggregate different).
I'm fully specifying the ViewBuilders with name/aggregate/context in the definition like you showed me here .

I tried different name for the file with the same results. I also tried to put unrelated random names just to uncouple the definition from the name of the file.

Do you have any hints? Might it be a bug?

Elasticseach "Too Many Requests"

Hi,

This issue is related with issue #70

While replaying a large number of events to Elasticsearch I find that at certain point elasticsearch reply with:
HTTP/1.1 429 Too Many Requests

Checking the issue I was able to find that all the 'POST' requests to elasticsearch are done at one point int time. I was able to count over 100 requests for every 10ms time period.

Checking the replay code I see that all the requests are done in:

cqrs-eventdenormalizer/lib/definitions/collection.js line 475-490

If we change this block to:

const concurrentOps = 5;
async.series([
  function (callback) {
    async.eachLimit(replVmsToDelete, concurrentOps, commit, callback); // changes
  },
  function (callback) {
    async.eachLimit(replVms, concurrentOps, commit, callback); // changes
  }
], function (err) {
  if (err) {
    debug(err);
  }
  self.replayingVms = {};
  self.replayingVmsToDelete = {};
  self.isReplaying = false;
  callback(err);
});

this will implement rate control on the requests.
But would be good to be able to control the rate while instanciating the repository. (or any other nice place)

There is any possibility to implement this on the lib ?

Please let me know.
Tnks
--Fabio

No occurredAt field in THE_LAST_SEEN_EVENT

Hi Adriano,
there is currently no occurredAt-field in the last seen event in the revision-collection.
I couldn't find this string in the codebase, too - have you removed it at some time?

Reconnect if connect fails?

Hallo,
I'm using MongoDB. When Mongo is not started and I try to init the event store, it throws an exception:

MongoError: failed to connect to server [localhost:27017] on first connect [MongoError: connect ECONNREFUSED 127.0.0.1:27017]

Question 1: is there a way to instruct cqrs-eventdenormalizer to perform a retry? Same question goes to cqrs-domain.

Question 2: what about retries if the connection is initially fine and the event denormalizer starts up properly. Later Mongo goes down - how should I handle that situation?

Thanks,
Steven

Elasticsearch support

Apologies for the maybe silly question, is Elasticsearch supported in the denormalization?

I see that I can specify settings for a collection, but I don't see, at least in the docs, where to set up the connection to Elasticsearch.

onEventMissing handling and callback

Hey,

When handling missing events using onEventMissing, the callback is not called which can cause issues with message acking/rejecting, is this correct? If yes, is this by design or can be extended to call the callback after handling the missing events?

Thanks!

Revison guard store horizontal scaling

Is possible to change the db for the revision guard store? I see it saves the events in memory but I've some concerns about the horizontal scaling. Are there some limits? What happens if I'm missing a huge amount of events?

ElasticSearch6: vm.actionOnCommit==='update' gives status:404

Hi,

I am denormalizing 2 events in replay mode.

  • topicOpened, that creates the aggregate
  • topicUpdated, modifies the aggregate already created.

They are alway saved and executed in order, by the denormalizers.

Although after the 2nd denormalizer the vm.actionOnCommit is 'update'. Only when the 2nd denormalizer is executed the ViewModel is passed to the bulkCommit(...). To the bulkCommit(...) arrives only vm.actionOnCommit='update'. (https://github.com/adrai/node-viewmodel/blob/master/lib/databases/elasticsearch6.js#L225 ). So only the operation "update" is pushed to the elasticsearch .
Because the record does not exist, the update fails.
Based on my observations, this issue occurs whenever the is more than 1 event for one aggregate (ie: revision > 1).

My suggestion is to 'upsert' the record when vm.actionOnCommit is 'update', by using the 'index' operation. (similarly to what is done in the mongo driver , https://github.com/adrai/node-viewmodel/blob/master/lib/databases/mongodb.js#L296 )

Am I missing something?

Thanks

Dealing with missed events

What is the recommended way to deal with missed events?
I tried doing an RPC queue requesting those ( aggId, guardedRev, aggregateRev ) and the handling those one by one, but this approach seems to bring some problems with it ( mainly because of the handle inside handle ). Should i just send the missing events again thought the normal events queue ( send/receiver ) ?

'disconnect' event for dynamodb repository not working

Hi,

We are currently implementing the denormalizer with the dynamodb repository and it seems like the 'disconnect' event does not get emitted.

I tested it by setting up a docker-compose with my service (using the denormalizer) and dynamodb and simply stopped the dynamodb container while the service was still running.

Can someone confirm that this is a problem with dynamodb? or is it a general problem?

Thanks for the answers in advance

Create a custom repository to store data into a relational database

I am having some trouble placing a custom repository wiich would store data in a relational database which for the purposes of my system I have to keep updated the same way that I want to maintain the "mongo" repository updated, the best place for it to be set should be at the viewBuilder, using the callback?

Something like that?

module.exports = require('cqrs-eventdenormalizer').defineViewBuilder({
  name: 'itemCreated',
  id: 'payload.id'
}, function (data, vm, callback) {
  // assync logic to record data elsewhere
  //updating mongo database
  vm.set(data);

  recordOnRelationalDatabase(data).then(function (recordedData) {
    //...
    return callback();
  }),catch(function(err) {
    callback(err)
  });
});

viewbuilders not ordering properly

hie :)

what i am trying to achieve : i create sales invoice and related to sales invoice i create accounting entries and if there is problem in creating accounting entries then i want to cancel sales invoice created earlier.

on salesInvoiceCreated event

  1. salesInvoiceCreated viewbuilder executes and creates readmodel
  2. salesInvoiceCreatedSaga.js listens and creates command createAccounting.
  3. i forcefully generated error in businessrule of createAccounting so that it will throw rejectedCommand event
  4. on rejectedCommand event it will fire cancelSalesInvoice

my sample with context and saga implemented

problem:
some times every thing works fine it creates sales invoice then and on problem in creating accounting entries it cancels sales invoice created earlier and keeps giving AlreadyDenormalizedError error
but but but
some times cancel sales invoice fires first and then create sales invoice and keeps giving AlreadyDenormalizedError error

solutions that i tried :
also i tried to set priority in viewbuilder

Update multiple viewmodels question

Hi,

I am implementing a tree structure based on nested set and when adding a node in the tree, some other nodes need updates.
What I have in mind is handling the domain event in 2 view builders, one adding the node and one updating the other nodes but the problem is I can't specify in the query attribute of the viewBuilder the ID of the tree for example.

module.exports = require('cqrs-eventdenormalizer') .defineViewBuilder(
    {
      name: 'nodeAdded',
      aggregate: 'node',
      context: 'nodes',
      version: 0,
      query: {
          // here
      },
      autoCreate: false,
      priority: 1
    },
    function(data, vm, cb) {
        // ...
    });

On my mind only using a function for the query attribute would do the trick.

...
query: function (ev /*, cb */) {
    return {
        treeId: ev.aggregate.id // in my case the domain aggregate is the tree
    }
},
...

Rebuild viewmodel with eventdenormalizer and node-eventstore

Hello,

Thank you for your library.

I want to rebuild a viewmodel.

I can do it by using:
eventdenormalizer.clear(function (err) {});
eventdenormalizer.replay([/* ordered array of events */], function (err) { if (err) { console.log(err); } });

I can query my events from node-eventstore using:
es.getEvents()

However, I didn't find the solution to pass my events from the event store to my eventdenormalizer.
Since eventdenormalizer subscribes to eventstore events, do I need to dispatch each event using eventstore.setEventToDispatched or is there another way?

Thanks.

Josh

[General Question] revisionGuard usage?

Hi there, first thanks for all this open sourced cqrs component. They are of great value. I was wondering if you could point me to some documentation that describes what is the revisionGuard used for? I'm familiar with CQRS and I have followed Greg young intensive CQRS weekend seminar but I do not record having heard of the revision guard :( thanks for any help.
Alex.

Event's not reaching denormalizer

I think I'm missing something simple but I'm not sure if it's config or implementation. My issue is that I'm not seeing events reaching my denormalizer(s). I've wired in modules cqrs-domain & node-cqrs-eventdenormalizer, with basic configuration using InMemory storage/queues. I've left out all optional configuration.

In the execution sequence, everything is happening in aggregates as expected, but I'm not seeing a publish to the denormalizer. Any hints on how to wire this up would be greatly appreciated.

replayStreamed not committing into the database

I'm running the replayStreamed function and I have issues in getting it working. Apparently, the viewBuilders are not committing the data during each replay. Therefore if to denormalize event 2 I need to lookup data already denormalized in event 1 it fails because it is not committed yet in the database.

Am I doing something obviously wrong? or maybe you are caching the views and committing in batches?

Do you have any hint how to approach this case?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.