nservicebus.callbacks's Introduction

NServiceBus.Callbacks

NServiceBus.Callbacks is an extension for NServiceBus to support defining callbacks on send operations. Callbacks can be used to map a response message to a stateful resource (e.g. an HTTP Request) without needing to use a message handler.

It is part of the Particular Service Platform, which includes NServiceBus and tools to build, monitor, and debug distributed systems.

See the Client-side callbacks documentation for more details on how to use it.

Running tests locally

To test callbacks, install the testing package via NuGet:

Install-Package NServiceBus.Callbacks.Testing

nservicebus.callbacks's Issues

TaskCompletionSourceAdapter.TrySetCanceled() Throws NullReferenceException

I am using Callbacks on an IIS hosted WebApi.

Under some circumstances, the CancellationToken callback on UpdateRequestResponseCorrelationTableBehavior is throwing a NullReferenceException when attempting to cancel the adapter.

Because this exception occurs on a Task outside the scope of an HTTP request, the entire AppPool crashes.

The only clue I can provide is that I link the WebApi CancellationToken with a timeout token and pass the linked token to IMessageSession.Request.
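For readers trying to reproduce this setup, the following is a minimal sketch of linking a request token with a timeout, using only the .NET base class library. LinkedTimeout and Create are hypothetical names, and the 30-second timeout is an assumption; the reporter's actual code is not shown in the issue.

```csharp
using System;
using System.Threading;

public static class LinkedTimeout
{
    // Returns a source whose token is cancelled when either the caller's
    // token is cancelled or the timeout elapses. The caller disposes it.
    public static CancellationTokenSource Create(CancellationToken requestAborted, TimeSpan timeout)
    {
        var linked = CancellationTokenSource.CreateLinkedTokenSource(requestAborted);
        linked.CancelAfter(timeout);
        return linked;
    }

    public static void Main()
    {
        // 'http' stands in for the token that Web API supplies per request.
        using (var http = new CancellationTokenSource())
        using (var linked = Create(http.Token, TimeSpan.FromSeconds(30)))
        {
            // In the scenario above, linked.Token is what would be passed
            // to IMessageSession.Request.
            http.Cancel();
            Console.WriteLine(linked.Token.IsCancellationRequested); // True
        }
    }
}
```

Cancelling either source fires every callback registered on the linked token, which is the code path that ends in TrySetCanceled on the adapter.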

Sending request with IMessageHandlerContext?

Is there a reason why we currently can't send a request while handling a message?
I was just upgrading my app, and there are a couple of places where I've been sending requests inside a message handler using IBus. I'm wondering whether you guys forgot this or whether there is some reason I shouldn't do that.

Both requesting endpoint AND responding endpoint need to implement NServiceBus.Callbacks

On 5/18/2016 @bording wrote:

Since both the requesting and replying endpoints need the package installed ...

Endpoint requires instance discriminator even when only replying to a callback

I've read through the documentation (listed on Add usage examples / documentation to Readme.MD) and I haven't seen this requirement listed anywhere.

Is this statement accurate? I was hoping to use this library to upgrade some integration testing code I've written to NSB 6. On NSB 5 I wrote a simple library that I could use from an integration test to Send a message to a handler (running on my actual host) and wait for a Reply:

public class NServiceBusNUnitTestHarnessHost{
public IBus Bus {get;} // initialized in constructor (code removed for brevity)

public void SendMessageAndWaitForResult<TRequest, TResponse>(
        string queue, TRequest message, Action<TResponse> continuation, int timeoutInMilliseconds = 60 * 1000)
        where TRequest : IMessage
        where TResponse : class, IMessage
    {
        var task =
            Task.Run(
                async () =>
                {
                    var cb = Bus.Send(queue, message);

                    var replyMessage =
                        await cb.Register(cr =>
                        {
                            if (null == cr.Messages)
                                return null;

                            return
                                (cr.Messages ?? new object[0])
                                    .OfType<TResponse>()
                                    .FirstOrDefault();
                        });

                    if (null != replyMessage)
                        continuation(replyMessage);
                });

        if (!Task.WaitAll(new[] { task }, timeoutInMilliseconds))
            throw new Exception(
                $"Did not receive expected response [{typeof(TResponse).Name}] in the given timeout [{timeoutInMilliseconds / 1000} seconds]");
    }

}
Then, from my test, I could write something like:

[Test]
public void ExampleTest()
{
      Result expectedResult = null; // initialized so it can be read after the lambda assigns it
      new NServiceBusNUnitTestHarnessHost().SendMessageAndWaitForResult<Request, Result>(
              "example-queue",
              new Request(),
              r => expectedResult = r);

     // Asserts on expectedResult
}

This solution did not require any modification to the handler listening on "example-queue".

Question: Is this library not meant for this use case? Or will I now need to modify the project containing my handlers to support integration testing?

I had hoped to re-write my Test as:

[Test]
public async Task ExampleTest()
{
      IEndpointInstance endpoint;  // initialization code removed for brevity

      var expectedResult = 
            await endpoint.Request<Result>(
                    new Request(),
                    "example-queue");

     // Asserts on expectedResult
}

Can't Request in callbacks v2 and reply in core v4

Doing a Request in callbacks v2 and a reply in core v4 results in:

System.InvalidOperationException: No handlers could be found for message type: ObjectResponseMessage
   at NServiceBus.LoadHandlersConnector.<Invoke>d__1.MoveNext() in NServiceBus.Core\Pipeline\Incoming\LoadHandlersConnector.cs:line 31
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.DeserializeLogicalMessagesConnector.<Invoke>d__1.MoveNext() in NServiceBus.Core\Pipeline\Incoming\DeserializeLogicalMessagesConnector.cs:line 33
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.SubscriptionReceiverBehavior.<Invoke>d__1.MoveNext() in NServiceBus.Core\Routing\MessageDrivenSubscriptions\SubscriptionReceiverBehavior.cs:line 29
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.ReceivePerformanceDiagnosticsBehavior.<Invoke>d__2.MoveNext() in NServiceBus.Core\Performance\Statistics\ReceivePerformanceDiagnosticsBehavior.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.ProcessingStatisticsBehavior.<Invoke>d__0.MoveNext() in NServiceBus.Core\Performance\Statistics\ProcessingStatisticsBehavior.cs:line 27
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.TransportReceiveToPhysicalMessageProcessingConnector.<Invoke>d__1.MoveNext() in NServiceBus.Core\Pipeline\Incoming\TransportReceiveToPhysicalMessageProcessingConnector.cs:line 37
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.MainPipelineExecutor.<Invoke>d__1.MoveNext() in NServiceBus.Core\Pipeline\MainPipelineExecutor.cs:line 32
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.ReceiveStrategy.<TryProcessMessage>d__7.MoveNext() in NServiceBus.Core\Transports\Msmq\ReceiveStrategy.cs:line 109
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.TransactionScopeStrategy.<ProcessMessage>d__2.MoveNext() in NServiceBus.Core\Transports\Msmq\TransactionScopeStrategy.cs:line 86

Adding a handler for ObjectResponseMessage results in the callback never being executed.

Repro: https://github.com/SimonCropp/scratch/tree/master/callbacks-v6-v4

Click on SendObjectMessage in the web project.

Google groups issue: https://groups.google.com/forum/#!topic/particularsoftware/551q9DNTBCQ

Note: support for 4.7.x ended on 2016-09-29: https://docs.particular.net/nservicebus/upgrades/supported-versions#nservicebus

TaskCompletionSourceAdapter.TrySetCanceled throws NullReferenceException when .NET 4.6 or higher is not installed

Who's affected

All customers without .NET 4.6 or higher installed, whenever the cancellation token passed to a callback is cancelled.

Symptoms

On machines without .NET 4.6 or higher, in the following code

        public void TrySetCanceled()
        {
            var method = taskCompletionSource.GetType().GetMethod("TrySetCanceled", TrySetCancelledArgumentTypes);
            method.Invoke(taskCompletionSource, TrySetCancelledArguments);
        }

        static Type[] TrySetCancelledArgumentTypes = {
            typeof(CancellationToken)
        };

        static object[] TrySetCancelledArguments =
        {
            CancellationToken.None
        };

the GetMethod call returns null, so method.Invoke throws a NullReferenceException, which eventually tears down the application domain.

Steps to reproduce

  • Machine without 4.6 or higher installed
  • Call TaskCompletionSourceAdapter.TrySetCanceled
    --> Throws NullReferenceException
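The null lookup can be guarded with a fallback, as in the minimal sketch below. It uses only the .NET base class library; SafeCancel is a hypothetical name, the helper is generic rather than wrapping an untyped source as the adapter does, and it is not necessarily the fix that was shipped.

```csharp
using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

public static class SafeCancel
{
    static readonly Type[] TokenArgumentTypes = { typeof(CancellationToken) };

    // Prefers the TrySetCanceled(CancellationToken) overload and falls back
    // to the parameterless TrySetCanceled() when the overload is missing.
    public static bool TrySetCanceled<T>(TaskCompletionSource<T> source)
    {
        MethodInfo method = source.GetType().GetMethod("TrySetCanceled", TokenArgumentTypes);
        if (method != null)
        {
            return (bool)method.Invoke(source, new object[] { CancellationToken.None });
        }

        // GetMethod returned null: the overload is absent on this runtime.
        return source.TrySetCanceled();
    }

    public static void Main()
    {
        var tcs = new TaskCompletionSource<object>();
        Console.WriteLine(TrySetCanceled(tcs));   // True
        Console.WriteLine(tcs.Task.IsCanceled);   // True
    }
}
```

Checking the MethodInfo for null before invoking it turns the crash into an ordinary cancellation on runtimes that lack the overload.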

Disable Callback support

Is there a way to disable callback support?

I have a need (in a testing library) to reference NServiceBus.Callbacks, but I don't want to load it into every endpoint (specifically because of #24).

CallbackSupport is private (https://github.com/Particular/NServiceBus.Callbacks/blob/develop/src/NServiceBus.Callbacks/CallbackSupport.cs#L5), so config.DisableFeature<CallbackSupport>() doesn't work.

Is there an alternative recommended way to disable callback support?

I can get it to work via config.ExcludeAssemblies("NServiceBus.Callbacks.dll") but this seems pretty hacky.

Request and response between callbacks and NServiceBus v4 throws an InvalidOperationException

Who's affected

  • Customers using Request and Response between callbacks and NServiceBus v4 with object messages

Symptoms

When an NServiceBus v6 sender/receiver issues a request/response with the callbacks to an NServiceBus v4 receiver which replies with an object message (bus.Reply(new ObjectResponseMessage())), an InvalidOperationException is thrown in the NServiceBus v6 receiver indicating that no handler could be found:

System.InvalidOperationException: No handlers could be found for message type: ObjectResponseMessage

More details can be found in #66 (comment)

Expected behavior

The callback registered should be triggered

Details

NServiceBus v4 issues a reply with MessageIntentEnum.Reply which then triggers the version check.

  1. v6 sends a message to 4.7.x
  2. 4.7.x replies with a message and seems to set MessageIntentEnum.Reply, but what goes out is MessageIntentEnum.Send
  3. Because the message arrives with intent Send, the following code trips up:
    https://github.com/Particular/NServiceBus.Callbacks/blob/develop/src/NServiceBus.Callbacks/IncomingContextExtensions.cs#L34
    since it checks that the minimum supported version is 4.3: https://github.com/Particular/NServiceBus.Callbacks/blob/develop/src/NServiceBus.Callbacks/IncomingContextExtensions.cs#L49

Solution

When the version that replied is less than or equal to 4.7, do not check the message intent.

Not possible to access the message headers via a 'context' on a response

When using callbacks, it is not possible to access header data via the context on a response. This means that if an endpoint does something like the following, the receiver of the response cannot read the header:

Client

var response = await endpoint.Request<MyResponse>(new MyRequest());
// No access to header data

Server

Task Handle(MyRequest request, IMessageHandlerContext context)
{
    var replyOptions = new ReplyOptions();
    replyOptions.SetHeader("MyHeaderKey", "All your base are belong to us!");
    return context.Reply(new MyResponse(), replyOptions);
}

For this to work we would require an overload like any of the following or even a different signature:

var responseEnvelope = await bus.RequestWithHeaders<MyResponse>(new MyRequest(), options);
var headerValue = responseEnvelope.Headers["MyHeaderKey"];
var message = responseEnvelope.Message;

-or-

await bus.Request<MyResponse>(new MyRequest(), options,  Func<MyResponse,CallbackResponseContext>)

Merge behaviors needed to support callbacks

I think we need to merge the behaviors that implement callbacks. Currently we have 5. It's hard to track what is happening when, and it looks like there are some dependencies between them.

Maybe we could implement the incoming and outgoing pipeline behaviors in a single class as well.

Callbacks use InsertAfterExists

Relates to Particular/NServiceBus#3582 (comment)
Moved from Particular/NServiceBus#4201

InsertAfterExists is used by the callbacks in the correlation table update behavior.

It uses context.MessageId. The intention of placing it after the outgoing mutators is to capture any updates on the message id. This seems to be a tricky one to fix because it requires us to reach consensus whether we still allow overriding the message id in mutators or in general.

Callback continuation can block pipeline execution

Bug description

Callbacks are invoked in a blocking manner from the pipeline. Depending on the user code that runs after awaiting the callback response, this can delay or mess up the pipeline completely.

E.g. in the following scenario, a callback request is sent to an endpoint which also has a message handler for the response message. The handler invocation is always delayed until the next callback is invoked, because the pipeline is blocked at the await in the next iteration of the while loop:

        while (true)
        {
            var k = Console.ReadKey();
            if (k.Key == ConsoleKey.Enter)
            {
                var response = await endpoint.Request<Response>(new Request() {Message = Guid.NewGuid().ToString("D")});
                Console.WriteLine($"Handling callback: {response.Message}");
                
                //workaround:
                //await Task.Yield();
            }
            [...]
        }

Workaround

As the sample shows, the pipeline is blocked until the next await. Adding an await such as Task.Yield right after the callback response frees the pipeline and allows it to continue processing the message.
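One way to picture the blocking: a TaskCompletionSource created with default options resumes awaiting code inline on whichever thread completes it. The sketch below uses only the .NET base class library (TaskCreationOptions.RunContinuationsAsynchronously needs .NET 4.6 or later). ContinuationDemo and ResumesInline are hypothetical names, and this is not necessarily how NServiceBus.Callbacks completes its requests.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Returns true when the code after the await ran on the same thread
    // that called TrySetResult, i.e. the completing thread was borrowed.
    public static bool ResumesInline(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<int>(options);
        int resumedOn = -1;
        int completedOn = -1;

        async Task Waiter()
        {
            await tcs.Task.ConfigureAwait(false);
            resumedOn = Environment.CurrentManagedThreadId;
        }

        // Runs up to the await, so the continuation is registered
        // before the source is completed.
        Task waiter = Waiter();

        // A dedicated thread stands in for the message pipeline.
        var completer = new Thread(() =>
        {
            completedOn = Environment.CurrentManagedThreadId;
            tcs.TrySetResult(42);
        });
        completer.Start();
        completer.Join();
        waiter.Wait();

        return resumedOn == completedOn;
    }

    public static void Main()
    {
        Console.WriteLine(ResumesInline(TaskCreationOptions.None));
        Console.WriteLine(ResumesInline(TaskCreationOptions.RunContinuationsAsynchronously));
    }
}
```

In a console application without a synchronization context, the first call is expected to report that the continuation ran inline on the completing thread, while the second hands it to the thread pool. Task.Yield in user code has a similar effect from the other side: it moves the rest of the method off the borrowed thread.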

Fixed Versions

This bug has also been fixed in version NServiceBus.Callbacks 1.1.3.

Provide an easy way to unit test code that uses callbacks

It is currently hard to unit test code that uses callbacks, because a custom mock has to be created:

Sample using NSubstitute, as shared on our Google group:

messageSession.When(x=> x.Send(msg, Arg.Any<SendOptions>())).Do(call =>
            {
                var opts = call.Arg<SendOptions>();

                //need to workaround some nsb internals here using reflection
                object responseStateObj;
                if (opts.GetExtensions().TryGet("NServiceBus.RequestResponseStateLookup+State", out responseStateObj))
                {
                    var taskAdapter = responseStateObj.GetType().GetField("TaskCompletionSource")?.GetValue(responseStateObj);
                    Assert.NotNull(taskAdapter, "TaskCompletionSource field missing from adapter");
                    var setResultMethod = taskAdapter.GetType().GetMethod("TrySetResult");
                    Assert.NotNull(setResultMethod, "TrySetResult not found on " + taskAdapter.GetType());
                    setResultMethod.Invoke(taskAdapter, new object[] {response});
                }
                else
                {
                    Assert.Fail("Can not find expected response state: NServiceBus.RequestResponseStateLookup+State");
                }
            });

Source: https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/particularsoftware/PNu6mKh9BUE/tMmPGFI1AAAJ

Documentation improvements: Concurrent requests and handling of messages without callback

I think the documentation https://docs.particular.net/nservicebus/messaging/callbacks should be improved. I figured these things out only by analyzing the implementation:

  • Concurrent requests are possible, because the correlation ID is used for dispatching the replies.
  • Messages are moved to the error queue if a callback is missing, e.g., because of a process crash.

Furthermore, this paragraph is confusing:

To handle responses from the processing endpoint, the sending endpoint must have it's own queue. Therefore, the sending endpoint cannot be configured as a SendOnly endpoint. Messages arriving in this queue are handled using a message handler, similar to that of the processing endpoint, as shown: [...]

This sounds as if you have to set up a message handler for receiving the reply, but actually you only have to set up a message handler for sending the reply.

Callback when cancelled immediately or right before token registration hangs forever

Who's affected

All customers using callbacks with a cancellation token, when the token is cancelled immediately or before the internal token registration.

Symptoms

The callback will not be cancelled.

Steps to reproduce

For immediate cancellation use:

    var cs = new CancellationTokenSource();
    cs.Cancel();
    var options = new SendOptions();
    await bus.Request<MyResponse>(new MyRequest(), options, cs.Token);

For cancellation before the internal token registration, use:

    var cs = new CancellationTokenSource(TimeSpan.FromSeconds(1)); // or Milliseconds (exact time depending on transport and system used)
    var options = new SendOptions();
    await bus.Request<MyResponse>(new MyRequest(), options, cs.Token);

This one is hard to reproduce since it is a race.

Analysis

  • We need more early cancellation token exit points in the extension method Request and in the UpdateRequestResponseCorrelationTableBehavior
  • There is a race in UpdateRequestResponseCorrelationTableBehavior. A cancellation token which is cancelled accepts a registration but immediately fires the registration. In that case the lambda inside the registration tries to cancel the response state but it can't since the state lookup has not yet registered it.

Fix

  • Change the order of lookup.RegisterState and requestResponseState.Register
  • Add an early exit with cancellationToken.ThrowIfCancellationRequested before RegisterState, after Register, and in the Request extension method.
  • Challenges: Timing. The simplest "way" is to hack RequestResponseStateLookup to simulate those situations in a test.
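The ordering problem can be reproduced outside NServiceBus. In the sketch below, RegistrationOrder, Lookup and Register are hypothetical stand-ins for the state lookup and behavior named above, built only on the .NET base class library. The sketch relies on the documented behavior that CancellationToken.Register runs the delegate immediately when the token is already cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class RegistrationOrder
{
    // Hypothetical stand-in for the request/response state lookup.
    static readonly ConcurrentDictionary<string, TaskCompletionSource<object>> Lookup =
        new ConcurrentDictionary<string, TaskCompletionSource<object>>();

    public static Task<object> Register(string id, CancellationToken token, bool stateFirst)
    {
        var tcs = new TaskCompletionSource<object>();
        if (stateFirst)
        {
            // State is stored before the token callback can fire.
            Lookup[id] = tcs;
            token.Register(() => Cancel(id));
        }
        else
        {
            // Racy order: on a cancelled token the callback fires here,
            // finds no state, and the task is never completed.
            token.Register(() => Cancel(id));
            Lookup[id] = tcs;
        }
        return tcs.Task;
    }

    static void Cancel(string id)
    {
        if (Lookup.TryRemove(id, out var tcs))
        {
            tcs.TrySetCanceled();
        }
    }

    public static void Main()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel();
        Console.WriteLine(Register("a", cts.Token, stateFirst: false).IsCompleted); // False
        Console.WriteLine(Register("b", cts.Token, stateFirst: true).IsCanceled);   // True
    }
}
```

An additional token.ThrowIfCancellationRequested() at the top of Register would cover the immediate-cancellation case without touching the lookup at all.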

Allow to opt-out from uniquely addressable constraint

Raised in https://discuss.particular.net/t/how-do-i-configure-iis-for-use-with-callbacks/131

By default, it makes sense for callbacks to enforce that an endpoint is uniquely addressable, since callbacks can be used with any transport. When a transport uses competing consumers by default, a uniquely addressable queue is required for the replies.

For MSMQ though that restriction is unnecessary since the combination of queue@machine name is already a unique combination as long as the same endpoint is not run multiple times on the same machine.

Error message when callback state is missing is misleading

When a callback response comes into an endpoint that has been restarted between the request being sent and the response returning, you end up with a "System.InvalidOperationException: No handlers could be found for message type:" error logged for the message.

It would be nice if there would be some way to make this a bit more obvious that this is occurring because the callback state is missing.

Since the lookup is based on correlation id, I know the problem is that there's no good way to distinguish between a callback response missing its state and a regular incoming message to the endpoint.

Maybe it would be as simple as modifying the behaviors that look up the state and logging a warning if the lookup fails? I suppose that might be too noisy if the endpoint makes requests and also receives regular messages.

Endpoint requires instance discriminator even when only replying to a callback

Currently, adding the Callbacks package to a project requires you to set an instance discriminator (https://github.com/Particular/NServiceBus.Callbacks/blob/develop/src/NServiceBus.Callbacks/CallbackSupport.cs#L16).

Since both the requesting and replying endpoints need the package installed, this means that both endpoints must have an instance discriminator set, which includes the overhead of an extra queue and message pump.

Only the requesting endpoint actually needs this extra queue, so could we move this check to somewhere that would only be checked if the endpoint is going to request a callback?
