
Comments (45)

alexeyzimarev commented on June 17, 2024

Can you post the checkpoint query, as it's hard to understand what NR fields mean and the query is not fully visible.

from eventuous.

alexeyzimarev commented on June 17, 2024

One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

alexeyzimarev commented on June 17, 2024

And one more thing:

if only a single Save is missed the Checkpoint can never again be updated by the same running application

That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Maybe you can point out if you got any logs from your projectors that indicate the failure.

ReinderReinders commented on June 17, 2024

Hi, sorry for the late reaction, I have been ill this past week.

--> Can you post the checkpoint query, as it's hard to understand what NR fields mean and the query is not fully visible.

The Checkpoint part of the query is not that complicated, just a select, but sure here you go:

select 

(select POSITION from eventuous.Checkpoints) as CP,

(select count(*) from receiving_table) as 'NR TOTAL',

(select count(*) from receiving_table
where information_fields like '%OK%'
and information_fields not like '%NOK%') as 'NR OK',

(select count(*) from receiving_table
where information_fields like '%NOK%') as 'NR NOK';

(slightly redacted to hide business secrets)

The idea is that in Create and Update events I fill certain fields (it's a column containing JSON) with a string value containing "NOK". The subsequent update then overwrites the previous property with "OK" and adds a new "NOK" (except for the last update, that one does not add another NOK). This way, if any event is processed out of order, or not processed at all, the read model at the end has a NOK value in it. It's an easy and foolproof way of being able to ensure at the end that the event stream was fully processed, and in order.
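As a rough illustration of the OK/NOK chain described above, the following sketch models the JSON column as a dictionary keyed by step number. The names `MarkerChain`, `Apply`, and `IsComplete`, and the per-step keys, are assumptions made for this example; they are not the actual read model.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of the marker chain; the real read model keeps these
// values in a JSON column rather than a dictionary.
public static class MarkerChain
{
    // Applies event number n (0 = Create). Each event confirms the previous
    // step and, unless it is the last event, leaves its own "NOK" marker.
    public static void Apply(IDictionary<int, string> fields, int n, bool isLast)
    {
        if (n > 0) fields[n - 1] = "OK";
        if (!isLast) fields[n] = "NOK";
    }

    // A surviving "NOK" means an event was skipped or applied out of order.
    public static bool IsComplete(IDictionary<int, string> fields) =>
        fields.Values.All(v => v == "OK");
}
```

Applying events 0, 1, 2, 3 in order leaves every field at "OK". Applying them as 0, 2, 1, 3 leaves step 1 at "NOK", which is the kind of row the 'NR NOK' count in the query would pick up.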

--> One more thing, could you post your bootstrap code? The subscription configuration part is essential, and where you tell the application to use the particular checkpoint store.

Certainly. The registration for the Checkpoint store has been hidden behind two extension methods:

public void ConfigureServices(IServiceCollection services)
{
    // (other registrations, redacted)

    // DatabaseSettings contains settings for the receiving DB, i.e. the SQL Server database
    var databaseSettings = services.BuildServiceProvider().GetRequiredService<DatabaseSettings>();

    // EventSourcingSettings contains settings for the Event Store (i.e. Postgres) and configuration for the Subscriptions
    var eventSourcingSettings = services.BuildServiceProvider().GetRequiredService<EventSourcingSettings>();

    services.AddSqlServerCheckpointStore(databaseSettings.ConnectionString);

    if (eventSourcingSettings.InitializeCheckpointSchema)
    {
        services.InitializeSqlServerCheckpointSchema(databaseSettings.ConnectionString,
            eventSourcingSettings.SchemaName);
    }

    // here goes the subscription registration, see below (last snippet)
}

The extension methods mentioned above:

  public static IServiceCollection AddSqlServerCheckpointStore(this IServiceCollection services,
       string connectionString)
   {
       SqlConnection GetConnection() => new(connectionString);
       services.AddSingleton((GetSqlServerConnection)GetConnection);
       services.AddCheckpointStore(cfg => new SqlServerCheckpointStore(
           cfg.GetRequiredService<GetSqlServerConnection>(),
           Constants.Eventuous));

       return services;
   }

(just a bread-and-butter Checkpoint store registration as far as I can see. I see no options or alternative configuration possible there)

public static IServiceCollection InitializeSqlServerCheckpointSchema(this IServiceCollection services,
      string connectionString, string schemaName)
  {
      ILogger? logger = services.BuildServiceProvider().GetService<ILogger>();

      try
      {
          using var connection = new SqlConnection(connectionString);
          connection.Open();
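          // Parameterised so the schema name is never concatenated into the SQL text
          using var cmd = new SqlCommand("SELECT 1 FROM sys.schemas WHERE name = @schemaName", connection);
          cmd.Parameters.AddWithValue("@schemaName", schemaName);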
          var response = cmd.ExecuteScalar();

          if (response != null)
          {
              logger?.LogInformation(
                  "InitializeSqlServerCheckpointSchema: skipped creating Checkpoint schema, already exists.");
          }
          else
          {
              SqlConnection GetConn() => new(connectionString);
              var schema = new Schema(new SqlServerStoreOptions($"{schemaName}").Schema);
              schema.CreateSchema(GetConn).Wait();

              logger?.LogInformation(
                  "InitializeSqlServerCheckpointSchema created the Checkpoint schema.");
          }
      }
      catch (Exception ex)
      {
          // try-catch because the NSwag client generation tool fails on build
          logger?.LogError(ex, "InitializeSqlServerCheckpointSchema failed with error:");
      }

      return services;
  }

(this is an attempt of mine to make the receiving database Idempotent, i.e. create the Schema for Checkpoints if it doesn't yet exist. Should execute immediately and synchronously (.Wait() ) so I can't really see how that would break the subscription)

Subscriptions can be configured at appsettings level and are each registered in turn (in this testing scenario, I am only configuring 1 Subscription anyway):

  foreach (ExternalEntitySubscription subscription in eventSourcingSettings.ExternalEntitySubscriptions)
     {
         services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
             subscription.SubscriptionName,
             builder => builder
                // .AddConsumeFilterFirst(new AsyncHandlingFilter(20))
                 .AddConsumeFilterLast(new MessageFilter(x =>
                 {
                     foreach (string filter in subscription.FiltersOnStreamName)
                     {
                         if (!x.Stream.ToString().Contains(filter))
                         {
                             return false;
                         }
                     }

                     return true;
                 }))
                 .AddEventHandler<MyCustomEventHandler>()
                 // Enables parallel processing. The default option uses the message stream name as partition key, see: https://eventuous.dev/docs/subscriptions/pipes/
                 .WithPartitioningByStream(subscription.PartitionsCount));
     }	

(slightly redacted, but the idea is clear. I have an Event Handler that extends the abstract Eventuous.Subscriptions.EventHandler base class. The AsyncHandlingFilter has been commented out since it completely ruined the logic... I don't think it belonged here).

--> That's by design, as the expectation is that all the events are projected in the right order. If the projecting handler fails to execute an update, the read model will be in an unknown state. By default, the subscription is set to ignore these errors, and it should move the checkpoint regardless. Maybe you can point out if you got any logs from your projectors that indicate the failure.

I understand the design, though I had initially interpreted this somewhat differently, namely as the 'at-least-once processing' guarantee mentioned in the documentation (if we can't guarantee that event 14 was processed, we can't write Checkpoint 14; if the application restarts, it picks up from the last point we are certain was successful).

But this does not appear to be the case here. I can verify that the entire event stream is processed successfully and in proper order (the OK/NOK test described above). In other words, the projecting handler definitely does not fail to execute an update. The ONLY thing I see failing is the Checkpoint write (I have been unable to catch it in logging yet, but am still attempting to). This also tells me that the event update and the Checkpoint update are not part of a transaction (but I expected that, the documentation never mentions that it should be).
I guess what I am missing is a retry or failover system in case a Checkpoint update fails to execute? Since it is not transactional?

ReinderReinders commented on June 17, 2024

Update: attempting to capture logging has so far been unsuccessful (due to my cloud application apparently refusing to log when I demand too much of it... such as during a load test) but I have found out that I am exceeding the CPU capacities of my receiving DB server. That would imply that this is a database capacity issue, not an Eventuous issue. Will upgrade and retest.

ReinderReinders commented on June 17, 2024

Update: I'm not sure if this adds anything to the issue as already reported, but I now have a slightly different deployment with a MicroService in the cloud (upped the Partitioning Count to 50), which loses its checkpoint early on (but later than previously seen) during a subscription. Glancing at my logging, I see certain Checkpoint commits being executed more than once (e.g. 66, 275, 413, 428) until this commit action just stops being called (after 428, no more commits... although by now 20,000 events have been handled):

image

There are no Errors in the logging. I have no idea why the checkpoint commit just suddenly stops working.

Edit: a correction, it is a Partitioning Count of 100, not 50.

Edit2: With a Partitioning of 1 the Checkpoint is not lost. My suspicion already was that the Partitioning has something to do with it, now I can reproduce it consistently.

alexeyzimarev commented on June 17, 2024

Commits don't happen when the commit handler cannot get a gapless sequence of checkpoints. Each event received by a subscription gets its own monotonically increasing sequence number. When events are partitioned, the sequence gets broken inside each partition. When the event is successfully handled or ignored, the event position and sequence are passed to the checkpoint commit handler. It then tries to re-sequence all the positions linearly.

I can try adding some diagnostics to the gap detection, so you can plug in and see why it gets stuck.
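
To make the re-sequencing described above concrete, here is a minimal sketch of gap detection over pending positions. `PendingPosition` and `GapDetection.LastBeforeGap` are hypothetical names chosen for illustration and are not the Eventuous implementation. The sketch only assumes that sequence numbers start at a known value and increase by one per received event.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a commit position: the store position plus the
// per-subscription sequence number assigned when the event was received.
public readonly record struct PendingPosition(ulong Position, ulong Sequence);

public static class GapDetection
{
    // Walks the pending positions in sequence order and returns the last one
    // of the unbroken run starting at nextExpected, or null if that sequence
    // number has not been reported yet.
    public static PendingPosition? LastBeforeGap(
        IEnumerable<PendingPosition> pending, ulong nextExpected)
    {
        PendingPosition? last = null;
        foreach (var p in pending.OrderBy(x => x.Sequence))
        {
            if (p.Sequence != nextExpected) break;
            last = p;
            nextExpected++;
        }
        return last;
    }
}
```

With pending sequences 0, 1, 3, 4 and `nextExpected` set to 0, the result is the position with sequence 1. Nothing past it can be committed until sequence 2 is reported, which is how a single missing acknowledgement can stall the checkpoint while events keep being handled.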

ReinderReinders commented on June 17, 2024

Some more diagnostics would be most welcome.

alexeyzimarev commented on June 17, 2024

Here comes some new diags: #172

ReinderReinders commented on June 17, 2024

@alexeyzimarev I heartily approve and I have approved.
When will a new version be released? I would like to use this in my tests soon.

edit: oh, I just realised I could check out the /dev branch and manually add these projects to my solution to test it.

alexeyzimarev commented on June 17, 2024

All the preview versions are on MyGet. It's described in the readme.

ReinderReinders commented on June 17, 2024

@alexeyzimarev I am not sure what this tells us, but I've run a test run with the new diagnostics. The checkpoint is lost after 10243. If I look in the logging I see no error (failed to store Checkpoint) but I do see a constant repeat of the following:

image

edit: if you "cut out the middle man" (all the other logging) you can pinpoint exactly where it goes wrong:

image

but sadly no explanation as to why.

edit2: the thing I do notice is that for the first time, these last two values do not increment correctly - it falls back somehow?

image

alexeyzimarev commented on June 17, 2024

It seems really weird as it looks the same to me

Last commit position 10243:10243 is behind latest position 10243:10243

Seems like a bug, the same event was processed twice, but it should move on.

alexeyzimarev commented on June 17, 2024

I let it commit the duplicate position and it will raise a warning when it happens. Try with the latest preview from MyGet.

ReinderReinders commented on June 17, 2024

I have upgraded to the latest preview version (0.13.1-alpha.0.4) and have run it a few times with very verbose logging.

I no longer see the same ["eventuous"] log lines I saw yesterday (where it logs the gap/last commit position). Instead every time I see only this one line. The logging breaks exactly at ID 101 every time:

image

Despite the logging breaking, I have not yet seen a checkpoint being lost. So it appears to be a little more stable now (despite my losing the extra diagnostics logging that was added last week).

This is a smaller batch test, I am going to retry my large bulktest next with this version. That one will take a while to run.

alexeyzimarev commented on June 17, 2024

It's a bug, I will fix it now.

alexeyzimarev commented on June 17, 2024

It's in the latest

ReinderReinders commented on June 17, 2024

Does this help? First time I'm seeing this error.

2023-01-13 13:47:16 [ERR] ["Eventuous.Subscription"] [] [dispatcher_base_dev] Unable to commit position "CommitPosition { Position = 932983, Sequence = 32982, Timestamp = 01/06/2023 03:53:29, Valid = True, LogContext = Eventuous.Subscriptions.Logging.LogContext }"
System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Collections.Generic.SortedSet`1.DoRemove(T item)
   at System.Collections.Generic.SortedSet`1.RemoveWhere(Predicate`1 match)
   at Eventuous.Subscriptions.Checkpoints.CheckpointCommitHandler.CommitInternal(CommitPosition position, CancellationToken cancellationToken)

Strangely enough, the checkpoint that failed to save to the database is the last value that WAS saved:

image

So maybe a timeout/error on the return from DB to application code?

alexeyzimarev commented on June 17, 2024

I don't think it can be fixed without debugging, as the code is very simple, and I can't see where the null reference can happen:

            _positions.RemoveWhere(x => x.Sequence <= position.Sequence);

The content of _positions is CommitPosition record struct, which can't be null.

alexeyzimarev commented on June 17, 2024

Ok, I made sure that there are no duplicate positions added to the list of pending positions. I would expect the change to fix the issue.

ReinderReinders commented on June 17, 2024

@alexeyzimarev for clarity, which of these commits fixes the issue?
image
(i.e. has this already been pushed to the latest stable?)

ReinderReinders commented on June 17, 2024

I have retested with both the latest stable (0.13.1) and the latest alpha (0.8). Sadly I am still able to sometimes lose the Checkpoint.

alexeyzimarev commented on June 17, 2024

0.13.1 contains everything. I haven't done any change after that other than in branches.

I didn't claim that the issue is fixed, as I am unable to reproduce it. The following changes are included:

  • Resolve the "gap" issue when the new and previous commits are the same (your first discovery with additional diags)
  • Attempt to resolve the SortedSet null reference exception by changing the code to add new positions to the set using Union.

ReinderReinders commented on June 17, 2024

@alexeyzimarev I have managed to find the error (though not the solution):

ss1

  • I have included the relevant Eventuous .csprojs in my solution instead of using nuget packages, so I can debug
  • Ran a large bulk subscription (1 million events to process all at once) with a large partition count (100)
  • Let Visual Studio break on any exception

I already suspected that it was the CheckpointCommitHandler (or one of its components) that crashed, since I keep observing the same issue: the application logic (including processing events) continues to work (flawlessly), but the checkpoint is no longer updated in the Event Store (I'm storing checkpoints in the Event Store now instead of at the receiving end).
It appears to be a race condition: the CommitPositionSequence contains so many elements that it takes a while to enumerate the collection. During this interval, more checkpoints are added to the sequence, which causes the above exception. The CommitHandler apparently runs on a separate thread (I guess?) since, when I press F5 after the exception, the application continues to run. However, checkpoints are no longer written to the store.

ReinderReinders commented on June 17, 2024

Also, perhaps I should add that this exception occurs in the Core libraries of Eventuous (not in the Postgres implementation). So this issue could occur with any implementation.

ReinderReinders commented on June 17, 2024

As for a suggested solution, hmm... make a copy of the collection and run the enumeration on that? I'm not sure if that's a waterproof solution.

alexeyzimarev commented on June 17, 2024

Do you still get null reference exception?

ReinderReinders commented on June 17, 2024

No
edit: not in this test run. This was the first exception that occurred.

ReinderReinders commented on June 17, 2024

@alexeyzimarev Edit:
On the next test run, I now get a null reference exception:
image

image

alexeyzimarev commented on June 17, 2024

Would be nice if you can check what's inside both collections.

ReinderReinders commented on June 17, 2024

This one is just a list of commit positions. The other (null reference) bug is more intermittent and I haven't been able to reproduce it yet.

image

ReinderReinders commented on June 17, 2024

Strangely, I now get a null reference at a different location (the previous was at the .UnionWith operation):

image

alexeyzimarev commented on June 17, 2024

Yeah, the reason must be the same. Something inside the collection is causing it to crash. I need to know what the collection contains. I know it is a list of commit positions, but I want to know what's in there when it crashes with null reference.

ReinderReinders commented on June 17, 2024

I am unable to run any more tests today (some infrastructure downtime) but will return to this later. I would hazard a guess that this has something to do with the remove duplicates functionality, but I have no concrete basis to support that guess.

Steve-OH commented on June 17, 2024

Test app that reproduces the problem in a local SQL Server database: https://github.com/Steve-OH/eventuous-test-issue-165

Steve-OH commented on June 17, 2024

I've added an even simpler app (no DI) to the solution that exhibits the same problem; see the NoAsp project in the same repository.

Steve-OH commented on June 17, 2024

Per the Slack discussion (https://eventuousworkspace.slack.com/archives/C02ANQZKFMF/p1695695247275699), I'm still seeing problems even with the latest code (as of eba012c).

Depending on the phase of the moon, I get one of two exceptions, both null references, and both in CheckpointCommitHandler.cs, but in different places. This one shows a likely async-related race condition, because by the time the exception message is displayed in the code editor the variables that might be null references are pointing to actual objects (I also viewed the content of list, and it's fine):
image

The error message displayed on mouse-over on this one doesn't make sense to me. I guess it's unable to display a value that's inside a lambda expression?
image

In any case, any attempt to log the value of various things in order to determine what's causing the problem makes the problem disappear.

alexeyzimarev commented on June 17, 2024

I thought of a race condition too, but I can't figure out how it happens. All the checkpoints for one subscription are getting to a channel with a single consumer. Then, they go to an observable.

I looked, however, at the CommitPosition struct, and it has the LogContext-type property, which is a reference type. It can theoretically be null, and it can fail on comparisons by the sorted list. As the repro is available, I am going to write a custom equality function, excluding irrelevant properties, and see if it helps.
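
A custom equality function of the kind mentioned above might look roughly like the sketch below. `CommitPositionSketch`, `LogContextStub`, and `CommitPositionOrdering` are hypothetical stand-ins, since the real Eventuous types are not reproduced here. The idea is that equality and ordering look only at the numeric fields, so a null or differing log context can never take part in a comparison.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins; the real CommitPosition and LogContext live in Eventuous.
public sealed class LogContextStub { }

public readonly record struct CommitPositionSketch(ulong Position, ulong Sequence, DateTime Timestamp)
{
    // Reference-type property that may be null; deliberately excluded below.
    public LogContextStub? LogContext { get; init; }

    // Equality on Position and Sequence only; Timestamp and LogContext are ignored.
    public bool Equals(CommitPositionSketch other) =>
        Position == other.Position && Sequence == other.Sequence;

    public override int GetHashCode() => HashCode.Combine(Position, Sequence);
}

public static class CommitPositionOrdering
{
    // Ordering for sorted collections, by Sequence only.
    public static readonly IComparer<CommitPositionSketch> BySequence =
        Comparer<CommitPositionSketch>.Create((a, b) => a.Sequence.CompareTo(b.Sequence));
}
```

A `SortedSet<CommitPositionSketch>` constructed with `CommitPositionOrdering.BySequence` treats two entries with the same sequence as one element, whatever their timestamps or log contexts.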

Steve-OH commented on June 17, 2024

Aha! I spoke too soon about the new Eventuous code working with my small test app. It does work, but only if the number of injected events is small (1000 in the test app). Once I bump that number up to 100,000, it fails, although only intermittently. I've updated the other repo with the changes. (Use the NoAsp project, which doesn't use DI and has been updated to use the latest Eventuous; the .sln file looks for Eventuous in ..\EventuousBeta.) If I get some time I'll update the other app that does use DI later today.

In general, I would say that the problem appears when a large number of events are added to the Messages table in a relatively short amount of time. In my real app, about 30,000 events are added in about four seconds.

Steve-OH commented on June 17, 2024

I've made some progress in understanding what's going on. The exceptions are being caused by the fact that CheckpointCommitHandler.AddBatchAndGetLast() and CheckpointCommitHandler.CommitInternal() typically run on different threads, and the exceptions occur when the execution of one method overlaps that of the other. Depending on the exact timing of the overlap, the exception is raised in one of three places:

  1. CheckpointCommitHandler.AddBatchAndGetLast, line 76 (UnionWith - null reference)
  2. CheckpointCommitHandler.CommitInternal, line 129 (RemoveWhere - null reference)
  3. CommitPositionSequence.Get(), line 23 (Zip - collection modified during enumeration) (This one is invoked as part of AddBatchAndGetLast.)

I tried adding a mutex so that the two methods can't overlap, and that does prevent the exceptions from being raised. If added in just the right places, it also appears to fix the problem of the checkpoint update seizing up, so I think the checkpoint update seizing up is also caused by overlapped execution of the methods, but only when that doesn't lead to an exception being raised.

I used mutex.WaitOne()/mutex.ReleaseMutex() to wrap the code in three places:

  1. around _positions.UnionWith(list); in AddBatchAndGetLast
  2. around _positions.FirstBeforeGap; in GetCommitPosition
  3. around the first _positions.RemoveWhere(x => x.Sequence <= position.Sequence); in CommitInternal

I'm sure that the execution overlap is the result of the way that the observable subscription is set up in the CheckpointCommitHandler constructor, but I don't have enough experience in the ins and outs of RX, especially when async methods are involved, to be able to offer any insight as to where things might be going wrong and how to fix it more elegantly.
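
The three guarded sections listed above can be sketched as one small wrapper. This is only an illustration under assumptions: `GuardedPositions` is a hypothetical class, it stores bare sequence numbers instead of commit positions, and it uses the C# `lock` statement in place of `Mutex.WaitOne()`/`ReleaseMutex()`. `SortedSet<T>` is not safe for concurrent reads and writes, so every access goes through the same gate.

```csharp
using System.Collections.Generic;

// Hypothetical wrapper: all access to the pending set shares one lock.
public sealed class GuardedPositions
{
    private readonly object _gate = new();
    private readonly SortedSet<ulong> _pending = new();

    // Counterpart of the UnionWith call in AddBatchAndGetLast.
    public void AddBatch(IEnumerable<ulong> batch)
    {
        lock (_gate) { _pending.UnionWith(batch); }
    }

    // Counterpart of reading FirstBeforeGap: the enumeration happens under
    // the lock, so no writer can modify the set while it is being walked.
    public ulong? LastBeforeGap(ulong nextExpected)
    {
        lock (_gate)
        {
            ulong? last = null;
            foreach (var s in _pending)
            {
                if (s != nextExpected) break;
                last = s;
                nextExpected++;
            }
            return last;
        }
    }

    // Counterpart of the RemoveWhere call in CommitInternal.
    public int RemoveUpTo(ulong committed)
    {
        lock (_gate) { return _pending.RemoveWhere(s => s <= committed); }
    }

    public int Count
    {
        get { lock (_gate) { return _pending.Count; } }
    }
}
```

A `lock` block cannot contain an `await`, so this shape only fits when the guarded sections are synchronous, as the three collection operations are.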

alexeyzimarev commented on June 17, 2024

@Steve-OH I hope you don't do this in your real app:

[EventType($"V1.{nameof(TestAccountInserted)}")]

:)

alexeyzimarev commented on June 17, 2024

Ok, I think I fixed it

Steve-OH commented on June 17, 2024

@Steve-OH I hope you don't do this in your real app:

[EventType($"V1.{nameof(TestAccountInserted)}")]

:)

How else can I ensure that someone foolish enough to change the name of the event type gets what they deserve?

Steve-OH commented on June 17, 2024

A quick test with the new code seems to have fixed the issue, but since it's intermittent, I will need to do some more test runs to be sure.

(Meanwhile, I'm seeing some other unrelated concurrency issues; more on that later....)

alexeyzimarev commented on June 17, 2024

I thought that since checkpoints are added to the observable sequentially, it should not cause concurrency issues. What I hadn't considered is that (I believe), unlike channels, the observable consumer is called as soon as a value is available, so it might enter the consumer before the previous element has been processed. In addition, the collection was read at the same time as it was being manipulated. Adding a semaphore to synchronise those should definitely solve the issue regardless of the method of delivering checkpoint batches downstream. The only reason to use the observable now is for time+size batching.
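
For readers following along, a semaphore-based guard around an async consumer could look like the sketch below. It is not the code that went into Eventuous; `SerializedHandler<T>` is a hypothetical wrapper that shows the general technique. `SemaphoreSlim` is used because it can be awaited and, unlike `Mutex`, does not have to be released by the thread that acquired it, which matters once the handler continues on another thread after an `await`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: at most one invocation of the inner handler runs at
// a time, even if callers arrive before the previous call has completed.
public sealed class SerializedHandler<T>
{
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly Func<T, CancellationToken, Task> _inner;

    public SerializedHandler(Func<T, CancellationToken, Task> inner) => _inner = inner;

    public async Task Handle(T item, CancellationToken ct = default)
    {
        await _gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            await _inner(item, ct).ConfigureAwait(false);
        }
        finally
        {
            // Always release, including when the inner handler throws.
            _gate.Release();
        }
    }
}
```

Wrapping the batch consumer this way means the collection updates and the checkpoint store call for one batch finish before the next batch starts, regardless of how the batches are delivered.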

Would love to hear about other concurrency issues :)
