zarusz / slimmessagebus Goto Github PK
View Code? Open in Web Editor NEWLightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
License: Apache License 2.0
Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
License: Apache License 2.0
Currently Avro serialization is not supported.
We need to add this plugin in addition to the existing Json serializer.
https://www.nuget.org/packages/Apache.Avro/
Consider the following message types and inheritance:
public abstract class BaseMessage
{
public DateTime Created { get; set; }
}
public class CustomerEvent : BaseMessage
{
public Guid CustomerId { get; set; }
}
public class CustomerCreatedEvent: CustomerEvent { }
public class CustomerChangedEvent: CustomerEvent { }
All of these messages are serialized by Newtonsoft.Json, and thus can be serialized/deserialized into the same topic. The Json serializer is able to infer the proper message type using the $type
property.
Now, using SMB I can send these messages:
await bus.Publish(new CustomerCreatedEvent { CustomerId = Guid.NewGuid() });
await bus.Publish(new CustomerChangedEvent { CustomerId = Guid.NewGuid() });
However, I need to configure each message separately:
MessageBusBuilder mbb = ...;
mbb
.Produce<CustomerEvent>(x => x.DefaultTopic("customer-events"))
.Produce<CustomerCreatedEvent>(x => x.DefaultTopic("customer-events"))
.Produce<CustomerChangedEvent>(x => x.DefaultTopic("customer-events"))
It would be better, if we could just configure the CustomerEvent
and have any subclass of that message follow the same config, eg:
mbb
// will apply to CustomerCreatedEvent and CustomerChangedEvent
.Produce<CustomerEvent>(x => x.DefaultTopic("customer-events"))
I'm trying to use the .Do(...)
reflective registration of producers and consumers as outlined in the samples repository of the WebApi: https://github.com/zarusz/SlimMessageBus/blob/master/src/Samples/Sample.DomainEvents.WebApi/Startup.cs#L98
However, when I use this registration method for my one sample producer/consumer, the following is logged:
[20:01:35 DBG] Found a base type of Namespace.XyzEvent that is configured in the bus: System.Object
Seems to me like the sample is missing the following piece of code down in the .ForEach(find => ..)
method:
builder.Produce(find.EventType,
x => x.DefaultTopic(x.Settings.MessageType.Name));
I'll happily contribute an PR fixing it if you agree :)
Hi,
Currently IConsumer is used as an interface, and message received through OnHandle function.
In scenarios like a infinite loop, it makes more sense to register a handler like IReceiverClient from Microsoft.Azure.ServiceBus.Core which process incoming messages in a local function, which can fit into the scanario I mentioned.
receiverClient.RegisterMessageHandler<FooMessageType>((message, token) =>
{
//process the message here
});
HI, Is possible to add support for a key in the consumer builder?
I am talking about this line:
Key is helpful when someone want to separate messages by type in one topic, and let consumer listen only on own typed partition, or if someone need to implement specific consumers per partition key, this could be also consumer per saga.
For example i can define partition key in producer, to be based on message namespace :
public static MessageBusBuilder AddProducer<TMessage>(
this MessageBusBuilder builder, string defaultProduceTopic)
{
builder.Produce<TMessage>(x =>
{
x.DefaultTopic(defaultProduceTopic);
x.KeyProvider((request, topic) => Encoding.ASCII.GetBytes(typeof(TMessage).Name));
});
return builder;
}
But key sellector on consumer side is missing. Would be nice to extend cconsumer configuration to be able to specify partition key as byte array, or delegate + key deserializer for other types.
I want to implement something like on this image: https://docs.datastax.com/en/kafka/doc/kafka/images/partitionsKafka.png
When I set kafka as provider Request Response is not working.
I am using Sample.Simple.ConsoleApp
Hi, congrats on the abstraction library. I have recently come across it, and there is an aspect of it I wanted to check with you guys.
One of Kafka's best practices is to reuse a consumer by subscribing it to multiple topics. Is such set up supported? I see most Topic methods take only a String (e.g., ConsumerBuilder.Topic(string topic)) and not a list of strings, but perhaps it is supported some other way. I recognize beforehand that I haven't dug too deep in the implementation classes still.
Thanks in advance.
Regards,
JP
Hello, Microsoft has officially deprecated Microsoft.Azure.ServiceBus in favor of Azure.Messaging.ServiceBus
The ASB Transport layer should be roworked to use this newer library. Migration guide available here
Currently we have JSON, Avro serialization plugins. It would be good to add Protocol Buffers support as well:
https://developers.google.com/protocol-buffers
Is there any way to publish a message and be handled by just one consumer? the message is something like a task, I am using redis as provider, it has a command RPOPLPUSH that do what I want but not sure if I can use it with this lib. thanks
Hi, thank you for providing this nice to start and easy to use message bus library. I searched a while for a message bus with quite exactly these features and simpleness. For one of my projects, I plan to start with a simple self-contained application running in a single process, where all parts are connected through an in-memory message bus. But I want to have the ability to scale out to multiple processes/servers without having to rewrite the whole application. This plan looks to be easily made by using your library.
I have a question regarding the pub/sub behavior of the MemoryMessageBus. For my understanding the bub/sub messaging using the Publish
-method should work like a queue where I put things in and stop worrying how long the execution takes. On the other hand, the request-response messaging using the Send
-method with the await
keyword should block the execution of my calling method until the response has arrived.
If I see it correctly, this behavior is supported by all available transport implementations but not by the MemoryMessageBus. The MemoryMessageBus awaits the execution of the work even if I use the Publish
-method. I did not see a note about it in the docs but there is an example in the DomainEvents sample projects. The OrderSubmittedHandler
returns a delay of one second which blocks the Post
-method in the OrdersController
for that second before it can respond to the HTTP request.
Is this the intentional behavior? And what is the reason for it? Maybe I simply have a wrong understanding about the intended behavior of general pub/sub messaging in the message bus.
Cheers from Germany
It would be beneficial if the SMB transport providers send the message type (e.g. the full name of the NET type) in the native headers of a message during message publish. That way:
$type
for Newtonsoft.Json)Respectively, that header could be used on the respective consumer transport implementation to achieve the above.
This would be only applicable for transports that support message headers (Azure Service Bus, Kafka, Event Hubs, etc).
This is a feature request to introduce two types of transport in SlimMessageBus for:
Azure Event Hub supports a partition key for messages. We need to support assigning partition keys during publishing as we do for other transport providers (Kafka / ASB).
mbb
.Produce<CustomerUpdated>(x =>
{
x.DefaultTopic("topic1");
// Message key could be set for the message
x.KeyProvider((message) => message.CustomerId.ToString());
})
In the scenario that there is a hybrid bus composed of Memory bus for domain events and Azure Service Bus for out of process events, when there is an consumption of external ASB message that already created a scope and during the message processing the memory bus is used to send some domain events and later handle, the memory bus ends up creating another nested child scope.
This might not be desired and instead of creating a new child scope, the memory bus should join the scope started by the azure service bus message consumption.
Hi,
I would like to contribute to the project by building a provider for Apache Pulsar.
I would like to hear what you think. I saw in the CONTRIBUTING.md file that we need a discussion about the high-level design.
I am new to open source projects. If you can guide me a little, I would appreciate it. Thank you so much
I'm implementing SlimMessageBus with EventHubs and am targeting a single EventHubs instance (topic) for all messages. Using pub/sub so my registrations more or less look something like:
MessageBusBuilder.Create()
.Consume<FooBar>(config => config
.Topic(configuration["SharedTopic"])
.Group(configuration["ServiceConsumerGroup"])
.WithConsumer<FooBarConsumer>())
I'm noticing that each of my consumer groups picks up messages for all message types...ie. the message processing does not filter on message type. Since the messages are not filtered by type, I'm wondering if your intention was each message would get its own topic.
Please consider to make IRequestMessage interface optional. It will allow to define Infrastructure message bus adapter without specific message bus dependencies on application level. I believe Send method could be extended by another generic parameter TResponse, which would indicate what is expected response.
Currently the in-memory transport provider does not support request-response communication.
We need to implement that.
I've been thinking on building an hybrid bus implementation where you would be able to compose multiple bus implementations (e.g. in-memory, azure service bus) to have part of the bus messages route via one provider (in-memory), and others messages via another provider (Azure Service Bus). That opens up a lot of possibilities.
There is a newer Azur EH library from Microsoft:
https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor/
MSDN
https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features
Migration Guide
https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md
The System.Text.Json is gaining more popularity.
We need add serialization support for that library.
Hi, would like to ask for few things:
How to configure it to use with situation where different types of events are sent to the same topic and I would like to have one subscription. Reason is to have order of messages around one aggregate. Is this possible?
Is it possible to configure serialization based on some header from message? Like I don't want to use type.Name because in case someone change type name everything will fail. And as we know people are often changing names during refactor. I prefer to have some const with type ma to which deserialize it. What you think?
There is some idea to support Azure Service Bus sessions? Now it fails.
At the end very, very good code. I don't remember when last time I have seen such high quality repository!
Thumb up! For you!
While I was running your tests, I detected a error
"LogManager.cs not found"
Hi @zarusz
I'm looking for a way to change the default MaxAutoRenewDuration parameter. On Azure ServiceBus message is visible again after 5 minutes if the user doesn't mark it as completed manually.
We have a case that processing on a single message could take up to 2 hours. Right now by handle that passing MaxAutoRenewDuration parameter to MessageHandlerOptions.
Unfortunately, BaseConsumer does not set this method.
Is there any method that we can use to achieve the same result?
Or maybe can we made a pull request with changes (but probably you will have to tell us how do you see that).
Hi, we can't seem to get messages to be consumed concurrently in our .NET Core 3.1 web app. Here is the code:
MessageBusBuilder
public class MyMessageBusBuilder
{
private const int NUM_CONSUMER_INSTANCES = 10;
public IMessageBus Build(IServiceProvider provider)
{
return MessageBusBuilder.Create()
.PerMessageScopeEnabled(true)
.Produce<ClientRegisteredMessage>(x => x
.DefaultTopic("test topic"))
.Consume<ClientRegisteredMessage>(x => x
.Topic("test topic")
.WithConsumer<ClientRegisteredMessageConsumer>()
.Instances(NUM_CONSUMER_INSTANCES))
.WithSerializer(new JsonMessageSerializer())
.WithDependencyResolver(new LookupDependencyResolver(provider.GetRequiredService))
.WithProviderRedis(new RedisMessageBusSettings("redis connection string"))
.Build();
}
}
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
var messageBusBuilder = provider.GetRequiredService<MyMessageBusBuilder>();
services
.AddSingleton<IMessageBus>(messageBusBuilder.Build)
.AddTransient<ClientRegisteredMessageConsumer>();
}
public void Configure(IApplicationBuilder app)
{
// Force the singleton SMB instance to be created on app start
app.ApplicationServices.GetRequiredService<IMessageBus>();
}
At startup, after the "Creating consumers" log entry we see only one "Creating consumer X for topic Y and message type Z" message logged. We are seeing only one message processed at a time.
Have looked through the docs and samples including https://zarusz.github.io/SlimMessageBus/docs/intro.html#concurrently-processed-messages, not sure if we are missing something or if there is a bug.
The Confluent.Kafka client was redesigned from version 1.x:
https://github.com/confluentinc/confluent-kafka-dotnet/wiki/Client#version-011x-vs-1x
The 1.x introduced breaking changes.
We need to upgrade SMB to use the latest client.
The log statement does not populate the placeholder properly, for example:
2020-02-07 17:35:58,007 [6] DEBUG SlimMessageBus.Host.AzureServiceBus.ServiceBusMessageBus - Creating ITopicClient for name 0
In some cases the consumer (IConsumer<T>
) needs to inject scoped dependencies (like the EF DbContext) hence needs to be registered as scoped in the DI container. As a result, the SMB should create a scope and resolve such consumers in that scoped container. The scope should be bound to each message which represents a transaction or a unit of work.
Also related to #34 .
Azure Message Bus allows a feature named Message Sessions which guarantees FIFO handling of Messages.
It would be create if there is an easy way to activate this feature in Slim Message Bus.
The Extensions.Logging has become the standard for .NET as an logging abstraction. We should use it instead of the older Common.Logging.
The Kafka and Azure SB providers have a way to expose the native message to the consumer to pull additional transport specific information (correlation id, headers, parition, etc).
However, this is not currently documented.
Are there any plans to port SlimMessageBus to .NET Standard?
On pub/sub, if a handler throws or anything else happens, the last error message in logs is:
SlimMessageBus.Host.Memory.MemoryMessageBus: Waiting on 1 consumer tasks
I believe some info about what went wrong might be expected.
It would be good to add transport for GCP Pub/Sub Lite
https://cloud.google.com/pubsub/docs/overview
It would be good to add transport for GCP Pub/Sub
https://cloud.google.com/pubsub/docs/overview
So I read all the available documents (all the Readme.md files), and I would like to get clarification on the lifecyle of the IMessageBus.
Let's take the Domain Events sample as an example. In a typical web application, incoming request made -> create an event -> publish the event -> some handler receives it and process it -> happy day.
Now, the sample uses MessageBus.Current, as it mentions, it depends on the DI setting (scoped/singleton etc.).
The question is: if it's scoped (as the sample sets), will all the messages (from different sessions) be delivered and consumed?
And for the event consumer, I have a scenario that it uses in-memory transport, and each instance of the consumer only processes certain messages (same type).
Maybe I should use "topic" for this?
RabbitMQ Implementation seems to be missing. We would love to have that.
Hi,
In my case( in-memory host), I set EnableMessageSerialization is false, it is still required to install and configure the SlimMessageBus.Host.Serialization.Json.
Is it possible not to force the usage of the serialization?
There should be an option (or another plugin) added to Host.AzureServiceBus
plugin that would generate a topic (with the relevant subscruption) or queue if it does not exist yet.
The information about the topology could be read from the queue / topic / subscriptions registered within the SMB (on the producer and consumer side).
Ideally some additional options should be made available that would affect the topic/queue attributes (partitioning enabled, duplicate detection, etc).
It should be an opt-in feature.
I'm following the approach in the WebApi sample for our web application. My message consumer needs a DbContext (or a class which needs a DbContext), which must be scoped to avoid concurrency errors. However, since the IMessageBus is a singleton, created when the application starts, it cannot have a scoped service as a dependency. How can the message consumer use a scoped dependency, such as a DbContext?
Hi,
I saw the Do method to auto discover producers and consumers. Is it possible to use it multiple time? For example if I have different project within the solution and I want to register the producers and consumers there.
Thanks
Create sample of example usage of SMB with Azure Functions V4 dotnet isolated process with at least:
Are there any plans to support .NET 5 applications?
I have an application that I am upgrading from a netcoreapp3.1 to net5 and it seems like the EventHub code no longer works after I make the upgrade.
I get the following error when trying to publish a message:
Operation is not valid due to the current state of the object.
at Microsoft.Azure.Amqp.Transport.TransportStream.Flush()
at System.IO.Stream.<>c.b__39_0(Object state)
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.<>c.<.cctor>b__277_0(Object obj)
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at System.Net.Security.SslStream.d__1711.MoveNext() at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at Microsoft.Azure.Amqp.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult) at Microsoft.Azure.Amqp.StreamExtensions.EndAuthenticateAsClient(SslStream sslStream, IAsyncResult asyncResult) at Microsoft.Azure.Amqp.Transport.TlsTransport.HandleOpenComplete(IAsyncResult result, Boolean syncComplete) at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception) at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result) at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result) at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result) at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.HandleTransportOpened(IAsyncResult result) at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.OnTransportOpened(IAsyncResult result) at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.ConfiguredTaskAwaitable
1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.EventHubs.Amqp.AmqpEventHubClient.d__32.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult() at Microsoft.Azure.Amqp.FaultTolerantAmqpObject
1.d__6.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton1.<GetOrCreateAsync>d__13.MoveNext() at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at Microsoft.Azure.Amqp.Singleton
1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable1.ConfiguredTaskAwaiter.GetResult() at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.<CreateLinkAsync>d__12.MoveNext() at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.ConfiguredTaskAwaitable
1.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject1.<OnCreateAsync>d__6.MoveNext() at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at Microsoft.Azure.Amqp.Singleton
1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.Amqp.Singleton`1.d__13.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.d__10.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.d__10.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at Microsoft.Azure.EventHubs.EventHubClient.d__25.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at SlimMessageBus.Host.AzureEventHub.EventHubMessageBus.d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at SlimMessageBusTest.Program.d__4.MoveNext() in
\SlimMessageBusTest\Program.cs:line 88
As you support Kafka already it would be great to get support for Apache Pulsar as well.
Cheers.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.