Giter Club home page Giter Club logo

cap's Introduction

CAP                     中文

Docs&Dashboard AppVeyor NuGet NuGet Preview Member project of .NET Core Community GitHub license

CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, has the function of EventBus, it is lightweight, easy to use, and efficient.

In the process of building an SOA or MicroService system, we usually need to use the event to integrate each service. In the process, simple use of message queue does not guarantee reliability. CAP adopts local message table program integrated with the current database to solve exceptions that may occur in the process of the distributed system calling each other. It can ensure that the event messages are not lost in any case.

You can also use CAP as an EventBus. CAP provides a simpler way to implement event publishing and subscriptions. You do not need to inherit or implement any interface during subscription and sending process.

Architecture overview

cap.png

CAP implements the Outbox Pattern described in the eShop ebook.

Getting Started

NuGet

CAP can be installed in your project with the following command.

PM> Install-Package DotNetCore.CAP

CAP supports most popular message queue as transport, following packages are available to install:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS
PM> Install-Package DotNetCore.CAP.NATS
PM> Install-Package DotNetCore.CAP.RedisStreams
PM> Install-Package DotNetCore.CAP.Pulsar

CAP supports most popular database as event storage, following packages are available to install:

// select a database provider you are using, event log table will integrate into.

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //need MongoDB 4.0+ cluster

Configuration

First, you need to configure CAP in your Startup.cs:

public void ConfigureServices(IServiceCollection services)
{
    //......

    services.AddDbContext<AppDbContext>(); //Options, If you are using EF as the ORM
    services.AddSingleton<IMongoClient>(new MongoClient("")); //Options, If you are using MongoDB

    services.AddCap(x =>
    {
        // If you are using EF, you need to add the configuration:
        x.UseEntityFramework<AppDbContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! CAP can autodiscovery.

        // If you are using ADO.NET, choose to add configuration you needed:
        x.UseSqlServer("Your ConnectionStrings");
        x.UseMySql("Your ConnectionStrings");
        x.UsePostgreSql("Your ConnectionStrings");

        // If you are using MongoDB, you need to add the configuration:
        x.UseMongoDB("Your ConnectionStrings");  //MongoDB 4.0+ cluster

        // CAP support RabbitMQ,Kafka,AzureService as the MQ, choose to add configuration you needed:
        x.UseRabbitMQ("HostName");
        x.UseKafka("ConnectionString");
        x.UseAzureServiceBus("ConnectionString");
        x.UseAmazonSQS();
    });
}

Publish

Inject ICapPublisher in your Controller, then use the ICapPublisher to send messages.

The version 7.0+ supports publish delay messages.

public class PublishController : Controller
{
    private readonly ICapPublisher _capBus;

    public PublishController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }

    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            {
                //your business logic code

                _capBus.Publish("xxx.services.show.time", DateTime.Now);

                // Publish delay message
                _capBus.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "xxx.services.show.time", DateTime.Now);
            }
        }

        return Ok();
    }

    [Route("~/ef/transaction")]
    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
    {
        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //your business logic code

            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        }

        return Ok();
    }
}

Subscribe

In Controller Action

Add the Attribute [CapSubscribe()] on Action to subscribe to messages:

public class PublishController : Controller
{
    [CapSubscribe("xxx.services.show.time")]
    public void CheckReceivedMessage(DateTime datetime)
    {
        Console.WriteLine(datetime);
    }
}

In Business Logic Service

If your subscription method is not in the Controller, then your subscribe class needs to implement ICapSubscribe interface:

namespace BusinessCode.Service
{
    public interface ISubscriberService
    {
        void CheckReceivedMessage(DateTime datetime);
    }

    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("xxx.services.show.time")]
        public void CheckReceivedMessage(DateTime datetime)
        {
        }
    }
}

Then register your class that implements ISubscriberService in Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();

    services.AddCap(x=>
    {
        //...
    });
}

Async subscription

You are able to implement async subscription. Subscription's method should return Task and receive CancellationToken as parameter.

public class AsyncSubscriber : ICapSubscribe
{
    [CapSubscribe("name")]
    public async Task ProcessAsync(Message message, CancellationToken cancellationToken)
    {
        await SomeOperationAsync(message, cancellationToken);
    }
}

Use partials for topic subscriptions

To group topic subscriptions on class level you're able to define a subscription on a method as a partial. Subscriptions on the message queue will then be a combination of the topic defined on the class and the topic defined on the method. In the following example the Create(..) function will be invoked when receiving a message on customers.create

[CapSubscribe("customers")]
public class CustomersSubscriberService : ICapSubscribe
{
    [CapSubscribe("create", isPartial: true)]
    public void Create(Customer customer)
    {
    }
}

Subscribe Group

The concept of a subscription group is similar to that of a consumer group in Kafka. it is the same as the broadcast mode in the message queue, which is used to process the same message between multiple different microservice instances.

When CAP startups, it will use the current assembly name as the default group name, if multiple same group subscribers subscribe to the same topic name, there is only one subscriber that can receive the message. Conversely, if subscribers are in different groups, they will all receive messages.

In the same application, you can specify Group property to keep subscriptions in different subscribe groups:

[CapSubscribe("xxx.services.show.time", Group = "group1" )]
public void ShowTime1(DateTime datetime)
{
}

[CapSubscribe("xxx.services.show.time", Group = "group2")]
public void ShowTime2(DateTime datetime)
{
}

ShowTime1 and ShowTime2 will be called one after another because all received messages are processed linear. You can change that behaviour to set UseDispatchingPerGroup true.

BTW, You can specify the default group name in the configuration:

services.AddCap(x =>
{
    x.DefaultGroup = "default-group-name";  
});

Dashboard

CAP also provides dashboard pages, you can easily view messages that were sent and received. In addition, you can also view the message status in real time in the dashboard. Use the following command to install the Dashboard in your project.

PM> Install-Package DotNetCore.CAP.Dashboard

In the distributed environment, the dashboard built-in integrates Consul as a node discovery, while the realization of the gateway agent function, you can also easily view the node or other node data, It's like you are visiting local resources.

View Consul config docs

If your service is deployed in Kubernetes, please use our Kubernetes discovery package.

PM> Install-Package DotNetCore.CAP.Dashboard.K8s

View Kubernetes config docs

The dashboard default address is: http://localhost:xxx/cap , you can configure relative path /cap with x.UseDashboard(opt =>{ opt.MatchPath="/mycap"; }).

Contribute

One of the easiest ways to contribute is to participate in discussions and discuss issues. You can also contribute by submitting pull requests with code changes.

License

MIT

cap's People

Contributors

3ldar avatar alexinea avatar andriilab avatar blashbul avatar bschwehn avatar cocosip avatar cuibty avatar demorgi avatar difuteam avatar dima-zhemkov avatar flipdoubt avatar github-actions[bot] avatar greatkeke avatar hetaoos avatar jonekdahl avatar kiler398 avatar li-zheng-hao avatar lukazh avatar luox78 avatar mahmoudsamir101 avatar mtls-chpa avatar mviegas avatar mzorec avatar patheems avatar revazashvili avatar stratosblue avatar weihanli avatar wwwu avatar xiangxiren avatar yang-xiaodong avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

cap's Issues

CAP Processor triggers the Command Timeout error (MySql) when the consumer takes time to deal with it

重现方法:
1.CAP配置:

services.AddCap(x =>
            {
                x.QueueProcessorCount = 15;
                ……
            });

2.消费者代码:
```
[CapSubscribe("xxx.services.account.check")]
public async Task CheckReceivedMessage(Person person)
{
System.Threading.Thread.Sleep(5000);
return Task.CompletedTask;
}



异常信息:

[2017-12-08 10:05:37.0894] [] [WARN ] [DotNetCore.CAP.Processor.InfiniteRetryProcessor]:
Processor 'DotNetCore.CAP.Processor.DefaultDispatcher' failed. Retrying...
MySql.Data.MySqlClient.MySqlException (0x80004005): The Command Timeout expired before the operation completed. ---> MySql.Data.MySqlClient.MySqlException (0x80004005): The Command Timeout expired before the operation completed.
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable1.ConfiguredValueTaskAwaiter.GetResult() at MySqlConnector.Protocol.Serialization.BufferedByteReader.<ReadBytesAsync>d__2.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\BufferedByteReader.cs:line 37 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Threading.Tasks.ValueTask1.get_Result()
at MySqlConnector.Protocol.Serialization.ProtocolUtility.ReadPacketAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func1 getNextSequenceNumber, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\ProtocolUtility.cs:line 405 at MySqlConnector.Protocol.Serialization.ProtocolUtility.DoReadPayloadAsync(BufferedByteReader bufferedByteReader, IByteHandler byteHandler, Func1 getNextSequenceNumber, ArraySegmentHolder1 previousPayloads, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\ProtocolUtility.cs:line 459 at MySqlConnector.Protocol.Serialization.StandardPayloadHandler.ReadPayloadAsync(ArraySegmentHolder1 cache, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior) in C:\projects\mysqlconnector\src\MySqlConnector\Protocol\Serialization\StandardPayloadHandler.cs:line 37
at MySqlConnector.Core.ServerSession.ReceiveReplyAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in C:\projects\mysqlconnector\src\MySqlConnector\Core\ServerSession.cs:line 557
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MySqlConnector.Core.ServerSession.TryAsyncContinuation(Task1 task) in C:\projects\mysqlconnector\src\MySqlConnector\Core\ServerSession.cs:line 1014 at System.Threading.Tasks.ContinuationResultTaskFromResultTask2.InnerInvoke()
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable1.ConfiguredValueTaskAwaiter.GetResult() at MySqlConnector.Core.ResultSet.<ReadResultSetHeaderAsync>d__1.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Core\ResultSet.cs:line 43 at MySql.Data.MySqlClient.MySqlDataReader.ActivateResultSet(ResultSet resultSet) in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 92 at MySql.Data.MySqlClient.MySqlDataReader.<ReadFirstResultSetAsync>d__65.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 297 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySql.Data.MySqlClient.MySqlDataReader.<CreateAsync>d__64.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlDataReader.cs:line 287 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySqlConnector.Core.TextCommandExecutor.<ExecuteReaderAsync>d__3.MoveNext() in C:\projects\mysqlconnector\src\MySqlConnector\Core\TextCommandExecutor.cs:line 73 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at MySql.Data.MySqlClient.MySqlCommand.ExecuteDbDataReader(CommandBehavior behavior) in C:\projects\mysqlconnector\src\MySqlConnector\MySql.Data.MySqlClient\MySqlCommand.cs:line 168 at System.Data.Common.DbCommand.System.Data.IDbCommand.ExecuteReader(CommandBehavior behavior) at Dapper.SqlMapper.ExecuteReaderWithFlagsFallback(IDbCommand cmd, Boolean wasClosed, CommandBehavior behavior) at Dapper.SqlMapper.ExecuteReaderImpl(IDbConnection cnn, CommandDefinition& command, CommandBehavior commandBehavior, IDbCommand& cmd) at Dapper.SqlMapper.ExecuteReader(IDbConnection cnn, String sql, Object param, IDbTransaction transaction, Nullable1 commandTimeout, Nullable1 commandType) at DotNetCore.CAP.MySql.MySqlStorageConnection.<FetchNextMessageCoreAsync>d__19.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP.MySql\MySqlStorageConnection.cs:line 179 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter1.GetResult()
at DotNetCore.CAP.Processor.DefaultDispatcher.d__11.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IDispatcher.Default.cs:line 71
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at DotNetCore.CAP.Processor.DefaultDispatcher.d__9.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IDispatcher.Default.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at DotNetCore.CAP.Processor.InfiniteRetryProcessor.d__3.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Processor\IProcessor.InfiniteRetry.cs:line 25

MySqlStorage.InitializeAsync syntax error occurred when using Mysql 5.5.44-0ubuntu0.12.04.1

CREATE TABLE IF NOT EXISTS `cap.published` (
  `Id` int(127) NOT NULL AUTO_INCREMENT,
  `Name` varchar(200) NOT NULL,
  `Content` longtext,
  `Retries` int(11) DEFAULT NULL,
  `Added` datetime(6) NOT NULL,
  `ExpiresAt` datetime(6) DEFAULT NULL,
  `StatusName` varchar(40) NOT NULL,
  PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[Err] 1064 - You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '(6) NOT NULL,
  `ExpiresAt` datetime(6) DEFAULT NULL,
  `StatusName` varchar(4' at line 6

replaces datetime(6) with datetime, then working fine.

CapSubscribe method with DateTime parameter will error occur at model binding

1.when publish a DateTime type
[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);

        return Ok();
    }

2.will error occur at model binding on CapSubscribe method
[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now.ToString() + " , sent time: " + time.ToString());
}

3.debug SimpleTypeModelBinder found that:
public Task BindModelAsync(string content), value of content is ""2017-10-27T15:31:11.2759131+08:00"",

cause the following conversion date type failed:
model = _typeConverter.ConvertFrom(
null,
CultureInfo.CurrentCulture,
content);

I add following code before conversion and then work fine:
if (parameterType == typeof(DateTime))
{
content = content.Replace(""", string.Empty);
}

Message priority

系统中经常会有2种消息类型存在,一种是控制消息,一种数据消息。
一般控制消息的到达优先级会比数据消息高。
按目前cap的实现,数据和控制消息会被同等对待。
是否可以考虑加入消息优先级的支持?

Question about project without any subscribers

我新建了两个项目:
ProjectA:消息生产者项目(只发布消息,没有任务订阅方法),
ProjectB:消息消费者项目(只有订阅方法),
都引用了CAP。

现在有个问题:
当ProjectA发布完消息后,CAP发现ProjectA没有订阅这个消息的方法就会报异常,但是我的订阅方法都写在ProjectB里了,为什么要在ProjectA一定有订阅的方法呢?或者有什么解决方案或建议吗?

[2017-11-24 15:48:37.7795] [] [WARN ] [DotNetCore.CAP.SubscribeQueueExecutor]: 
Job failed to execute. Will retry.
DotNetCore.CAP.Internal.SubscriberNotFoundException: Topic:finance.mac_account.detail, can not be found subscriber method.
   at DotNetCore.CAP.Internal.DefaultSubscriberExecutor.<ExecuteAsync>d__7.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Internal\ISubscriberExecutor.Default.cs:line 42

[2017-11-24 15:48:40.6096] [] [ERROR] [DotNetCore.CAP.Internal.DefaultSubscriberExecutor]: 
Consumer method 'Group:group1, Topic:finance.mac_account.detail' failed to execute.
DotNetCore.CAP.Internal.SubscriberNotFoundException: Topic:finance.mac_account.detail, can not be found subscriber method.
   at DotNetCore.CAP.Internal.DefaultSubscriberExecutor.<ExecuteAsync>d__7.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Internal\ISubscriberExecutor.Default.cs:line 42

PS:
ProjectB是可以触发订阅的方法,但是查看CAP Dashboard发现 发出的 100 , 接收的 99,
在接收的里,有1个一直在执行中状态,一直重试也不行


Message Content
{
"Id": "5a17ccf8c47f7643a878d780",
"Timestamp": "2017-11-24T15:40:40.1612137+08:00",
"Content": "1",
"CallbackName": null,
-"ExceptionMessage": {
"Source": "DotNetCore.CAP",
"Message": "Topic:finance.mac_account.detail, can not be found subscriber method.",
"InnerMessage": null
}
}

Why I can't open solution with Visual Studio 2013 ?

C:\Users\98497\Desktop\CAP-dev_2.0\test\DotNetCore.CAP.Test\DotNetCore.CAP.Test.csproj : error : 项目的默认 XML 命名空间必须为 MSBuild XML 命名空间。如果项目是用 MSBuild 2003 格式创建的,请将 xmlns="http://schemas.microsoft.com/developer/msbuild/2003" 添加到 元素中。如果项目是用旧的 1.0 或 1.2 格式创建的,请将其转换为 MSBuild 2003 格式。 C:\Users\98497\Desktop\CAP-dev_2.0\test\DotNetCore.CAP.Test\DotNetCore.CAP.Test.csproj
这是打开错误的一条。。。。

An error is encountered to receive the message repeatedly

1、直接用 _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);发布了一条消息(数据库published表也只有一条),在订阅的地方设置了断点等了几十秒后才接受到消息,并且一直在接受这条消息(数据库received表中417条记录)。

此除延迟是什么原因?是不是只要一接受到消息就要删除这条消息就行了?
2、用了事务又发布了一条消息(published表等了几十秒状态是成功),但是(received表增加了8条记录)
为什么事务和问题1在received表增加的记录数量不一样?

3、published表中状态都成功了,received表中状态为:Enqueued。系统没有中断

asp.net core 2.0 not compatibility

when in asp.net core 2.0
in ConfigureServices AddCap -- UseEntityFramework, UseRabbitMQ
in Configure UseCap

runtime throw

Method not found: 'System.IServiceProvider Microsoft.Extensions.DependencyInjection.ServiceCollectionContainerBuilderExtensions.BuildServiceProvider(Microsoft.Extensions.DependencyInjection.IServiceCollection)'.

StackTrace:
at DotNetCore.CAP.SqlServerCapOptionsExtension.TempBuildService(IServiceCollection services)
at DotNetCore.CAP.SqlServerCapOptionsExtension.AddServices(IServiceCollection services)
at Microsoft.Extensions.DependencyInjection.ServiceCollectionExtensions.AddCap(IServiceCollection services, Action`1 setupAction)
at namespace.Startup.ConfigureServices(IServiceCollection services)

Why can't I run samples ?

startup.cs

public void ConfigureServices(IServiceCollection services)
    {
        services.AddDbContext<AppDbContext>();

        services.AddCap(x =>
        {
            x.UseEntityFramework<AppDbContext>();
            x.UseRabbitMQ("localhost:9092");
            x.UseSqlServer("Data Source=127.0.0.1;Initial Catalog=cap;User Id=sa;Password=123456;Connect Timeout=120;MultipleActiveResultSets=True");

        });

        services.AddMvc();
    }

controller

public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}

    // GET api/values
    [Route("Publish")]
    [HttpGet]
    public IEnumerable<string> Publish()
    {

        _capBus.Publish("Web", "");
      
        return new string[] { "Web", "value2" };
    }

AppDbContext

public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=127.0.0.1;Initial Catalog=cap;User Id=sa;Password=123456;MultipleActiveResultSets=True");
}
}
执行的时候,_dbContext 是null啊。
public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
} 这个是不是缺东西啊,还是有注入的地方要写啊。 我是用core2新建了网站测试的,代码扒实例的代码, 数据库的表生成了,mq也没问题。

RabbitMQ : SubscriberNotFoundException

This situation is because the Queue is bound to the RoutingKey before using RabbitMQ because the Queue is persistent, so the previously bound RoutingKey will not be lost.When you reduce the subscription to RoutingKey, you actually have the same RoutingKey that was bound before the RabbitMQ end, so you get messages from previous subscriptions.

这个情况是由于在使用RabbitMQ的时候以前 Queue上绑定过 Routing Key,因为 Queue是持久化的,所以之前绑定的RoutingKey不会丢失。当你减少RoutingKey的订阅以后,实际上在RabbitMQ端以前绑定的RoutingKey还存在,所以会收到以前的订阅的消息。

You can see the current Queue binding's RoutingKey in the admin page.

你可以在管理页面看到当前的Queue绑定的RoutingKey有哪些。

image

Solution :
解决方案:

I tried to remove all the binding of RoutingKey by modifying the code and then rebinding, but RabbitMQ didn't provide the relevant API.

我尝试通过修改代码的方式来删除所有绑定的RoutingKey然后重新绑定,但是RabbitMQ没有提供相关API。

Another way to do this is to unbind a specific RoutingKey, but I can't get access to the already binding RoutingKey, which is only available through the client tool, so it's not feasible.

另外一种方式解绑某个具体的RoutingKey,但是我无法获取到已经绑定的RoutingKey有哪些,这个只能够通过客户端工具来获取,所以此种方案不可行。

After careful consideration, I don't think it should be done by CAP program, if there is a scene.Project A and Project B is subscribed to the same Group, A Group of Topic, corresponding to the RabbitMQ are they listening to the same queue, when ProjectA reduce the subscribed to the Topic, for some reason so if unbundling RoutingKey causes ProejctB cannot receive related to subscribe to news, this is obviously wrong behavior.

通过仔细思考后,我认为不应该由CAP程序来做这个事情,假如有以下场景。Project A 和Project B都订阅了同一Group组的Topic,对应到RabbitMQ也就是他们监听的是同一个队列,当ProjectA由于某种原因减少Topic的订阅了,那么如果解绑了RoutingKey会导致ProejctB也无法收到相关订阅的消息,这显然是错误的行为。

So I decided to let the user to do this, and they said if you reduce the subscription topic, so you need to manually to the rabbitmq console the queue is removed (unbundling) old topic, and then the user must make clear the expected behavior of the message queue.

所以我决定让使用者来做这个事情,也是就说如果你减少了订阅的topic,那么你需要手动去rabbitmq的控制台的queue中删除(解绑)旧的topic,那么使用者就必须要清楚消息队列的预期行为。

In addition, this place where the program is wrong will be treated as a more elegant prompt, because it is also a warning to the user!.

另外,程序报错的这个地方我会处理为一种更加优雅的提示方式,因为这对使用者来说也是一个警告!。

PS: if Kafka is used, there are no problems.

PS: 如果使用 Kafka , 不存在以上问题。

Can not send message for string type

1、无法发布订阅字符串类型消息

        [Route("~/publish")]
        public IActionResult PublishMessage()
        {
            //_capBus.Publish("sample.kafka.sqlserver", "消息内容"); //不能直接发布string类型的消息 得搞个实体
            _capBus.Publish("sample.kafka.sqlserver", new MyMessage { Title = "your dd" });

            return Ok();
        }
 [NonAction]
        [CapSubscribe("sample.kafka.sqlserver")]
        //订阅消息,消息参数为string无法接收
        public void ReceiveMessage1(string data)
        {
            Console.WriteLine($"receive data{data}");
            Debug.WriteLine($"receive debug data{data}");
        }

2、先注释掉订阅的action,运行程序,然后发布一条消息a。重新启用订阅action,运行程序,消息a无法被消费

3、core 2.0 还无法使用,异常:

An error occurred while starting the application.
MissingMethodException: Method not found: 'System.IServiceProvider Microsoft.Extensions.DependencyInjection.ServiceCollectionContainerBuilderExtensions.BuildServiceProvider(Microsoft.Extensions.DependencyInjection.IServiceCollection)'.

DotNetCore.CAP.MySqlCapOptionsExtension.TempBuildService(IServiceCollection services)

    MissingMethodException: Method not found: 'System.IServiceProvider Microsoft.Extensions.DependencyInjection.ServiceCollectionContainerBuilderExtensions.BuildServiceProvider(Microsoft.Extensions.DependencyInjection.IServiceCollection)'.
        DotNetCore.CAP.MySqlCapOptionsExtension.TempBuildService(IServiceCollection services)
        DotNetCore.CAP.MySqlCapOptionsExtension.AddServices(IServiceCollection services)
        Microsoft.Extensions.DependencyInjection.ServiceCollectionExtensions.AddCap(IServiceCollection services, Action<CapOptions> setupAction)
        Core2Website.Startup.ConfigureServices(IServiceCollection services) in Startup.cs

                services.AddCap(c =>

System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Microsoft.AspNetCore.Hosting.ConventionBasedStartup.ConfigureServices(IServiceCollection services)
Microsoft.AspNetCore.Hosting.Internal.WebHost.EnsureApplicationServices()
Microsoft.AspNetCore.Hosting.Internal.WebHost.BuildApplication()

Sending multiple messages within a request will cause an exception

RabbitMQ,MySql环境
在同一个请求内两次Publish消息,第二次发送错误。
是bug还是我写的有问题?

`
//发送消息

        await _eventPublisher.PublishAsync("msg1", new UserLoginEvent {UserId = entity.Id});
        try
        {
            await _eventPublisher.PublishAsync("backend_user_action_log", log);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }

`

Exception

System.ObjectDisposedException: Cannot access a disposed object. Object name: 'MySqlConnection'. at MySql.Data.MySqlClient.MySqlConnection.VerifyNotDisposed() at MySql.Data.MySqlClient.MySqlConnection.<OpenAsync>d__14.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at Dapper.SqlMapper.<ExecuteImplAsync>d__29.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at DotNetCore.CAP.MySql.CapPublisher.<PublishWithTransAsync>d__24.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at DotNetCore.CAP.MySql.CapPublisher.<PublishCoreAsync>d__22.MoveNext() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

send messages without persistent

一些低到达率要求的场景,是否可以设定跳过数据库存储消息这个过程,直接由kafka/rabbitmq将消息发送出去。

An excepiton occured when sent message with async

aspnetcore2.0+mysql
DotNetCore.CAP version: 2.1.0
DotNetCore.CAP.RabbitMQ version: 2.1.0
DotNetCore.CAP.MySql version: 2.1.0

Microsoft.EntityFrameworkCore.UnitOfWork version 2.0.2

项目引用其它nuget包:

<ItemGroup>
    <PackageReference Include="Autofac" Version="4.6.1" />
    <PackageReference Include="Autofac.Extensions.DependencyInjection" Version="4.2.0" />
    <PackageReference Include="Autofac.Extras.Quartz" Version="3.5.0-unstable0010" />
    <PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
    <PackageReference Include="LibLog" Version="4.2.6" />
    <PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.0.0" />
    <PackageReference Include="MySql.Data" Version="8.0.8-dmr" />
    <PackageReference Include="System.Text.Encoding.CodePages" Version="4.4.0" />
    <PackageReference Include="Swashbuckle.AspNetCore" Version="1.0.0" />
    <PackageReference Include="Quartz" Version="3.0.0-beta1" />
    <PackageReference Include="Quartz.Serialization.Json" Version="3.0.0-beta1" />
    <PackageReference Include="NLog.Extensions.Logging" Version="1.0.0-rtm-rc2" />
  </ItemGroup>

异常触发条件:

  • 定义了一个Quartz Job,每5秒执行一次,Quzrtz Host on aspnetcore2.0 website

  • 当使用PublishAsync发布MQ消息时,第1次触发Job时正常工作,从第2次触发Job开始,可能触发错误: There is already an open DataReader associated with this Connection which must be closed first.

详细错误信息:

ex
{MySql.Data.MySqlClient.MySqlException (0x80004005): There is already an open DataReader associated with this Connection which must be closed first.
   at MySql.Data.Serialization.MySqlSession.StartQuerying(MySqlCommand command)
   at MySql.Data.MySqlClient.CommandExecutors.TextCommandExecutor.<ExecuteReaderAsync>d__3.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MySql.Data.MySqlClient.CommandExecutors.TextCommandExecutor.<ExecuteNonQueryAsync>d__1.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MySql.Data.MySqlClient.MySqlTransaction.<CommitAsync>d__2.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MySql.Data.MySqlClient.MySqlTransaction.Commit()
   at DotNetCore.CAP.Abstractions.CapPublisherBase.ClosedCap() in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Abstractions\CapPublisherBase.cs:line 164
   at DotNetCore.CAP.Abstractions.CapPublisherBase.PublishWithTransAsync(String name, String content) in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Abstractions\CapPublisherBase.cs:line 137
   at DotNetCore.CAP.Abstractions.CapPublisherBase.PublishAsync[T](String name, T contentObj, String callbackName) in D:\Work\aimei_finance\trunk\code\aimei_finance\CAP\DotNetCore.CAP\Abstractions\CapPublisherBase.cs:line 36
   at AAM.Quartz.Domain.Service.Finance.AccountPeriodService.SendAccountPeriodMQMessageAsync() in D:\Work\aimei_finance\trunk\code\aimei_finance\Quartz\AAM.Quartz.Domain.Service\Finance\AccountPeriodService.cs:line 32
   at AAM.Quartz.Tasks.Finance.AccountPeriodJob.<Invoke>d__3.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\Quartz\AAM.Quartz.Task\Finance\AccountPeriodJob.cs:line 25
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at AAM.Quartz.Tasks.JobBase.<Execute>d__2.MoveNext() in D:\Work\aimei_finance\trunk\code\aimei_finance\Quartz\AAM.Quartz.Task\JobBase.cs:line 32}
    Data: {System.Collections.ListDictionaryInternal}
    ErrorCode: -2147467259
    HResult: -2147467259
    HelpLink: null
    InnerException: null
    Message: "There is already an open DataReader associated with this Connection which must be closed first."
    Number: 0
    Source: "MySqlConnector"
    SqlState: null
    StackTrace: "   at MySql.Data.Serialization.MySqlSession.StartQuerying(MySqlCommand command)\r\n   at MySql.Data.MySqlClient.CommandExecutors.TextCommandExecutor.<ExecuteReaderAsync>d__3.MoveNext()\r\n   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at MySql.Data.MySqlClient.CommandExecutors.TextCommandExecutor.<ExecuteNonQueryAsync>d__1.MoveNext()\r\n   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at MySql.Data.MySqlClient.MySqlTransaction.<CommitAsync>d__2.MoveNext()\r\n   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at MySql.Data.MySqlClient.MySqlTransaction.Commit()\r\n   at DotNetCore.CAP.Abstractions.CapPublisherBas
e.ClosedCap() in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\CAP\\DotNetCore.CAP\\Abstractions\\CapPublisherBase.cs:line 164\r\n   at DotNetCore.CAP.Abstractions.CapPublisherBase.PublishWithTransAsync(String name, String content) in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\CAP\\DotNetCore.CAP\\Abstractions\\CapPublisherBase.cs:line 137\r\n   at DotNetCore.CAP.Abstractions.CapPublisherBase.PublishAsync[T](String name, T contentObj, String callbackName) in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\CAP\\DotNetCore.CAP\\Abstractions\\CapPublisherBase.cs:line 36\r\n   at AAM.Quartz.Domain.Service.Finance.AccountPeriodService.SendAccountPeriodMQMessageAsync() in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\Quartz\\AAM.Quartz.Domain.Service\\Finance\\AccountPeriodService.cs:line 32\r\n   at AAM.Quartz.Tasks.Finance.AccountPeriodJob.<Invoke>d__3.MoveNext() in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\Quartz\\AAM.Quartz.Task\\Finance\\AccountPeriodJob.cs:line 25\r\n   
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at AAM.Quartz.Tasks.JobBase.<Execute>d__2.MoveNext() in D:\\Work\\aimei_finance\\trunk\\code\\aimei_finance\\Quartz\\AAM.Quartz.Task\\JobBase.cs:line 32"
    TargetSite: {Void StartQuerying(MySql.Data.MySqlClient.MySqlCommand)}

1.ConfigureServices
//注入Db Context和UnitOfWork
```
services.AddMyDbContext(Configuration);

        services.AddCap(x =>
        {
            x.UseEntityFramework<aimeiFinanceContext>();
            x.UseRabbitMQ((m) => {
                m.HostName = "10.0.1.7";
                m.Port = 5672;
                m.UserName = "aimei";
                m.Password = "a123456";
            });
            x.UseDashboard();
        });

2.AddMyDbContext扩展方法为:

///


/// 配置要使用的Db Context
///

///
///
public static void AddMyDbContext(this IServiceCollection services, IConfigurationRoot config)
{
//EF Core For Mysql
//Core 2.0 支持连接池配置,但暂测试不成功 AddDbContextPool
services.AddDbContext(options => options.UseMySql(config.GetSection("ConnectionStrings:housekeeper").Value))
.AddUnitOfWork();

        services.AddDbContext<aimeiFinanceContext>(options => options.UseMySql(config.GetSection("ConnectionStrings:aimei_finance").Value))
               .AddUnitOfWork<aimeiFinanceContext>();
    }

3.Configure

//使用CAP
app.UseCap();


4.Quartz Job

///


/// 账务系统账期定时触发统计作业
///

public class AccountPeriodJob : JobBase
{
private readonly ILogger _logger;
private readonly IAccountPeriodService _accountPeriodService;

    public AccountPeriodJob(ILogger<AccountPeriodJob> logger, IAccountPeriodService accountPeriodService)
    {
        _logger = logger;
        _accountPeriodService = accountPeriodService;
    }

    protected override async Task Invoke(IJobExecutionContext context)
    {
        _logger.LogInformation("AccountPeriodJob Runing...");
        await _accountPeriodService.SendAccountPeriodMQMessageAsync();
        //return;
    }
}

5.服务类:

public interface IAccountPeriodService
{
Task SendAccountPeriodMQMessageAsync();
}

public class AccountPeriodService : IAccountPeriodService
{
    private readonly ILogger<AccountPeriodService> _logger;
    private readonly ICapPublisher _capBus;
    private readonly IUnitOfWork<aimeiFinanceContext> _financeunitOfWork;

    public AccountPeriodService(ILogger<AccountPeriodService> logger, ICapPublisher capPublisher, IUnitOfWork<aimeiFinanceContext> financeunitOfWork)
    {
        _logger = logger;
        _capBus = capPublisher;
        _financeunitOfWork = financeunitOfWork;
    }

    public Task<bool> SendAccountPeriodMQMessageAsync()
    {

   //异步发布时,会第二次触发时可能会抛出错误
_capBus.PublishAsync("sample.finance.mac.acc.detail", DateTime.Now);
//如果使用同步方法,一切正常
//_capBus.Publish("sample.finance.mac.acc.detail", DateTime.Now);

        return Task.FromResult(true);
    }

model bind for type like datetime guid always failed.

when you send message type that can simplely convert from string like guid, datetime, you always get a format exception that not valid value for this type. the problem is than you publish message always use json string , but you can't just use typeconvert convert json string to a object.

Concurrent problem with many micro-service instances

Message state should be update before executor invoking subscribe method on message subscriber microservice. A situation will happen when have many micro-service instances (eg. load balancing service start many same micro-services automatly and they are using same database instance) :
A micro-service instance is heading to update a message's status to Processing after this message has been changed and subscribe method should been invoke already by another micro-service instance, and subscribe method will be invoked again, but should be invoked just once and no more.
I don't ensure this will cause any bad things or not, but it should invoid by check message update execution's afftected rows count, and break the method in time.

About publish message

假如我的业务是如下的
using(new 事务)
{
await 异步a
await 异步b
其他的逻辑
commit
}

假如异步a执行成功了,异步b失败了,那么异步a产生的数据变动,会做何处理额。

IConsumerInvokerFactory lifetime cause unexpected subscription callback function exceptions

IConsumerInvokerFactory的生命周期为单例,导致消费者回调函数中注入的依赖对象全部都是单例,一旦销毁某个对象,将导致后续所有回调全部失败。

Sample:

public async Task<bool> SampleCallback()

{
    using(var connection = _dbContext.Database.GetDbConnection())
    {
        await connection.OpenAsync();
    }
    return true;
}

以上代码中,_dbContext 通过构造函数注入进来,并在函数执行离开using域时销毁数据库连接,导致第二次进入此函数时抛出异常。

Some questons about CAP develop version

@yuleyule66,
(1)In CAP develop version,The Sample Sample.Kafka.SqlServer is based on Kafka and SqlServer,but why
is " _capBus.Publish("sample.rabbitmq.mysql", "");” and "await _capBus.PublishAsync("sample.rabbitmq.mysql", "");" in valuesController .
(2)How subscribe happens?It means how the following code run?
[NonAction]
[CapSubscribe("sample.kafka.sqlserver", Group = "test")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
Thank you!

Throwing an exception using multiple ICapSubscribe services

see CAP/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs

private static void AddSubscribeServices(IServiceCollection services)
{
var consumerListenerServices = new Dictionary<Type, Type>();
foreach (var rejectedServices in services)
{
if (rejectedServices.ImplementationType != null
&& typeof(ICapSubscribe).IsAssignableFrom(rejectedServices.ImplementationType))
// here throw exception
consumerListenerServices.Add(typeof(ICapSubscribe), rejectedServices.ImplementationType);
}

Cap Dashboard Js syntax issue

Hi:
Dashboard Js (src/DotNetCore.CAP/Dashboard/Content/js/cap.js),第374行多了个逗号,导致较低版本的浏览器Js报错,部分Dashboard 功能无法使用

image

去掉这个逗号可兼容低版本的浏览器也能正常使用

Custom successed message destruction time

历史消息根据文档,处理方式是默认1个小时,会做删除处理。这个接口能否放开,由用户自定义,比如可以做备份再删除的操作。

Exception of ArgumentOutOfRangeException and ObjectDisposedException

RabbitMQ&Mysql环境

当我publish一条消息后,如果没有在Controller添加任何CapSubscribe属性的action,则报以下错误
System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
at System.ThrowHelper.ThrowArgumentOutOfRange_IndexException()
at System.Collections.Generic.List1.get_Item(Int32 index) at DotNetCore.CAP.SubscibeQueueExecutor.<ExecuteSubscribeAsync>d__7.MoveNext()
然后写了CapSubscribe属性的action,则会报以下出错信息
System.ObjectDisposedException: Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it has already been disposed. at Autofac.Core.Lifetime.LifetimeScope.CheckNotDisposed() at Autofac.Core.Lifetime.LifetimeScope.ResolveComponent(IComponentRegistration registration, IEnumerable1 parameters)
at Autofac.ResolutionExtensions.TryResolveService(IComponentContext context, Service service, IEnumerable1 parameters, Object& instance) at Autofac.ResolutionExtensions.ResolveOptionalService(IComponentContext context, Service service, IEnumerable1 parameters)
at Microsoft.Extensions.Internal.ActivatorUtilities.ConstructorMatcher.CreateInstance(IServiceProvider provider)
at Microsoft.Extensions.Internal.ActivatorUtilities.CreateInstance(IServiceProvider provider, Type instanceType, Object[] parameters)
at DotNetCore.CAP.Internal.DefaultConsumerInvoker.d__6.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at DotNetCore.CAP.SubscibeQueueExecutor.d__7.MoveNext()
`

How to use distributed transaction ?

我测试的代码如下,想测试事务一致性,订阅者手动抛出了一个异常,期望在这种情况下本地业务回进行事务回滚,但最终结果是本地业务(PersonInsert插入成功并未回滚)也许是我的理解或使用方式不对?请指点一下,谢谢!

 [Route("publish")]
        public IActionResult PublishMessage([FromServices]ICapPublisher _publisher)
        {
            var sqlOptions = (SqlServerOptions)HttpContext.RequestServices.GetService(typeof(SqlServerOptions));
            var cstr = sqlOptions.ConnectionString;
            using (var sqlconnection = new SqlConnection(cstr))
            {
                sqlconnection.Open();
                using (var sqlTrans = sqlconnection.BeginTransaction())
                {
                    var person = new Person { Name = "老张", Age = 50 };
                    PersonInsert(sqlTrans, person); // 我的业务代码(向数据库插入一条记录), 这里不抛出异常,数据库插入记录成功
                    _publisher.Publish("CapDemo", person, sqlTrans); //接收并打印消息,手动抛了个异常
                    sqlTrans.Commit();
                }

                //最终结果是 PersonInsert(本地业务插入成功,订阅者抛出异常,但本地业务没有回滚)
            }
            return Ok();
        }

        /// <summary>
        /// 订阅者业务
        /// </summary>
        /// <param name="person"></param>
        [NonAction]
        [CapSubscribe("CapDemo")]
        public void ReceiveMessage(Person person)
        {
            var cstr = "Data Source=192.168.0.250;Initial Catalog=CapDemo;User ID=sa;Password=123123;";

            using (var sqlconnection = new SqlConnection(cstr))
            {
                sqlconnection.Open();
                using (var trans = sqlconnection.BeginTransaction())
                {
                    try
                    {
                        sqlconnection.Execute($"insert into Person(Name, Age) values('{person.Name} + _subscribe', {person.Age})", null, trans);
                        Console.WriteLine("received message: {0}", person.ToString());
                        throw new Exception("订阅者抛出异常检查事务性");
                        trans.Commit();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        trans.Rollback();
                    }
                }
            }
        }

        /// <summary>
        /// 本地业务逻辑
        /// </summary>
        /// <param name="trans"></param>
        /// <param name="person"></param>
        /// <returns></returns>
        [NonAction]
        private int PersonInsert(SqlTransaction trans, Person person)
        {
            var sqlconnection = trans.Connection;

            // check connection
            if (sqlconnection.State == System.Data.ConnectionState.Closed) sqlconnection.Open();

            var sql = $"insert into person (Name, Age) values ('{person.Name}', {person.Age})";
            int row_count = sqlconnection.Execute(sql: sql, transaction: trans);
            return row_count;
        }

how dapper user works?

        [Route("publish")]
        public async Task<IActionResult> PublishMessage()
        {
            const string cstr = "Data Source=192.168.0.250;Initial Catalog=CapDemo;User ID=sa;Password=123123;";

            var sql1 = new System.Data.SqlClient.SqlConnection(cstr);
            await _publisher.PublishAsync("CapDemo", new Person { Name = "老张", Age = 30 }, sql1);

            var sql2 = new System.Data.SqlClient.SqlConnection(cstr);
            await _publisher.PublishAsync("CapDemo.Service", new Person { Name = "老李", Age = 40 }, sql2);

            return Ok();
        }

should I create a instance of SqlConnection when publish a message ?

How to subscribe topic with config file ?

我的系统的各个业务系统是由各种语言开发的,不方便直接在各个系统中直接嵌入CAP库.
所以我考虑做一个类似代理服务的模块,subcriber使用同步方式去调用各个系统的api来完成消息通知.
为了在各个系统的消息需求增加或变化的时候,这个代理模块的代码保持相对稳定,希望能通过本地配置文件来注册topoic而不是用[CapSubscriber()]方式.
目前的思路是仿造ISubscriber接口方式增加一 个ICustomerSubscriber接口,然后把所有注册的topic消息用一个函数在处理,这个函数再根据消息的topic名去调用不同的业务系统api.
因为是第一次使用asp.net core,对自定义services不是很了解,可否请教从哪个类入手修改最合适.

Using Mysql 5.5.44-0ubuntu0.12.04.1 get error

CAP数据使用Mysql版本为5.5.44-0ubuntu0.12.04.1 时,我测试只发送1条MQ消息,但接收到33条一模一样的消息,详情请看以下载图:

1.发出的:
image

2.接收的:
image

Id都一样,内容也一样,接收到了33条消息,订阅方法触发了33次,好奇怪的现象,想请教一下大概是什么问题呢?

PS:当我数据库切换成Mysql 5.7后就没有这种问题了,是否也是mysql语法的问题导致?貌似是不断的重试机制造成的,cap.queue表里会出现重复的MessageId一样的记录在运行……

When the connection is given, CAP failed to insert message into Cap message table.

void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);
Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

When I use such four APIs, it is failed to insert message into cap message table, for the table named Published does not yet exist.

请问有CAP学习群吗?

如题,因为小弟运行过程中有碰到一些问题,想在群里咨询各位大神,帮忙解决下,谢谢!

Sent async message in the loop causes an exception

Async publish message in controller or services with the loop :

        [Route("~/publish")]
        public async Task<IActionResult> PublishMessage()
        {
            for (int i = 0; i < 10; i++)
            {
                 await _capBus.PublishAsync("sample.rabbitmq.mysql", DateTime.Now);
            }
            return Ok();
        }

This type of sent can lead to an exception !

image

Kafka cluster can't published message

配置了3台kafka集群 x.UseKafka("x.x.x.x:9092,x.x.x.x:9093,x.x.x.x:9094"); 生产者和消费者都是一样的kafka配置 不同的程序 链接的数据库也不同 关闭其中一台kafka后 生产者有时候发送消息会失败 cap.published表里状态为 Processing 当启动刚刚关闭的Kafka后 消息能重新发送出去 但是 消费者接收不到..
1.生产者偶尔不能正常发送消息出去
2.重启服务后 消息发送成功后 不能监听到消息

Error in Asp.Net Core 2.0 when app.UseCap()

Because Asp.Net Core 2.0 Update the Program.cs code "WebHost.CreateDefaultBuilder(args)",when Startup.cs call app.UseCap() will cause an error:Cannot resolve scoped service 'xxxx.xxxDbContext' from root provider.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.