rebus-org / rebus
:bus: Simple and lean service bus implementation for .NET
Home Page: https://mookid.dk/category/rebus
License: Other
PascalCase reeks of the .NET property naming convention - camelCasing in XML attribute names is much more XML-y (or something).
Enlisting in the ambient TX is necessary for neither MongoDbSubscriptionStorage nor MongoDbSagaPersister - they should both just "do their thing" and then verify that the insert/update went well, throwing some kind of exception if that was not the case.
Just do it.
Just start out with Rebus. Remember Semver.
Don't create packages for all the integration projects just yet.
do it.
and figure out how to store stuff... possibly as a serialized JSON blob with columns for correlation values...?
When errors occur during MSMQ communication, only a one-liner makes it to the output window. It should be possible to see the full details of what went wrong somehow.
To allow containers to provide ctor injection for headers etc. by implementing IHandleMessages<dynamic>. That would be cool!
(Not sure how an IHandleMessages<dynamic> would actually behave at the moment... is it just a question of having the Dispatcher resolve those as well?)
The existing SqlServerSagaPersister is, due to its dynamic nature, probably pretty heavy and clunky for most uses. This is because it serializes the entire saga data in a blob, storing correlation IDs in a separate index table.
A CustomSqlServerSagaPersister would be cool: it would require the user to provide a mapping to/from the saga data object type, and then all the speed of a single-row saga persister could be achieved, while still allowing users to model their saga data with e.g. all of their nifty domain primitives etc.
I'm thinking that a mapping (Type) => (FromSagaData, ToSagaData) must be provided to the persister for each saga data type that is to be used, where FromSagaData is a lambda that "flattens" the saga data into a single db row (possibly in the form of a Dictionary<string, object>), and ToSagaData is a lambda that "hydrates" the data again.
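A minimal sketch of what such a mapping could look like - note that SagaDataMapping, MySagaData, and the column names here are all hypothetical illustrations, not existing Rebus APIs:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mapping for one saga data type: how to flatten it into a
// single db row, and how to hydrate the row back into saga data
public class SagaDataMapping<TSagaData>
{
    public Func<TSagaData, Dictionary<string, object>> FromSagaData { get; set; }
    public Func<Dictionary<string, object>, TSagaData> ToSagaData { get; set; }
}

public class MySagaData
{
    public Guid Id { get; set; }
    public string OrderNumber { get; set; }
}

class Program
{
    static void Main()
    {
        var mapping = new SagaDataMapping<MySagaData>
        {
            // "flatten" the saga data into one row
            FromSagaData = d => new Dictionary<string, object>
            {
                { "id", d.Id },
                { "order_number", d.OrderNumber },
            },
            // "hydrate" the row back into saga data
            ToSagaData = row => new MySagaData
            {
                Id = (Guid)row["id"],
                OrderNumber = (string)row["order_number"],
            },
        };

        var roundtripped = mapping.ToSagaData(mapping.FromSagaData(
            new MySagaData { Id = Guid.NewGuid(), OrderNumber = "123" }));
        Console.WriteLine(roundtripped.OrderNumber);
    }
}
```

The persister would hold one such mapping per registered saga data type and use FromSagaData when saving and ToSagaData when loading.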
and that includes moving the erroneous message to an error queue.
Remember that the receiving transaction should be used to send to the error queue, and should then be committed.
When handling messages from an error queue, or when implementing routing, it would be a nice feature if there were a bus.Forward() and a bus.Forward(endpoint), allowing the transport message to be forwarded with all its headers intact.
ATM, headers can be read from the message context - make it possible to provide headers for outgoing messages as well.
Break persistence of timeouts out into an ITimeoutRepository, putting the existing in-memory stuff in an InMemoryTimeoutRepository. This will allow for implementing Mongo/SQL Server persistence some time later.
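A sketch of what the abstraction could look like - the member and type names here are guesses for illustration, not the actual Rebus API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical timeout persistence abstraction
public interface ITimeoutRepository
{
    void Add(DateTime timeToReturn, string replyTo, string correlationId);
    List<Timeout> GetDueTimeouts(DateTime currentTime);
}

public class Timeout
{
    public DateTime TimeToReturn { get; set; }
    public string ReplyTo { get; set; }
    public string CorrelationId { get; set; }
}

// The existing in-memory stuff would move into something like this;
// Mongo/SQL Server implementations would implement the same interface
public class InMemoryTimeoutRepository : ITimeoutRepository
{
    readonly List<Timeout> timeouts = new List<Timeout>();

    public void Add(DateTime timeToReturn, string replyTo, string correlationId)
    {
        timeouts.Add(new Timeout { TimeToReturn = timeToReturn, ReplyTo = replyTo, CorrelationId = correlationId });
    }

    public List<Timeout> GetDueTimeouts(DateTime currentTime)
    {
        // return and remove all timeouts that are due
        var due = timeouts.Where(t => t.TimeToReturn <= currentTime).ToList();
        due.ForEach(t => timeouts.Remove(t));
        return due;
    }
}

class Program
{
    static void Main()
    {
        var repo = new InMemoryTimeoutRepository();
        repo.Add(new DateTime(2012, 3, 15, 12, 0, 0), "someQueue", "abc");
        var due = repo.GetDueTimeouts(new DateTime(2012, 3, 15, 12, 0, 1));
        Console.WriteLine(due.Count);
    }
}
```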
IHandleMessages<object> in front of everything in a service.
Test case PersisterCanFindSagaByPropertiesWithDifferentDataTypes does not work because BSON uses the actual types, and IStoreSagaData does lookups with strings... this is not optimal.
Either replace the Find signature with one that takes an object, or embed the saga data in a root document which also holds the values under which the saga data is indexed... e.g. something like this:
{
    // saga data Guid id is transferred to the root document
    _id: BinData(3, "55aswyKr1EibG5x1mj/acw=="),

    // index holds ToString() representations of all values -
    // this is the piece of data that should be indexed
    index: {
        _id: "30734953-5e28-4635-8c98-1b0fbb959d15",
        someProperty: "23",
        embedded: {
            embeddedProperty: "67"
        }
    },

    // the actual saga data
    data: {
        _id: BinData(3, "55aswyKr1EibG5x1mj/acw=="),
        _t: "SomeAssembly.MySagaData",
        someProperty: 23,
        embedded: {
            embeddedProperty: 67
        }
    }
}
This would require e.g. a lookup for SomeProperty on the saga data to query index.someProperty, and a lookup for Embedded.EmbeddedProperty to query index.embedded.embeddedProperty.
Actually, it might be a good idea to flatten the index document - just come up with some way of representing the . in a BSON property name.
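A sketch of the flattening idea, assuming some reserved character (here `_`, purely as an example) is chosen to stand in for the `.` that MongoDB does not allow in property names:

```csharp
using System;
using System.Collections.Generic;

class Program
{
    // Recursively flatten a nested document into path/value pairs,
    // joining path segments with "_" because "." is illegal in BSON names
    static Dictionary<string, string> Flatten(Dictionary<string, object> doc, string prefix = "")
    {
        var result = new Dictionary<string, string>();
        foreach (var kvp in doc)
        {
            var key = prefix == "" ? kvp.Key : prefix + "_" + kvp.Key;
            if (kvp.Value is Dictionary<string, object> nested)
            {
                foreach (var flat in Flatten(nested, key)) result[flat.Key] = flat.Value;
            }
            else
            {
                // the index holds ToString() representations of all values
                result[key] = kvp.Value.ToString();
            }
        }
        return result;
    }

    static void Main()
    {
        var index = Flatten(new Dictionary<string, object>
        {
            { "someProperty", 23 },
            { "embedded", new Dictionary<string, object> { { "embeddedProperty", 67 } } },
        });

        // a lookup for Embedded.EmbeddedProperty would then query this flat key
        Console.WriteLine(index["embedded_embeddedProperty"]);
    }
}
```

The downside is that the separator character must itself be escaped if it can occur in property names.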
In the AssertCanSendAndReceiveMessageWithHeaders method in the Contracts tests for new transports, it is tested whether the transport transfers the headers along with the message body, thus forcing the transport implementation to encode the headers itself in order to pass the test.
But when using RebusBus, the headers are already serialized using one of the ISerializeMessages implementations.
This means that in order to pass the Contracts tests, the transport (the Azure queue in particular, since it has no message extensions) will transfer the headers twice.
Is this by design, or more a consequence of the Contracts being designed to utilize and test the MSMQ message's Extension functionality?
do. it.
Trim all projects to depend only on the BCL libs that are actually used.
This task may be in the nitpickers' department, but still - you can't deny there's a correlation between having a huge number of dependencies and not caring ;)
Make serialization work to/from streams
Replace the static 1.0.0-alpha version in nuspec files dynamically when building (probably generating an AssemblyInfo.cs file and pulling it from there).
Build numbers would be nice - but a temporary solution could be:
1.0.0-alpha20120315-121430 (yyyyMMdd-hhmmss)
This would allow people using NuGet references to both see and update Rebus packages from either the global NuGet feed or local build feed - without uninstalling and reinstalling the package (necessary now because version doesn't change).
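The temporary scheme above could be produced at build time by something like this sketch (the base version string is just an example value):

```csharp
using System;

class Program
{
    static string GetPrereleaseVersion(DateTime buildTime)
    {
        // 1.0.0-alpha<yyyyMMdd>-<hhmmss> sorts chronologically, so NuGet
        // sees every new local build as a newer prerelease version
        return string.Format("1.0.0-alpha{0:yyyyMMdd}-{0:HHmmss}", buildTime);
    }

    static void Main()
    {
        Console.WriteLine(GetPrereleaseVersion(new DateTime(2012, 3, 15, 12, 14, 30)));
    }
}
```

The build script would stamp this string into the nuspec files (or a generated AssemblyInfo.cs) instead of the static 1.0.0-alpha.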
When double-clicking error messages, a simple MessageBox.Show(...) is used to show the details. A proper error dialog should be made, allowing error text of arbitrary length to be shown (and scrolled and copy/pasted etc.).
At the moment, headers are serialized into the message even though they are preserved "on the side" for e.g. infrastructure to look at.
Headers should probably only be transferred "on the side".
When a message gets properly deserialized, but the handling of it fails the maximum number of times, the source queue should be included in the headers when moving the message to the error queue.
That will allow a tool similar to NServiceBus' returnToSourceQueue to be written.
When a message fails multiple times, and it is decided that the message is poisoned, it gets moved to the service's error queue. In doing that, the experienced exceptions are included in the rebus-error-message header.
The rebus-error-message header should get special treatment in Snoop, allowing failed messages to be emphasized somehow.
remove existing IContainerAdapter...
...implement IActivateHandlers with an abstract ContainerAdapter class that looks something like:
public abstract class ContainerAdapter : IActivateHandlers
{
    public IEnumerable<IHandleMessages<TMessage>> GetHandlerInstancesFor<TMessage>()
    {
        return ResolveAll<IHandleMessages<TMessage>>();
    }

    public void ReleaseHandlerInstances<T>(IEnumerable<IHandleMessages<T>> handlerInstances)
    {
        Release(handlerInstances);
    }

    public abstract T Resolve<T>();
    public abstract IEnumerable<T> ResolveAll<T>();
    public abstract void Release(object obj);
    public abstract void RegisterInstance(object instance, params Type[] serviceTypes);
    public abstract bool HasImplementationOf<T>();
}
Due to the combination of static logger instances and how the logger factory works, all the configurers' loggers will be initialized and statically cached, resulting in NULL-logging of configuration errors.
In the short run, a test should be written that ensures that no configurer contains a static field of the Rebus.Logging.ILog type. This will be a hint that configurers should pull a new logger every time they want to log something.
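A sketch of such a reflection-based test - here ILog and the two Configurer classes are stand-ins so the snippet is self-contained; the real test would scan the Rebus assembly for Rebus.Logging.ILog fields:

```csharp
using System;
using System.Linq;
using System.Reflection;

class Program
{
    interface ILog { }          // stand-in for Rebus.Logging.ILog
    class GoodConfigurer { }    // pulls a logger on demand - no static field
    class BadConfigurer         // statically caches a logger - should fail the test
    {
        static readonly ILog Log = null;
    }

    // Find all configurer types that hold a static ILog field (public or not)
    static string[] ConfigurersWithStaticLogger(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => t.Name.EndsWith("Configurer"))
            .Where(t => t.GetFields(BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic)
                         .Any(f => f.FieldType == typeof(ILog)))
            .Select(t => t.Name)
            .ToArray();
    }

    static void Main()
    {
        var offenders = ConfigurersWithStaticLogger(typeof(Program).Assembly);
        // in the real unit test this would be: Assert.That(offenders, Is.Empty)
        Console.WriteLine(string.Join(", ", offenders));
    }
}
```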
Probably some simple Linq2Sql thingie
Don't use BasicGet - change it to set up a Subscription at startup and then pull messages with subscription.Next(timeout, out result).
Remember to check whether Subscription is re-entrant - otherwise, bind subscriptions to the current thread.
Polymorphic dispatch results in multiple resolutions of handlers, each of which gets invoked in a separate pipeline.
It would be much better (and much more POLA) if one single pipeline was constructed, intercepted, and executed.
Queues created by Rebus currently inherit permissions from the running process. However, this generates queues that are completely inaccessible if the process is running as LOCAL SYSTEM.
Best solution off the top of my head is to always grant full control to the Administrators group. The permissions granted to Everyone should also be SID based (so we don't run into localization problems).
References for groups / SID translations:
Well-known security identifiers: http://support.microsoft.com/kb/243330
Translating from SID: http://stackoverflow.com/questions/499053/how-can-i-convert-from-a-sid-to-an-account-name-in-c-sharp
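A sketch of the SID-based part of the idea - resolving the built-in Administrators group from its well-known SID rather than the English name "Administrators", so it works on localized Windows installations (the SetPermissions call in the comment is Windows/System.Messaging-only and is therefore not executed here):

```csharp
using System;
using System.Security.Principal;

class Program
{
    static void Main()
    {
        // Well-known SID for the built-in Administrators group (see KB243330);
        // using the SID avoids hardcoding a localized group name
        var sid = new SecurityIdentifier(WellKnownSidType.BuiltinAdministratorsSid, null);
        Console.WriteLine(sid.Value);

        // On Windows, the SID would be translated to the (possibly localized)
        // account name and used to grant full control when creating the queue:
        //   var name = ((NTAccount)sid.Translate(typeof(NTAccount))).Value;
        //   queue.SetPermissions(name, MessageQueueAccessRights.FullControl,
        //                        AccessControlEntryType.Allow);
    }
}
```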
When the MSMQ input queue is configured with XML and the queue is not specified, the resulting error message comes from deep inside MsmqMessageQueue.
The error should be caught by the configurer instead, and the error message should be nice and constructive :)
Make RebusBus know how to handle sagas and store/correlate by using an IStoreOngoingSagas
Currently, Rebus just sends failed messages to a queue called "error", but doesn't bother to check whether or not this queue exists.
The queue should be created at startup - and maybe the NSB equivalent of naming config should be provided for the queue name.
Thinking of maybe implementing an alternative to the default 5-retries strategy.
Maybe it would be useful for some scenarios if the service would immediately stop processing messages if an error was encountered, which - if the service was single-threaded - would ensure that messages would always be processed in a FIFO manner.
This would probably require some kind of control bus mechanism so that message processing could be resumed when the error condition was resolved.
Just an idea. Wondering which scenarios could make use of something like this?
As the retry logic works only for deserialized messages, a message that fails during deserialization is not moved to the error queue - instead it is retried forever, which is not what we want...
At the moment, IDetermineDestination is given only the type of the message as input.
Content-based routing could easily be implemented if the entire message was given as input.
But what should happen then when someone calls bus.Subscribe<TMessage>()? null as the message instance?
Hmmm......
Implement a distributor process. This most likely involves doing the following:
- making Worker able to be in workerMode
- having workerQueue.Send deliver that message to the worker
Was that it?
Think about which metrics make the most sense, and then go implement them :)
The inspector should know enough about Rebus to be able to inspect a queue, look at message headers, and possibly visualize messages if it knows how to deserialize them.
This would probably require that headers be equipped with info that reveals the content type, encoding, etc.
The purpose is to allow for humane inspection of error queues. Also, it would be cool if the inspector could return messages to their source queues.
In order for sagas to be able to model stuff that takes time, a proper timeout process should be built.
It should be made so that it stores its scheduled messages in durable storage like SQL or Mongo.
ATM, SqlServerSagaPersister depends on the JSON serializer, and it would probably be nice if Rebus core could do JSON serialization - therefore, in order to steer clear of other people's assembly references, Newtonsoft should be merged into core to allow for JSON serialization out of the box.
which would be a nifty place to insert encryption and/or compression
Now that the clunky IMessageModule thing has been removed, RebusBus should be fitted with a couple of (regular .NET) events that allow infrastructure to hook up before/after processing messages.
And then MessageContext should be equipped with a dictionary of things, similar to HttpContext.Current.Items in ASP.NET, that can be used to hold on to stuff for the duration of handling one message.
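A sketch of the Items idea - a per-message bag analogous to HttpContext.Current.Items. The MessageContext shape here is a simplified stand-in, not the actual Rebus type:

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for Rebus' message context: one instance per handled
// message, carrying an Items dictionary for infrastructure and handlers
class MessageContext
{
    [ThreadStatic] static MessageContext current;

    public static MessageContext Current { get { return current; } }

    public IDictionary<string, object> Items { get; private set; }

    MessageContext() { Items = new Dictionary<string, object>(); }

    // infrastructure establishes the context for the duration of one message
    public static IDisposable Establish()
    {
        current = new MessageContext();
        return new ContextScope();
    }

    class ContextScope : IDisposable
    {
        public void Dispose() { current = null; }
    }
}

class Program
{
    static void Main()
    {
        using (MessageContext.Establish())
        {
            // e.g. a "before message" event handler stashes something...
            MessageContext.Current.Items["stopwatch"] = "started";

            // ...and a handler or "after message" hook reads it back later
            Console.WriteLine(MessageContext.Current.Items["stopwatch"]);
        }
    }
}
```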
or maybe just save the list of machines so that it's still there when Snoop opens
When Snoop returns messages to their source queues, the user must refresh all of the involved queues to see the results.
It would be a nice gesture if the view models were automatically updated in the background, avoiding the need to refresh.