Giter Club home page Giter Club logo

nservicebus.azurefunctions.storagequeues's Introduction

NServiceBus.AzureFunctions.StorageQueues

Process messages in AzureFunctions using the Azure Storage Queues trigger and the NServiceBus message pipeline.

This Particular Preview has been archived and is not maintained or supported by Particular Software.

Migration

Any users of the Preview package have two options available:

  1. Switch your Azure Functions to use Azure Service Bus instead of Azure Storage Queues. Azure Functions for Azure Service Bus is fully supported by Particular Software.
  2. Continue to use Azure Storage Queues transport, but use traditional cloud services hosting rather than Azure Functions. The Azure Storage Queue Transport is fully supported by Particular Software.

Basic usage

Endpoint configuration

static readonly IFunctionEndpoint endpoint = new FunctionEndpoint(executionContext =>
{
    var storageQueueTriggeredEndpointConfiguration = StorageQueueTriggeredEndpointConfiguration.FromAttributes();

    return storageQueueTriggeredEndpointConfiguration;
});

The endpoint is automatically configured with the endpoint name, the transport connection string, and the logger passed into the function using a static factory method provided by the ServiceBusTriggeredEndpointConfiguration.FromAttributes method.

Alternatively, the endpoint name can be passed in manually:

static readonly IFunctionEndpoint endpoint = new FunctionEndpoint(executionContext =>
{
    var storageQueueTriggeredEndpointConfiguration = new StorageQueueTriggeredEndpointConfiguration("ASQTriggerQueue");

    return storageQueueTriggeredEndpointConfiguration;
});

Azure Function definition

[FunctionName("ASQTriggerQueue")]
public static async Task Run(
    [QueueTrigger(queueName: "ASQTriggerQueue")]
    CloudQueueMessage message,
    ILogger logger,
    ExecutionContext executionContext)
{
    await endpoint.Process(message, executionContext, logger);
}

Dispatching outside a message handler

Messages can be dispatched outside a message handler in functions activated by queue- and non-queue-based triggers.

[FunctionName("HttpSender")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest request, ExecutionContext executionContext, ILogger logger)
{
    logger.LogInformation("C# HTTP trigger function received a request.");

    var sendOptions = new SendOptions();
    sendOptions.SetDestination("DestinationEndpointName");

    await functionEndpoint.Send(new TriggerMessage(), sendOptions, executionContext, logger);

    return new OkObjectResult($"{nameof(TriggerMessage)} sent.");
}

Note: For statically defined endpoints, dispatching outside a message handler within a non-queue-triggered function will require a separate send-only endpoint.

private static readonly IFunctionEndpoint functionEndpoint = new FunctionEndpoint(executionContext =>
{
    var configuration = new StorageQueueTriggeredEndpointConfiguration("HttpSender");

    configuration.AdvancedConfiguration.SendOnly();

    return configuration;
});

IFunctionsHostBuilder usage

As an alternative to the configuration approach described in the previous section, an endpoint can also be configured with a static IFunctionEndpoint field using the IFunctionsHostBuilder API as described in Use dependency injection in .NET Azure Functions.

Endpoint configuration

NServiceBus can be registered and configured on the host builder using the UseNServiceBus extension method in the startup class:

class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.UseNServiceBus(() => new StorageQueueTriggeredEndpointConfiguration("MyFunctionsEndpoint"));
    }
}

Any services registered via the IFunctionsHostBuilder will be available to message handlers via dependency injection. The startup class needs to be declared via the FunctionStartup attribute: [assembly: FunctionsStartup(typeof(Startup))].

Azure Function definition

To access IFunctionEndpoint from the Azure Function trigger, inject the IFunctionEndpoint via constructor-injection into the containing class:

class MyFunction
{
    readonly IFunctionEndpoint endpoint;

    // inject the FunctionEndpoint via dependency injection:
    public MyFunction(IFunctionEndpoint endpoint)
    {
        this.endpoint = endpoint;
    }

    [FunctionName("MyFunctionsEndpoint")]
    public async Task Run(
        [QueueTrigger(queueName: "MyFunctionsEndpoint")]
        CloudQueueMessage message,
        ILogger logger,
        ExecutionContext executionContext)
    {
        await endpoint.Process(message, executionContext, logger);
    }
}

Dispatching outside a message handler

Triggering a message using HTTP function:

public class HttpSender
{
    readonly IFunctionEndpoint functionEndpoint;

    public HttpSender(IFunctionEndpoint functionEndpoint)
    {
        this.functionEndpoint = functionEndpoint;
    }

    [FunctionName("HttpSender")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest request, ExecutionContext executionContext, ILogger logger)
    {
        logger.LogInformation("C# HTTP trigger function received a request.");

        var sendOptions = new SendOptions();
        sendOptions.RouteToThisEndpoint();

        await functionEndpoint.Send(new TriggerMessage(), sendOptions, executionContext, logger);

        return new OkObjectResult($"{nameof(TriggerMessage)} sent.");
    }
}

Configuration

License

The license is provided via the NSERVICEBUS_LICENSE environment variable, which is set via the Function settings in the Azure Portal. Use a local.settings.json file for local development. In Azure, specify a Function setting using the environment variable as the key.

include: license-file-local-setting-file

Custom diagnostics

NServiceBus startup diagnostics are disabled by default when using Azure Functions. Diagnostics can be written to the logs with the following snippet:

storageQueueTriggeredEndpointConfiguration.LogDiagnostics();

Persistence

The Azure Storage Queues transport requires a persistence for pub/sub and sagas to work.

var persistence = endpointConfiguration.AdvancedConfiguration.UsePersistence<AzureStoragePersistence>();
persistence.ConnectionString("<connection-string>");

Endpoints that do not have sagas and do not require pub/sub can omit persistence registration using the following transport option:

endpointConfiguration.Transport.DisablePublishing();

Error queue

For recoverability to move the continuously failing messages to the error queue rather than the Functions poison queue, the error queue must be created in advance and configured using the following API:

endpointConfiguration.AdvancedConfiguration.SendFailedMessagesTo("error");

Known constraints and limitations

When using Azure Functions with Azure Storage Queues, the following points must be taken into consideration:

  • Endpoints cannot create their own queues or other infrastructure using installers; the infrastructure required by the endpoint to run must be created in advance. For example:
    • Queues for commands
    • Subscription records in storage for events
  • The Configuration API exposes NServiceBus transport configuration options via the configuration.Transport method to allow customization; however, not all of the options will be applicable to execution within Azure Functions.
  • When using the default recoverability or specifying custom number of immediate retries, the number of delivery attempts specified on the underlying queue or Azure Functions host must be greater than the number of the immediate retries. The Azure Functions default is 5 (DequeueCount) for the Azure Storage Queues trigger.

Message polling

Polling for new messages is handled by the Azure Storage Queues trigger. Using the default configuration, the latency for new messages to be processed by the function might take up to 1 second (see the polling algorithm documentation for further details). The maximum polling interval can be adjusted via the maxPollingIntervall setting in the hosts.json file, see the official documentation for further details.

Features dependent upon delayed delivery

The delayed delivery feature of the Azure Storage Queues transport polls for the delayed messages information and must run continuously in the background. With the Azure Functions Consumption plan, this time is limited to the function execution duration with some additional non-deterministic cool off time. Past that time, delayed delivery will not work as expected until another message to process or the Function is kept "warm".

For features that require timely execution of the delayed delivery related features, use one of the following options:

  • Keep the Function warm in the Consumption plan
  • Use an App Service Plan for Functions hosting
  • Use a Premium plan

The following features are supported but are not guaranteed to execute timely on the Consumption plan:

The following features require an explicit opt-in:

var recoverability = endpointConfiguration.AdvancedConfiguration.Recoverability();
recoverability.Delayed(settings =>
{
    settings.NumberOfRetries(numberOfDelayedRetries);
    settings.TimeIncrease(timeIncreaseBetweenDelayedRetries);
});

Preparing the Azure Storage account

Queues must be provisioned manually.

Subscriptions to events are created when the endpoint executes at least once. To ensure the endpoint processes all the events, subscriptions should be created manually.

Running tests locally

Test projects included in the solution rely on two environment variable AzureWebJobsStorage used by Azure Functions SDK. In order to run the tests, the value needs to contain a real connection string.

nservicebus.azurefunctions.storagequeues's People

Contributors

adamralph avatar andreasohlund avatar awright18 avatar boblangley avatar danielmarbach avatar davidboike avatar dependabot[bot] avatar helenktsai avatar heskandari avatar jpalac avatar kbaley avatar kentdr avatar mauroservienti avatar mikeminutillo avatar particularbot avatar ramonsmits avatar seanfeldman avatar sergioc avatar soujay avatar timbussmann avatar williambza avatar

Watchers

 avatar  avatar  avatar

nservicebus.azurefunctions.storagequeues's Issues

NServiceBus.AzureFunctions.StorageQueues - Public preview release available

With NServiceBus.AzureFunctions.StorageQueues you can deploy your message handlers to the cloud as Azure Functions. The powerful NServiceBus programming model, combined with Azure Functions, makes it a breeze to build easy-to-deploy, scalable, pay-as-you-go distributed systems.

NServiceBus.AzureFunctions.StorageQueues allows Azure Functions to do the heavy lifting of deployment and runtime management, leaving you to focus on business functionality, while benefiting from all the usual NServiceBus features:

  • Full compatibility with NServiceBus endpoints
  • Immediate and delayed retries
  • Stateful message processing with NServiceBus Sagas
  • Auditing
  • Dependency injection support
  • Advanced pipeline extensibility
  • and more...

We've also released NServiceBus.AzureFunctions.ServiceBus, for using Azure Functions with Azure ServiceBus triggers.

Getting Started

NServiceBus.AzureFunctions.StorageQueues makes endpoint configuration super simple:

static readonly FunctionEndpoint endpoint = new FunctionEndpoint(_ => StorageQueueTriggeredEndpointConfiguration.FromAttributes());

[FunctionName("ASQTriggerQueue")]
public static async Task Run(
    [QueueTrigger(queueName: "ASQTriggerQueue")]
    CloudQueueMessage message,
    ILogger logger,
    ExecutionContext executionContext)
{
    await endpoint.Process(message, executionContext, logger);
}

With the configuration done, just add message handlers for any message type received by the function:

public class TriggerMessageHandler : IHandleMessages<TriggerMessage>
{
    private static readonly ILog Log = LogManager.GetLogger<TriggerMessageHandler>();

    public async Task Handle(TriggerMessage message, IMessageHandlerContext context)
    {
        Log.Info($"Handling {nameof(TriggerMessage)} in {nameof(TriggerMessageHandler)}");
        await context.SendLocal(new FollowupMessage());
    }
}

About the public preview

The NServiceBus.AzureFunctions.StorageQueues package is released as a public preview. Public previews are separately licensed, production-ready packages, aiming to react more quickly to customer's needs. See the support policy for previews for more information about our support commitment. Preview packages may transition to fully supported versions after the preview period.

User adoption is crucial during this product development phase and helps us decide whether to make NServiceBus.AzureFunctions.StorageQueues a permanent part of the Particular Platform. Please let us know if you are using this preview by emailing us at [email protected].

We'd also love to receive your feedback about the new NServiceBus.AzureFunctions.StorageQueues package via our support channels, the project repository, or our public previews discussion group.

Where to get it

You can install the preview from NuGet.

With thanks,
The team in Particular

Please read our release policy for more details. Follow @ParticularNews to be notified of new releases and bug fixes.

Consider using a custom IQueueProcessorFactory to move all messages to a central error queue

By overriding the queue processor with builder.Services.AddSingleton<IQueueProcessorFactory>(new MyQueueProcessor()); we can override the handling of poison messages to:

  • Write them to a central error queue instead, or perhaps straight to a blob
  • Access exception details and write them either to the message or to eg. blob storage to be able to present them to the users

Configure JSON as the default serializer

Currently, you have to define the serializer to use manually (see documentation https://docs.particular.net/samples/azure/functions/service-bus/). This is a mandatory boiler-plate code because the transports need a specific serializer configured. Almost always, users will end up with JSON as the serializer, requiring them to first install the NServiceBus.Newtonsoft.Json package and then configuring it.
Instead, we could reference the JSON package directly and make it a mandatory dependency. Since the referenced Azure packages are also referencing newtwon.json, it's not really adding that much "dependency noise", even if you would end up selecting the XML serializer. So should we take a dependency on NServiceBus.Newtonsoft.Json by default and configure the endpoint with it by default to reduce boilerplate code for the user?

An alternative approach would be to implement a custom Json serializer using the .NET Core 3.1 APIs. Because we're targeting 3.1 by default, there is no issue regarding older versions. I'm not sure what this would mean in regards to compatibility with other endpoints using Newtonsoft.Json, sending/receiving messages from the function though.

Delayed delivery does not work

Azure Storage Queues transport is polling internally to implement delayed delivery. With Azure Functions, an endpoint hosted in a Function is not guaranteed to execute all the time. This can cause an polling to stop when there are no active messages to trigger the Function and the Function is terminated. The Function will be up and running when the next message arrives, restarting polling and executing all the delayed messages. This can potentially lead to the messages executed much later than they were supposed to. This behaviour will impact the following features:

  • Saga timeouts
  • Delayed retries (recoverability)
  • Messages sent with an intentional delay using sendOptions.DelayDeliveryWith() or sendOptions.DoNotDeliverBefore()

Possible workarounds

  1. Keep the Function warm in Consumption plan
  2. Use App Service Plan for Functions hosting
  3. Use Premium plan

Further investigation shows it's not currently possible.

The default diagnostics writer causes function to fail

Source: NServiceBus.HostStartupDiagnostics.BuildDefaultDiagnosticsWriter
Exception: Exception while executing function: ASQTriggerQueue Access to the path 'D:\Program Files (x86)\SiteExtensions\Functions\3.0.13901\32bit.diagnostics' is denied.

NServiceBus does not collaborate properly with ServiceCollection provided by Azure Functions

Describe the problem

When trying to use NServiceBus for Azure Functions and register Entity Framework (EF) into the container, get the following exception:

Unable to resolve service for type 'Microsoft.Azure.WebJobs.Script.IEnvironment' while attempting to activate 
  'Microsoft.Azure.WebJobs.Script.Configuration.ScriptHostOptionsSetup'.

After raising the Azure/azure-functions-host#6783 issue with Azure Functions it became apparent that the way NServiceBus.AzureFunctions is leveraging the ServiceCollection is wrong for both the static and non-static usages.

When instantiating an NServiceBus.AzureFunctions FunctionEndpoint using the static approach, the ServiceCollection does not share any services/components with Azure Functions SDK. When instantiating an NServiceBus.AzureFunctions FunctionEndpoint using the non-static approach using DI, the ServiceCollection that we get in the Startup class to configure services is a subset of all the services. Regardless of the approach, the service provider built by NServiceBus will not contain all the dependencies.

Potential workaround

Not possible with the current implementation.

Potential solutions

The NServiceBus.AzureFunctions implementation needs to treat the Azure Functions hosted endpoint as an endpoint with an externally-managed container, using the service collection and service provider supplied by Azure Functions. This means using the DI style of Functions and not the static implementation, providing an extension method such as .UseNServiceBus(...). This extension method would be used to configure the FunctionEndpoint.

Additional information to help with this issue

I've created a repository with all of the findings

Summary

This issue prevents the adoption of NServiceBus.AzureFunctions when 3rd party dependencies fail to be resolved.

Use System.Text.Json as default serializer

Currently, the Azure Functions packages use the NServiceBus.Newtonsoft.Json package as a dependency and configure the Newtonsoft.Json serializer as the default serializer for function endpoints. This introduces an additional dependency on those JSON related packages which technically we might not need given that the Azure Functions packages target .NET Core 3.1 which has has a built-in JSON support (System.Text.Json).

Community projects like https://github.com/NServiceBusExtensions/NServiceBus.Json have already shown that NServiceBus can work with the built-in Json serializer instead of the Newtonsoft serializer.
It might be necessary to investigate potential compatibility issues given that existing NServiceBus endpoints might still use the Newtonsoft.Json packages instead.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.