go-dcp's Introduction

This repository contains a Go implementation of a Couchbase Database Change Protocol (DCP) client.

Why?

Example

package main

import (
  "github.com/Trendyol/go-dcp"
  "github.com/Trendyol/go-dcp/logger"
  "github.com/Trendyol/go-dcp/models"
)

func listener(ctx *models.ListenerContext) {
  switch event := ctx.Event.(type) {
  case models.DcpMutation:
    logger.Log.Info(
      "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
      event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
    )
  case models.DcpDeletion:
    logger.Log.Info(
      "deleted(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  case models.DcpExpiration:
    logger.Log.Info(
      "expired(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  }

  ctx.Ack()
}

func main() {
  connector, err := dcp.NewDcp("config.yml", listener)
  if err != nil {
    panic(err)
  }

  defer connector.Close()

  connector.Start()
}

Usage

$ go get github.com/Trendyol/go-dcp

Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| hosts | []string | yes | - | Couchbase host like localhost:8091. |
| username | string | yes | - | Couchbase username. |
| password | string | yes | - | Couchbase password. |
| bucketName | string | yes | - | Couchbase DCP bucket. |
| dcp.group.name | string | yes | | DCP group name for vBuckets. |
| scopeName | string | no | _default | Couchbase scope name. |
| collectionNames | []string | no | _default | Couchbase collection names. |
| connectionBufferSize | uint, string | no | 20mb | gocbcore library buffer size. Check this if you get OOM Killed. |
| connectionTimeout | time.Duration | no | 5s | Couchbase connection timeout. |
| secureConnection | bool | no | false | Enable TLS for the Couchbase connection. |
| rootCAPath | string | no | *not set | Required if secureConnection is set to true. |
| debug | bool | no | false | For debugging purposes. |
| dcp.bufferSize | int | no | 16mb | Go DCP listener pre-allocated buffer size. Check this if you get OOM Killed. |
| dcp.connectionBufferSize | uint, string | no | 20mb | gocbcore library buffer size. Check this if you get OOM Killed. |
| dcp.connectionTimeout | time.Duration | no | 5s | DCP connection timeout. |
| dcp.listener.bufferSize | uint | no | 1000 | Go DCP listener buffered channel size. |
| dcp.listener.skipUntil | time.Time | no | | Set this if you want to skip events until a certain time. |
| dcp.group.membership.type | string | no | | DCP membership type: couchbase, kubernetesHa, kubernetesStatefulSet or static. Check examples for details. |
| dcp.group.membership.memberNumber | int | no | 1 | Set this if membership is static. Other types ignore this field. |
| dcp.group.membership.totalMembers | int | no | 1 | Set this if membership is static or kubernetesStatefulSet. Other types ignore this field. |
| dcp.group.membership.rebalanceDelay | time.Duration | no | 20s | Works for autonomous mode. |
| dcp.group.membership.config | map[string]string | no | *not set | Key-value config. For the couchbase type: expirySeconds, heartbeatInterval, heartbeatToleranceDuration, monitorInterval, timeout. |
| dcp.config.disableChangeStreams | bool | no | false | Set this to true if you do not want to get older versions of changes on Couchbase Server 7.2.0+ with Magma storage buckets. |
| leaderElection.enabled | bool | no | false | Set this to true for kubernetesHa membership. |
| leaderElection.type | string | no | kubernetes | Leader election type: kubernetes. |
| leaderElection.config | map[string]string | no | *not set | Key-value config. For the kubernetes type: leaseLockName, leaseLockNamespace, leaseDuration, renewDeadline, retryPeriod. |
| leaderElection.rpc.port | int | no | 8081 | Used with kubernetesStatefulSet membership. |
| checkpoint.type | string | no | auto | Checkpoint type: auto or manual. |
| checkpoint.autoReset | string | no | earliest | Checkpoint start point: earliest or latest. |
| checkpoint.interval | time.Duration | no | 20s | Checkpoint checking interval. |
| checkpoint.timeout | time.Duration | no | 60s | Checkpoint checking timeout. |
| healthCheck.disabled | bool | no | false | Disable the Couchbase connection health check. |
| healthCheck.interval | time.Duration | no | 20s | Couchbase connection health check interval. |
| healthCheck.timeout | time.Duration | no | 5s | Couchbase connection health check timeout. |
| rollbackMitigation.disabled | bool | no | false | Disable reprocessing for rolled-back vBucket offsets. |
| rollbackMitigation.interval | time.Duration | no | 500ms | Persisted sequence numbers polling interval. |
| rollbackMitigation.configWatchInterval | time.Duration | no | 2s | Cluster config changes listener interval. |
| metadata.type | string | no | couchbase | Metadata storage type: file or couchbase. |
| metadata.readOnly | bool | no | false | Set this when debugging state. |
| metadata.config | map[string]string | no | *not set | Key-value config. For the couchbase type: bucket, scope, collection, connectionBufferSize, connectionTimeout. |
| api.disabled | bool | no | false | Disable metric endpoints. |
| api.port | int | no | 8080 | Set the API port. |
| metric.path | string | no | /metrics | Set the metric endpoint path. |
| logging.level | string | no | info | Set the logging level. |

Environment Variables

These environment variables override the corresponding config values.

| Variable | Type | Corresponding Config | Description |
|---|---|---|---|
| GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER | int | dcp.group.membership.memberNumber | Lets you scale up or down without making a new deployment. |
| GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS | int | dcp.group.membership.totalMembers | Lets you scale up or down without making a new deployment. |
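
As a rough illustration of how such a variable name lines up with its config key, the sketch below strips the GO_DCP__ prefix, replaces underscores with dots, and compares case-insensitively. The helpers envToKeyPath and matchesKey are hypothetical names, and the resolution logic inside go-dcp may well differ.

```go
package main

import (
	"fmt"
	"strings"
)

// envPrefix is taken from the variable names in the table above.
const envPrefix = "GO_DCP__"

// envToKeyPath is a hypothetical helper: it turns an environment variable
// name into a lower-cased, dot-separated key path.
func envToKeyPath(name string) (string, bool) {
	if !strings.HasPrefix(name, envPrefix) {
		return "", false
	}
	rest := strings.TrimPrefix(name, envPrefix)
	return strings.ToLower(strings.ReplaceAll(rest, "_", ".")), true
}

// matchesKey reports whether an environment variable corresponds to a
// config key, ignoring case (keys such as memberNumber are camelCase).
func matchesKey(envName, configKey string) bool {
	path, ok := envToKeyPath(envName)
	return ok && path == strings.ToLower(configKey)
}

func main() {
	fmt.Println(matchesKey(
		"GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER",
		"dcp.group.membership.memberNumber",
	))
}
```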

Monitoring

The client offers an API with several endpoints and exposes several metrics.

API

| Endpoint | Description | Debug Mode |
|---|---|---|
| GET /status | Returns a 200 OK status if the client is able to ping the Couchbase server successfully. | |
| GET /rebalance | Triggers a rebalance operation for the vBuckets. | |
| GET /states/offset | Returns the current offsets for each vBucket. | x |
| GET /states/followers | Returns the list of follower clients if service discovery is enabled. | x |
| GET /debug/pprof/* | Fiber Pprof | x |

The client collects relevant metrics and exposes them at the path set by metric.path, which defaults to /metrics.

Exposed metrics

| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| cbgo_mutation_total | The total number of mutations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_deletion_total | The total number of deletions on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_expiration_total | The total number of expirations on a specific vBucket | vbId: ID of the vBucket | Counter |
| cbgo_agent_queue_current | The current number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
| cbgo_agent_queue_max | The max number of agent queue | address: Couchbase, is dcp: Is Dcp Agent | Gauge |
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_total_lag_current | The current total lag | N/A | Gauge |
| cbgo_process_latency_ms_current | The latest process latency in milliseconds | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
| cbgo_rebalance_current | The number of total rebalance | N/A | Counter |
| cbgo_active_stream_current | The number of total active stream | N/A | Gauge |
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
| cbgo_member_number_current | The number of the current member | N/A | Gauge |
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
| cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge |
| cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge |
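
To make the relationship between the sequence-number gauges and the lag gauges more concrete, here is a small sketch. It assumes that per-vBucket lag is the ending sequence number minus the current sequence number (floored at zero) and that total lag is the sum over vBuckets. That formula is an assumption for illustration, not a statement of how go-dcp computes these metrics, and vbState, lag and totalLag are hypothetical names.

```go
package main

import "fmt"

// vbState holds two of the per-vBucket values exposed as gauges above.
type vbState struct {
	SeqNo    uint64 // corresponds to cbgo_seq_no_current
	EndSeqNo uint64 // corresponds to cbgo_end_seq_no_current
}

// lag assumes lag = end sequence number - current sequence number,
// floored at zero. This is an illustrative assumption.
func lag(s vbState) uint64 {
	if s.EndSeqNo <= s.SeqNo {
		return 0
	}
	return s.EndSeqNo - s.SeqNo
}

// totalLag sums the assumed per-vBucket lag over all vBuckets.
func totalLag(states map[uint16]vbState) uint64 {
	var total uint64
	for _, s := range states {
		total += lag(s)
	}
	return total
}

func main() {
	states := map[uint16]vbState{
		0: {SeqNo: 90, EndSeqNo: 100},
		1: {SeqNo: 50, EndSeqNo: 50},
	}
	fmt.Println(totalLag(states))
}
```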

Compatibility

| Go DCP Version | Minimum Couchbase Server Version |
|---|---|
| x<1.1.16 | 6.5.x |
| 1.1.16>=x | 5.x.x |

Breaking Changes

| Date taking effect | Version | Change | How to check |
|---|---|---|---|
| December 14, 2023 | v1.1.19 | dcp.config.[DisableExpiryOpcode, DisableStreamEndByClient, EnableChangeStreams] removed | Review your configs |

Examples

go-dcp's People

Contributors

abdulsametileri, canerpatir, dogukank, emreodabas, emretanriverdi, erayarslan, erenarslan, firateski, firatferoglu, gulumseraslann, keremdndr, mhmtszr, mstryoda, muhammedsaidkaya, oguzyildirim, ozgeonay, ramazan

go-dcp's Issues

create contributing.md

We need to create a Contributing guideline to this project.

It will enable a person to understand what it requires to contribute to this project.

feat: detect bucket type and apply config dynamically

Feature request related to a problem

  • When using an ephemeral bucket (the same applies to the metadata bucket), the connector starts successfully but does not receive mutation events. There are no error logs either, so everything seems OK. We triggered a mutation from Couchbase while running the project locally and still received no events from DCP. We thought the problem might be on the DCP side, but a Couchbase Eventing function on the same bucket and collection did receive the mutation and deletion events.

Describe the solution you'd like
dcp-client observes persistence and expects data to be persisted on Couchbase. Ephemeral buckets do not require persistence. Adding the following to your config.yaml bypasses that logic:

rollbackMitigation:
  disabled: true

Describe alternatives you've considered
Add a feature that detects the bucket type and applies the config above automatically.

Lag metric anomaly

After the cluster configuration changed, the lag metrics increased continuously.

This behavior causes false alerts. We need to debug and fix it as soon as possible.

feat: make healthcheck more resilient (retry + reconnect)

In the current implementation, I noticed that the healthcheck function [1] does not support retrying or graceful failure handling [2].

Proposal
It would be nice to add something like a failureThreshold field to ConfigHealthCheck, so that after the ping fails failureThreshold times in a row, go-dcp-client treats the Couchbase connection as unhealthy and triggers a restart/reconnect for that specific server.

Happy to discuss.

Footnotes

  1. https://github.com/Trendyol/go-dcp-client/blob/16ab8a163b085e1578d688e0e38a617e8cfd028b/dcp.go#L56-L70

  2. https://github.com/Trendyol/go-dcp-client/blob/16ab8a163b085e1578d688e0e38a617e8cfd028b/dcp.go#L117

feat: New configuration field to support skipping changes until a date.

The Problem
When running a DCP app in a multi-AZ setup, we can only run active-passive, because otherwise the same messages would be processed in multiple availability zones and therefore duplicated.
In an active-passive setup, when we need to fail over our DCP application, we cannot continue where the previous DCP app instance left off. This is because the sequence numbers of mutations change in the target Couchbase cluster after the mutations are replicated with XDCR.

Proposed Solution
Our proposed solution for a failover scenario is to start the DCP app in snapshot mode, so that it listens to changes from the beginning. For this we need a configuration field such as SkipUntil: "2024-08-12T13:40:03+0000" that makes the app skip changes before this date.

A possible alternative solution
If we could find virtual buckets' corresponding sequence numbers in the target cluster, we could find a way to create new offset storage records for virtual buckets. However, we couldn't find a way to do this yet.

Setting default couchbase port

Describe the bug
When more than one Couchbase data node IP is given without a port, connecting fails with an error because the default port is not set.

Expected behavior
The default port can be 8091.
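
One way the expected behavior could look is sketched below using only the standard library. withDefaultPort is a hypothetical helper, not an existing go-dcp function. It leaves addresses that already carry a port untouched and appends 8091 otherwise.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// defaultPort is the port suggested in this issue.
const defaultPort = "8091"

// withDefaultPort returns host unchanged if it already has a port,
// otherwise it appends the default port.
func withDefaultPort(host string) string {
	if _, _, err := net.SplitHostPort(host); err == nil {
		return host // a port is already present
	}
	// Trim brackets so an IPv6 literal such as [::1] is not wrapped twice.
	return net.JoinHostPort(strings.Trim(host, "[]"), defaultPort)
}

func main() {
	hosts := []string{"10.0.0.1", "10.0.0.2:11210", "couchbase.local"}
	for _, h := range hosts {
		fmt.Println(withDefaultPort(h))
	}
}
```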

feat: support multiple connector with a http server

Is your feature request related to a problem? Please describe.
Currently, we can't create multiple connectors with the API enabled, because each connector tries to create its own API server. We need to support passing in the HTTP server from outside.

Also, we need to find a way to show the metrics of each connector separately.

Support initial snapshot of CB table

Is your feature request related to a problem? Please describe.
I need to sync a CB table to another database (may not be CB) or micro service. Current go-dcp library is able to capture the latest changes to the table. However, my project also needs the initial snapshot of the table because not all the docs are updated actively.

Describe the solution you'd like
It would be ideal to allow a go-dcp client (connector) to capture the initial snapshot of a given table before it starts reading the change logs. The snapshot events may be read directly by the connector and published to Kafka without being saved to the change logs. After all the snapshot events are processed, the connector then starts reading the change logs from where the snapshot ends. The behavior could be similar to the Debezium MySQL connector. However, implementing the same for Couchbase is technically a lot more challenging.

Describe alternatives you've considered
There is no clear alternative. It's hard to implement a separate snapshot tool that is seamlessly integrated with a dcp connector.

Additional context
The open-source Couchbase source connector used to have a configuration option "use_snapshot", but it was dropped in recent releases because it didn't work well.
Ref this discussion for more details: https://www.couchbase.com/forums/t/couchbase-kafka-connector-connection-error-cannot-fetch-configuration-for-bucket/35047

feat: fetch couchbase server version from couchbase api

We need to fetch the server version to set these configurations:

dcp.config.disableExpiryOpcode
dcp.config.disableStreamEndByClient

Currently we expect the user to set them, because gocbcore does not provide the server version.

Getting panic while closing the application

Describe the bug
We are getting a "panic: send on closed channel" error while closing the application.

Version:

  • OS: macOS
  • Golang version: v1.20
  • github.com/Trendyol/go-kafka-connect-couchbase v0.0.43

Update vbucket checkpoint documents that were updated after the last commit

Is your feature request related to a problem? Please describe.
The commit function writes 1024 vbucket checkpoint documents to Couchbase. There may be a small number of transactions but either way, we make 1024 write operations.

Describe the solution you'd like
Update vbucket checkpoint documents that were updated after the last commit.
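
A sketch of the idea, under the assumption that a checkpoint can be reduced to one sequence number per vBucket. checkpointWriter and its write callback are hypothetical stand-ins, not the actual go-dcp commit path. The writer remembers what it last committed and skips vBuckets whose sequence number has not changed.

```go
package main

import "fmt"

// checkpointWriter remembers the last committed sequence number per
// vBucket so unchanged vBuckets can be skipped (hypothetical type).
type checkpointWriter struct {
	committed map[uint16]uint64
	// write stands in for the real Couchbase write operation.
	write func(vbID uint16, seqNo uint64) error
}

// Commit writes only the vBuckets whose sequence number differs from the
// last committed value and returns how many writes were made.
func (c *checkpointWriter) Commit(current map[uint16]uint64) (int, error) {
	written := 0
	for vbID, seqNo := range current {
		if last, ok := c.committed[vbID]; ok && last == seqNo {
			continue // nothing changed since the last commit
		}
		if err := c.write(vbID, seqNo); err != nil {
			return written, err
		}
		c.committed[vbID] = seqNo
		written++
	}
	return written, nil
}

func main() {
	w := &checkpointWriter{
		committed: map[uint16]uint64{},
		write:     func(vbID uint16, seqNo uint64) error { return nil },
	}

	offsets := map[uint16]uint64{0: 10, 1: 20}
	first, _ := w.Commit(offsets) // both vBuckets are new

	offsets[1] = 25
	second, _ := w.Commit(offsets) // only vBucket 1 changed

	fmt.Println(first, second)
}
```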

Introduce log severities

Is your feature request related to a problem? Please describe.
Checkpoint logs almost fill 2.5m of data in a day.

Describe the solution you'd like
We should define a severity for every log, and clients must be able to set the level via configuration.

feature: Add service discovery couchbase host support

Currently, we are getting Couchbase's host from the config file as

hosts:
  - xx.xxx.xx.xxx:8091
  - xx.xxx.xx.xxx:8091
  - xx.xxx.xx.xxx:8091

We need to support service discovery URLs like couchbase://hostname.external

This feature is already implemented in gocb, so we can take the related code as a basis and implement ours in a similar way.

Health check endpoint

We need to check if dcp works or not via an endpoint.

It would be great if go-dcp-client provided an endpoint like "/_healthcheck" that returns the status of the DCP client.
