DEBUG: [CommandBus|Publish]: user.RegisterWithEmail {Email:[email protected]}
2019/12/28 15:21:35.993717 DEBUG: [EventBus|Publish]: user.WasRegisteredWithEmail {"id":"9b8d95a1-e9dc-4ef7-9c88-428898b0d0a3","metadata":{"type":"user.WasRegisteredWithEmail","stream_id":"cd8d6723-75d4-4d5a-bb1f-904cde2c45e2","stream_name":"user.User","stream_version":0,"occurred_at":"2019-12-28T15:21:35.9931463Z"},"payload":{"id":"cd8d6723-75d4-4d5a-bb1f-904cde2c45e2","email":"[email protected]"}}
Command and event buses publish but the event handlers are a hit or miss to be registered and subscribed for topics. Can't seem to find a pattern yet as to when it works and when not.
The easiest way to reproduce it is to open a tab with list users, after I create a user it doesn't appear in the list 9 out of 10 restarts of the system.
Replicated locally, the issue persists, seems like event handlers are not registering for all topics.
// Register registers event handlers for topics
// will panic after timeout if unable to register handlers
func Register(conn *grpc.ClientConn, eventBus eventbus.EventBus, topicToHandlerMap map[string]eventbus.EventHandler, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
fmt.Printf("topicToHandlerMap: %v \n", topicToHandlerMap)
connName := "pubsub"
// Will retry infinitely until timeouts by context (after 5 seconds)
_, err := gollback.New(ctx).Retry(0, func(ctx context.Context) (interface{}, error) {
//fmt.Printf("conn: %v", conn)
fmt.Printf("isConnectionServing %s: %t \n", connName, grpc_utils.IsConnectionServing(connName, conn))
if grpc_utils.IsConnectionServing(connName, conn) {
for topic, handler := range topicToHandlerMap {
fmt.Printf("for topic: %v \n", topic)
// Will resubscribe to handler on error infinitely
go gollback.New(context.Background()).Retry(0, func(ctx context.Context) (interface{}, error) {
fmt.Printf("gollback topic: %v \n", topic)
err := eventBus.Subscribe(ctx, topic, handler)
fmt.Printf("err: %v \n", err)
return nil, errors.Newf(errors.INTERNAL, "EventHandler %s unsubscribed (%v)", topic, err)
})
}
return nil, nil
}
return nil, errors.Newf(" %s gRPC connection is not serving", connName)
})
if err != nil {
panic(err)
}
}
isConnectionServing pubsub: true
for topic: user.AccessTokenWasRequested
for topic: user.WasRegisteredWithEmail
for topic: user.WasRegisteredWithGoogle
for topic: user.WasRegisteredWithFacebook
for topic: user.EmailAddressWasChanged
gollback topic: user.EmailAddressWasChanged
gollback topic: user.EmailAddressWasChanged
gollback topic: user.EmailAddressWasChanged
gollback topic: user.EmailAddressWasChanged
2019/12/29 16:25:00.862596 INFO: [EventBus|Subscribe]: user.EmailAddressWasChanged
2019/12/29 16:25:00.862612 INFO: [EventBus|Subscribe]: user.EmailAddressWasChanged
gollback topic: user.EmailAddressWasChanged
2019/12/29 16:25:00.862598 INFO: [EventBus|Subscribe]: user.EmailAddressWasChanged
2019/12/29 16:25:00.862609 INFO: [EventBus|Subscribe]: user.EmailAddressWasChanged
2019/12/29 16:25:00.862665 INFO: [EventBus|Subscribe]: user.EmailAddressWasChanged
The async nature of gollback is somehow producing unexpected results with the sync for loop as it is repeating some topics and skipping others.
pubsub logs:
2019/12/29 15:34:23.166428 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:34:23.166430 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:34:23.166434 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:34:23.166456 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:34:23.167126 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:35:43.166624 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:35:43.166624 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:35:43.166651 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:35:43.166651 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:35:43.166775 INFO: gRPC Server|Subscribe] user.WasRegisteredWithGoogle
2019/12/29 15:37:53.446528 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:37:53.446543 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:37:53.446530 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:37:53.446547 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:37:53.446584 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:39:43.447461 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:39:43.447473 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:39:43.447467 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:39:43.447487 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:39:43.447472 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:42:23.449741 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:42:23.449757 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:42:23.449766 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:42:23.449782 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested
2019/12/29 15:42:23.449789 INFO: gRPC Server|Subscribe] user.AccessTokenWasRequested