Comments (4)
It seems that this has been an issue on our end. Closing this for now. Thanks for your help.
from nservicebus.
@klemmchr do you have any code to see how you created the endpoint? Maybe even a project?
from nservicebus.
I've wrapped the endpoint creation in a custom builder for convenience like this
public class EventBusBuilder
{
/// <summary>Name of the root configuration section that holds the event bus settings.</summary>
internal const string EventBusConfigurationKey = "EventBus";

// Service registrations queued by the Use* methods; applied to the endpoint in Build().
private readonly List<Action<IServiceCollection>> _components = new();

// Host context supplying configuration and the hosting environment.
private readonly HostBuilderContext _host;

// "Persistence" and "Transport" sub-sections of the event bus configuration section.
private readonly IConfigurationSection _persistenceSection;
private readonly IConfigurationSection _transportSection;

// Values collected by the fluent methods; ValidateConfiguration() checks the required ones.
private string? _appId;
private bool _sendOnly;
private bool _enableInstallers;
private Type? _tenantContextType;
private Action<EndpointConfiguration>? _persistenceConfiguration;
private Action<EndpointConfiguration>? _transportConfiguration;

// Pipeline steps queued by AddInterceptor(). The list instance is never reassigned,
// so it is readonly, consistent with _components.
private readonly List<RegisterStep> _registerSteps = new();

/// <summary>Hosting environment of the host this builder was created for.</summary>
public IHostEnvironment Environment => _host.HostingEnvironment;
/// <summary>
/// Creates a builder bound to the given host context and looks up the
/// "Persistence" and "Transport" sub-sections of the event bus configuration section.
/// </summary>
/// <param name="context">Host builder context that supplies configuration and environment.</param>
internal EventBusBuilder(HostBuilderContext context)
{
    _host = context;

    var root = _host.Configuration.GetSection(EventBusConfigurationKey);
    _transportSection = root.GetSection("Transport");
    _persistenceSection = root.GetSection("Persistence");
}
/// <summary>
/// Reads the application id from configuration and uses it as the endpoint name.
/// In the development environment the id is prefixed with the developer id and "-local-".
/// </summary>
/// <param name="configKey">Configuration key that holds the application id.</param>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder UseAppIdFromConfiguration(string configKey)
{
    var configuredId = _host.Configuration.GetRequiredValue<string>(configKey);

    _appId = _host.HostingEnvironment.IsDevelopment()
        ? $"{_host.Configuration.GetDeveloperId()}-local-{configuredId}"
        : configuredId;

    return this;
}
/// <summary>
/// Marks the endpoint as send-only; <c>Build()</c> then calls <c>SendOnly()</c>
/// on the endpoint configuration.
/// </summary>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder SendOnly()
{
    // Only records the choice; it is applied when the endpoint configuration is built.
    _sendOnly = true;

    return this;
}
/// <summary>
/// Requests that installers are enabled on the endpoint. <c>Build()</c> enables them
/// when this was called or when the host runs in the development environment.
/// </summary>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder EnableInstallers()
{
    // Only records the choice; it is applied when the endpoint configuration is built.
    _enableInstallers = true;

    return this;
}
/// <summary>
/// Selects the <see cref="ITenantContext"/> implementation that <c>Build()</c>
/// registers as a singleton with the endpoint's service collection.
/// </summary>
/// <typeparam name="T">Concrete tenant context type.</typeparam>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder UseTenantContext<T>()
    where T : class, ITenantContext
{
    var implementation = typeof(T);
    _tenantContextType = implementation;

    return this;
}
/// <summary>
/// Queues a pipeline step for the interceptor type <typeparamref name="T"/>.
/// The step id is the type's simple name, and the instance is created through
/// <see cref="ActivatorUtilities"/> from the service provider handed to the factory.
/// </summary>
/// <typeparam name="T">Interceptor type to register.</typeparam>
/// <param name="description">Description passed to the register step.</param>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder AddInterceptor<T>(string description)
    where T : EventBusInterceptor
{
    var interceptorType = typeof(T);

    _registerSteps.Add(
        new DefaultRegisterStep(
            interceptorType.Name,
            interceptorType,
            description,
            provider => ActivatorUtilities.CreateInstance<T>(provider)
        )
    );

    return this;
}
/// <summary>
/// Validates the collected settings and turns them into an <see cref="EndpointConfiguration"/>.
/// </summary>
/// <returns>The configured endpoint configuration.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown by <c>ValidateConfiguration()</c> when a required setting is missing.
/// </exception>
internal EndpointConfiguration Build()
{
    ValidateConfiguration();

    var endpoint = new EndpointConfiguration(_appId);
    ConfigureScanner(endpoint);
    ConfigureSerialization(endpoint);

    // Queued service registrations first, then the tenant context singleton.
    _components.ForEach(registration => endpoint.RegisterComponents(registration));
    endpoint.RegisterComponents(
        services => services.Add(
            new ServiceDescriptor(typeof(ITenantContext), _tenantContextType, ServiceLifetime.Singleton)
        )
    );

    _registerSteps.ForEach(step => endpoint.Pipeline.Register(step));

    // Transport is configured before persistence, then the outbox is switched on.
    _transportConfiguration(endpoint);
    _persistenceConfiguration(endpoint);
    endpoint.EnableOutbox();

    if (_sendOnly)
    {
        endpoint.SendOnly();
    }

    var isDevelopment = _host.HostingEnvironment.IsDevelopment();

    // Development always gets installers; other environments only on request.
    if (isDevelopment || _enableInstallers)
    {
        endpoint.EnableInstallers();
    }

    // The embedded license is applied in development only.
    if (isDevelopment)
    {
        ConfigureDevelopmentLicense(endpoint);
    }

    return endpoint;
}
/// <summary>
/// Configures the Newtonsoft JSON serializer for the endpoint: camel-case property names,
/// ISO date format and a <see cref="StringEnumConverter"/> for enums.
/// </summary>
/// <param name="configuration">Endpoint configuration to apply the serializer to.</param>
private static void ConfigureSerialization(EndpointConfiguration configuration)
{
    // TODO: switch to SystemJsonSerializer when NServiceBus >= 8.1.0
    var camelCaseResolver = new DefaultContractResolver { NamingStrategy = new CamelCaseNamingStrategy() };

    var settings = new JsonSerializerSettings
    {
        ContractResolver = camelCaseResolver,
        DateFormatHandling = DateFormatHandling.IsoDateFormat,
        Converters = { new StringEnumConverter() },
    };

    var serialization = configuration.UseSerialization<NewtonsoftJsonSerializer>();
    serialization.Settings(settings);
}
/// <summary>
/// Limits assembly scanning to project assemblies: every assembly currently loaded in the
/// app domain whose name does not start with "Namespace." is excluded, as is the
/// <see cref="IntegrationEvent"/> type.
/// </summary>
/// <param name="configuration">Endpoint configuration whose scanner is configured.</param>
private static void ConfigureScanner(EndpointConfiguration configuration)
{
    const string projectAssemblyPrefix = "Namespace.";

    var nonProjectAssemblies = new List<string>();
    foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
    {
        var assemblyName = assembly.GetName().Name;

        // Assembly names are identifiers, so compare ordinally rather than with the current culture.
        if (string.IsNullOrEmpty(assemblyName)
            || assemblyName.StartsWith(projectAssemblyPrefix, StringComparison.Ordinal))
        {
            continue;
        }

        // Dynamic assemblies do not support Location, and assemblies loaded from memory or a
        // single-file bundle report an empty one. Fall back to the assembly name in those cases
        // so that no empty entry is handed to the scanner.
        var location = assembly.IsDynamic ? string.Empty : assembly.Location;
        var scannerName = string.IsNullOrEmpty(location)
            ? assemblyName
            : Path.GetFileNameWithoutExtension(location);

        if (!string.IsNullOrWhiteSpace(scannerName))
        {
            nonProjectAssemblies.Add(scannerName);
        }
    }

    var scanner = configuration.AssemblyScanner();
    scanner.ExcludeAssemblies(nonProjectAssemblies.Distinct(StringComparer.Ordinal).ToArray());
    scanner.ExcludeTypes(typeof(IntegrationEvent));
}
/// <summary>
/// Ensures every required setting was supplied before the endpoint is built.
/// The checks run in a fixed order: app id, transport, persistence, tenant context.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown for the first missing setting.</exception>
[MemberNotNull(
    nameof(_appId),
    nameof(_transportConfiguration),
    nameof(_persistenceConfiguration),
    nameof(_tenantContextType)
)]
private void ValidateConfiguration()
{
    if (_appId.IsNullOrEmpty())
    {
        throw new InvalidOperationException("Missing app id");
    }

    _ = _transportConfiguration ?? throw new InvalidOperationException("No transport configured");
    _ = _persistenceConfiguration ?? throw new InvalidOperationException("No persistence configured");
    _ = _tenantContextType ?? throw new InvalidOperationException("No tenant context configured");
}
/// <summary>
/// Applies the license text stored as an embedded resource in this type's assembly
/// to the endpoint configuration.
/// </summary>
/// <param name="configuration">Endpoint configuration that receives the license.</param>
private void ConfigureDevelopmentLicense(EndpointConfiguration configuration)
{
    const string resourceName = $"{nameof(Namespace)}.License.xml";

    var owningAssembly = typeof(EventBusBuilder).Assembly;
    configuration.License(owningAssembly.GetManifestResourceString(resourceName));
}
#region Persistence
/// <summary>
/// Selects Cosmos DB persistence. Queues the service registrations for the Cosmos client
/// and stores the persistence configuration callback that <c>Build()</c> invokes.
/// Settings come from the "Cosmos" sub-section of the persistence configuration section.
/// </summary>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder UseCosmosPersistence()
{
    var cosmosSection = _persistenceSection.GetRequiredSection("Cosmos");

    _components.Add(services =>
    {
        services.AddTransient<IProvideCosmosClient, CosmosClientProvider>();
        services.AddTransient<ISessionOptionsProvider, CosmosSessionOptionsProvider>();
        services.AddAzureClients(clients =>
        {
            var registration = clients.AddClient<CosmosClient, CosmosClientOptions>(
                (options, credential, _) =>
                {
                    // A connection string takes precedence over endpoint + credential.
                    var connectionString = cosmosSection.GetValue<string>("ConnectionString");
                    if (!connectionString.IsNullOrEmpty())
                    {
                        return new CosmosClient(connectionString, options);
                    }

                    var endpoint = cosmosSection.GetValue<string>("Endpoint");
                    if (!endpoint.IsNullOrEmpty())
                    {
                        return new CosmosClient(endpoint, credential, options);
                    }

                    throw new InvalidOperationException(
                        "Cosmos persistence needs either connection string or endpoint to be configured"
                    );
                }
            );

            registration
                .WithCredential(new DefaultAzureCredential())
                .WithName(CosmosClientProvider.ClientName);
        });
    });

    _persistenceConfiguration = endpointConfiguration =>
    {
        const string partitionKeyPath = $"/{IntegrationEvent.TenantIdPropertyName}";

        // The database name defaults to "event-bus" and gets a "-local" suffix in development.
        var configuredDatabase = cosmosSection.GetValue("Database", "event-bus");
        var databaseName = _host.HostingEnvironment.IsDevelopment()
            ? $"{configuredDatabase}-local"
            : configuredDatabase;

        var cosmos = endpointConfiguration.UsePersistence<CosmosPersistence>().DatabaseName(databaseName);

        // The container name defaults to the app id when it is not configured.
        var containerName = cosmosSection.GetValue("Container", _appId)!;
        cosmos.DefaultContainer(containerName, partitionKeyPath);
        cosmos.EnableTransactionalSession();

        cosmos
            .TransactionInformation()
            .ExtractPartitionKeyFromMessages(new CosmosPartitionKeyExtractor());
    };

    return this;
}
#endregion
#region Transport
/// <summary>
/// Selects the Azure Service Bus transport and stores the transport configuration callback
/// that <c>Build()</c> invokes. Settings are read from the "ServiceBus" sub-section of the
/// transport configuration section when the callback runs.
/// </summary>
/// <returns>This builder, for chaining.</returns>
public EventBusBuilder UseServiceBusTransport()
{
    _transportConfiguration = endpointConfiguration =>
    {
        var serviceBusSection = _transportSection.GetRequiredSection("ServiceBus");
        var connectionString = serviceBusSection.GetValue<string>("ConnectionString");
        var fqdn = serviceBusSection.GetValue<string>("Fqdn");
        var topic = serviceBusSection.GetValue<string>("Topic", "event-bus");
        var errorQueue = serviceBusSection.GetValue<string>("ErrorQueue", "event-bus-error");

        // A connection string takes precedence; otherwise fall back to FQDN + default credential.
        AzureServiceBusTransport serviceBus;
        if (connectionString.IsNullOrEmpty())
        {
            if (fqdn.IsNullOrEmpty())
            {
                throw new InvalidOperationException("Service bus needs either connection string or fqdn");
            }

            serviceBus = new AzureServiceBusTransport(fqdn, new DefaultAzureCredential());
        }
        else
        {
            serviceBus = new AzureServiceBusTransport(connectionString);
        }

        // In development, topic and error queue names are prefixed with the developer id.
        if (_host.HostingEnvironment.IsDevelopment())
        {
            string Localize(string? name) => $"{_host.Configuration.GetDeveloperId()}-local-{name}";

            topic = Localize(topic);
            errorQueue = Localize(errorQueue);
        }

        serviceBus.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
        serviceBus.Topology = TopicTopology.Single(topic);
        serviceBus.EnablePartitioning = true;
        serviceBus.EntityMaximumSize = 5;

        endpointConfiguration.SendFailedMessagesTo(errorQueue);
        endpointConfiguration.UseTransport(serviceBus);
    };

    return this;
}
#endregion
}
which is then provided as an extension method
/// <summary>
/// Host builder extensions that wire the event bus into an application host.
/// </summary>
public static class HostBuilderExtensions
{
    /// <summary>
    /// Configures NServiceBus through an <see cref="EventBusBuilder"/> and registers the
    /// integration event publisher and transaction handler. When the "IsEnabled" value of the
    /// event bus configuration section is false, the disabled implementations are registered
    /// instead; the value defaults to true.
    /// </summary>
    /// <param name="hostBuilder">Host builder to extend.</param>
    /// <param name="configureBuilder">Callback that configures the event bus builder.</param>
    /// <returns>The same <paramref name="hostBuilder"/>, for chaining.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="hostBuilder"/> or <paramref name="configureBuilder"/> is null.
    /// </exception>
    public static IHostBuilder UseEventBus(this IHostBuilder hostBuilder, Action<EventBusBuilder> configureBuilder)
    {
        // Fail at the call site: without these guards a null callback only surfaces later,
        // inside the NServiceBus configuration callback while the host is being built.
        if (hostBuilder is null)
        {
            throw new ArgumentNullException(nameof(hostBuilder));
        }

        if (configureBuilder is null)
        {
            throw new ArgumentNullException(nameof(configureBuilder));
        }

        hostBuilder
            .UseNServiceBus(context =>
            {
                var builder = new EventBusBuilder(context);
                configureBuilder(builder);
                return builder.Build();
            })
            .ConfigureServices(
                (context, services) =>
                {
                    var isEnabled = context.Configuration
                        .GetSection(EventBusBuilder.EventBusConfigurationKey)
                        .GetValue("IsEnabled", true);

                    if (isEnabled)
                    {
                        services
                            .AddTransient<IIntegrationEventPublisher, DefaultIntegrationEventPublisher>()
                            .AddTransient<IIntegrationEventTransactionHandler, DefaultTransactionHandler>();
                    }
                    else
                    {
                        services
                            .AddTransient<IIntegrationEventPublisher, DisabledIntegrationEventPublisher>()
                            .AddTransient<IIntegrationEventTransactionHandler, DisabledTransactionHandler>();
                    }
                }
            );

        return hostBuilder;
    }
}
and added in Program.cs
/// <summary>
/// Application entry point showing how the event bus is wired into the host.
/// </summary>
public static class Program
{
/// <summary>Builds the host from <see cref="CreateHostBuilder"/> and runs it.</summary>
public static void Main(string[] args) => CreateHostBuilder(args).Build().Run();
/// <summary>
/// Creates the default host builder and configures the event bus: app id read from the
/// "AppId" configuration key, <c>EventBusTenantContext</c> as tenant context, Cosmos
/// persistence, the Service Bus transport, and installers enabled.
/// </summary>
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseEventBus(
c =>
c.UseAppIdFromConfiguration("AppId")
.UseTenantContext<EventBusTenantContext>()
.UseCosmosPersistence()
.UseServiceBusTransport()
.EnableInstallers()
)
// NOTE(review): the snippet is truncated here - the remaining host setup and the
// terminating semicolon are not shown.
// Rest of init code
}
I don't have a full project on hand that I could share. However, I suspect that Autofac could be an issue here. But that's just a wild guess. My expectation would be that Autofac does not allow injecting null values into a constructor and instead throws an exception.
from nservicebus.
@klemmchr There are lots of branches in your supplied code so it would take considerable effort to review.
Can you provide the configuration permutation where you observe this?
from nservicebus.
Related Issues (20)
- Remove polyfil dependencies HOT 3
- For OpenTelemetry metrics, allow Func<string, string> to be specified to trim the enclosed message types tag HOT 6
- Saga and outbox persistence that uses Entity Framework
- Overriding the host name does not work consistently
- AssemblyScanner finds private classes HOT 1
- Obsolete NServiceBus.Logging in favor of Microsoft.Extensions.Logging
- DefineCriticalErrorAction access to IServiceProvider HOT 3
- Support for Azure Event Hubs (Kafka) HOT 1
- Support for Azure Event Grid (MQTT)
- Messages cannot be sent from Windows to Linux using the file share data bus HOT 1
- Providing an empty license string throws an exception HOT 3
- Creating an endpoint and not referencing the instance results in memory/resource leak HOT 1
- Add tests for scenarios without `InstallersEnabled`
- Allow user to define the scope of a trace HOT 1
- Dispatch binary messages in incoming message context via `ReadOnlyMemory<Char>` or `Stream`
- Register multiple serializers for outgoing messages
- Customizable envelope formatting (Integration, standardization, interop) HOT 1
- Make it easier to use the hosting builder from Microsoft.Extensions.Hosting
- `IMessageSerializer` is not registered in DI and cannot be injected when replacing `SerializeMessageConnector`
- OpenTelemetry Metrics: Emit an UpDownCounter (gauge) for active handlers HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from nservicebus.