Home Page: https://pkg.go.dev/github.com/Deeptiman/grpc-connection-library

License: MIT License

grpc-connection-library

grpc-connection-library provides a gRPC client-server connection interface that developers can use as gRPC middleware in an application. The library is written in Go and uses a concurrency pipeline design pattern to synchronize the gRPC connection pool.

Features

  • The gRPC connection flow between server and client is synchronized using the Ping/Pong service.
  • The library supports a connection pool that reuses gRPC client connection instances.
  • A concurrency pipeline design pattern synchronizes the data flow across several stages while the connection pool is created.
  • The selection of a gRPC connection instance from the pool uses reflect.SelectCase, which supports a pseudo-random choice among the ready cases.
  • The go-batch processing library divides the connection instances in the pool into batches.
  • The grpc-retry policy retries failed gRPC connections with a backoff strategy.
  • grpclog shows the internal connection lifecycle, which is useful for debugging the connection flow.

Installation

go get github.com/Deeptiman/grpc-connection-library

Go Docs

Documentation at pkg.go.dev

Example

Client:

package main

import (
	"fmt"
	grpc "github.com/Deeptiman/grpc-connection-library/grpc"
)

func main() {
	address := "localhost:50051"
	client, err := grpc.NewGRPCConnection(grpc.WithAddress(address), grpc.WithConnectionType(grpc.Client))
	if err != nil {
		fmt.Println("Failed to create GRPC Connection - ", err.Error())
		return
	}
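	conn, err := client.GetConn()
	if err != nil {
		// Report the failure instead of exiting silently.
		fmt.Println("Failed to get GRPC Connection - ", err.Error())
		return
	}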
	fmt.Println("GRPC Client Connected State= ", conn.GetState().String())
}

Server:

package main

import (
	"fmt"
	grpc "github.com/Deeptiman/grpc-connection-library/grpc"
)

func main() {
	
	server, err := grpc.NewGRPCConnection(grpc.WithConnectionType(grpc.Server))
	if err != nil {
		fmt.Println("Failed to create GRPC Connection - ", err.Error())
		return
	}
	server.ListenAndServe()
}

gRPC connection Ping/Pong service

The library provides a Ping/Pong service to test the client/server connection flow. The service acts as a health check for the connection.

protos:

syntax = "proto3";

package ping;

option go_package = ".;ping";

import "google/protobuf/timestamp.proto";

service PingService {
    rpc SendPongMsg(Request) returns (Response) {}
}

message Request {
    string message = 1;
}

message Pong {
    int32 index = 1;
    string message = 2;
    google.protobuf.Timestamp received_on = 3;
}

message Response {
    Pong pong = 1;
}
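
To make the message shapes concrete, here is a minimal sketch of the request/response flow using plain Go structs. The types below are hand-written stand-ins for the code that protoc would generate from the definition above, and handlePing is a hypothetical handler, not the library's implementation; the generated code would use a protobuf timestamp type rather than time.Time.

```go
package main

import (
	"fmt"
	"time"
)

// Stand-ins for the generated protobuf types; field names mirror the .proto file.
type Request struct{ Message string }

type Pong struct {
	Index      int32
	Message    string
	ReceivedOn time.Time // google.protobuf.Timestamp in the real message
}

type Response struct{ Pong Pong }

// handlePing is a hypothetical handler: it answers a request with a Pong that
// echoes the message and records when the request was received.
func handlePing(index int32, req Request) Response {
	return Response{Pong: Pong{
		Index:      index,
		Message:    "pong: " + req.Message,
		ReceivedOn: time.Now(),
	}}
}

func main() {
	resp := handlePing(1, Request{Message: "ping"})
	fmt.Println(resp.Pong.Index, resp.Pong.Message, resp.Pong.ReceivedOn.Format(time.RFC3339))
}
```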

Server:

  1. ListenAndServe starts listening on the configured serverPort with the "tcp" network type.
  2. After the server listener socket is opened, the Ping service is registered so that any gRPC client can send a Ping-Pong request to check the health of the client-server gRPC connection.

grpcServer := grpc.NewServer(serverOptions...)
// Register the Ping service request
pb.RegisterPingServiceServer(grpcServer, &pb.PingService{})
if err = grpcServer.Serve(listener); err != nil {
	g.log.Errorf("failed to start server - %v", err)
	g.log.Fatal(err)
	return err
}

Client:

  1. The gRPC client connection instance creates a Ping service client to test the connection health.
  2. SendPingMsg sends a test message to the target server address and waits for the Pong response to verify the connection flow.

conn, err := grpc.Dial(address, opts...)
if err != nil {
    c.Log.Fatal(err)
    return nil, err
}
// Use the connection that was just dialed.
client := pb.NewPingServiceClient(conn)
respMsg, err := pb.SendPingMsg(client)
if err != nil {
   return nil, err
}
c.Log.Infoln("GRPC Pong msg - ", respMsg)

Concurrency Pipeline for gRPC Connection Pool

  1. ConnPoolPipeline() follows the concurrency pipeline technique to create a connection pool in highly concurrent scenarios. The pipeline has several stages that use the Fan-In, Fan-Out technique to process the data through channels.
  2. Using the pipeline technique, the whole process of creating the connection pool becomes a single composed function. The four stages together work as a generator for the connection pool.
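
The chaining of stages can be sketched with plain channels. This is a simplified illustration and not the library's code: generate, replicate, and runPipeline are hypothetical names, strings stand in for connection instances, and the sketch shows only sequential stage composition rather than Fan-In or Fan-Out.

```go
package main

import "fmt"

// generate emits a single value, like a stage that creates the first connection.
func generate(v string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		out <- v
	}()
	return out
}

// replicate forwards every input value n times.
func replicate(in <-chan string, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			for i := 0; i < n; i++ {
				out <- v
			}
		}
	}()
	return out
}

// runPipeline wires the stages together and counts what reaches the end.
func runPipeline(v string, n int) int {
	count := 0
	for range replicate(generate(v), n) {
		count++
	}
	return count
}

func main() {
	fmt.Println("items through pipeline:", runPipeline("conn", 8))
}
```

Each stage owns its output channel and closes it when its input is exhausted, so the final range loop ends once every stage has finished.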

Pipeline Stages

Stage-1:

This stage will create the initial gRPC connection instance that gets passed to the next pipeline stage for replication.

connInstancefn := func(done chan interface{}) <-chan *grpc.ClientConn {
	connCh := make(chan *grpc.ClientConn)

	conn, err := c.ClientConn()
	if err != nil {
		// Report the failure and hand back a closed channel so that the
		// later stages stop instead of receiving a nil connection.
		done <- err
		close(connCh)
		return connCh
	}

	go func() {
		c.Log.Infoln("1#connInstance ...")
		defer close(connCh)
		connCh <- conn
		c.Log.Infoln("GRPC Connection Status - ", conn.GetState().String())
	}()
	return connCh
}

Stage-2:

This stage replicates the initial gRPC connection object: the same connection instance is sent to the next stage MaxPoolSize times via channels.

connReplicasfn := func(connInstanceCh <-chan *grpc.ClientConn) <-chan *grpc.ClientConn {
	connInstanceReplicaCh := make(chan *grpc.ClientConn)
	go func() {
		c.Log.Infoln("2#connReplicas ...")
		defer close(connInstanceReplicaCh)
		for conn := range connInstanceCh {
			// Send the same connection instance once per pool slot.
			for i := 0; uint64(i) < c.MaxPoolSize; i++ {
				connInstanceReplicaCh <- conn
			}
		}
	}()
	return connInstanceReplicaCh
}
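
The stages shown here always send until their input is drained. A common extension of the pipeline pattern is to let a stage stop early when a done channel is closed. The sketch below assumes a hypothetical replicateUntilDone function with a string stand-in for the connection; it is not taken from the library.

```go
package main

import "fmt"

// replicateUntilDone sends v on the returned channel up to n times and stops
// early when done is closed.
func replicateUntilDone(done <-chan struct{}, v string, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- v:
			case <-done:
				return
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	out := replicateUntilDone(done, "conn", 100)
	received := 0
	for range out {
		received++
		if received == 3 {
			close(done) // signal the stage to stop
			break
		}
	}
	fmt.Println("received before cancel:", received)
}
```

With the second select case in place, the sending goroutine cannot leak when the consumer stops reading.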

Stage-3:

This stage starts the batch processing using the github.com/Deeptiman/go-batch library. The MaxPoolSize connection instances are divided into multiple batches, which go-batch releases through its internal supply channel.

connBatchfn := func(connInstanceCh <-chan *grpc.ClientConn) chan []batch.BatchItems {
	go func() {
		c.Log.Infoln("3#connBatch ...")
		c.ConnInstanceBatch.StartBatchProcessing()
		// Feed every replicated connection into the batch processor.
		for conn := range connInstanceCh {
			c.ConnInstanceBatch.Item <- conn
		}
	}()
	return c.ConnInstanceBatch.Consumer.Supply.ClientSupplyCh
}

Stage-4:

The connection queue reads from the go-batch client supply channel and stores each connection instance as a channel case in []reflect.SelectCase. Whenever the client requests a connection instance, reflect.Select picks one of the ready cases pseudo-randomly and returns its connection instance.

connEnqueuefn := func(connSupplyCh <-chan []batch.BatchItems) <-chan batch.BatchItems {
	receiveBatchCh := make(chan batch.BatchItems)
	go func() {
		c.Log.Infoln("4#connEnqueue ...")
		defer close(receiveBatchCh)
		for supply := range connSupplyCh {
			for _, s := range supply {
				// Store the item in the pool, then pass it downstream.
				c.EnqueConnBatch(s)
				receiveBatchCh <- s
			}
		}
	}()
	return receiveBatchCh
}

Run the pipeline:

for s := range connEnqueuefn(connBatchfn(connReplicasfn(connInstancefn(done)))) {
	go func(s batch.BatchItems) {
		atomic.AddUint64(&ConnPoolPipeline, 1)
		if c.GetConnPoolSize() == c.MaxPoolSize {
			select {
			case pipelineDoneChan <- "Done":
				return
			}
		}
	}(s)
}

Batch Processing Item structure in the connection pool

type BatchItems struct {
   Id      int         `json:"id"`
   BatchNo int         `json:"batchNo"`
   Item    interface{} `json:"item"`
}

Scenario:

There are 12 gRPC connection instances stored in 3 batches.

Batch Items
Batch-1
{Id: 1, BatchNo: 1, Item: *grpc.ClientConn}
{Id: 2, BatchNo: 1, Item: *grpc.ClientConn}
{Id: 3, BatchNo: 1, Item: *grpc.ClientConn}
{Id: 4, BatchNo: 1, Item: *grpc.ClientConn}
Batch-2
{Id: 1, BatchNo: 2, Item: *grpc.ClientConn}
{Id: 2, BatchNo: 2, Item: *grpc.ClientConn}
{Id: 3, BatchNo: 2, Item: *grpc.ClientConn}
{Id: 4, BatchNo: 2, Item: *grpc.ClientConn}
Batch-3
{Id: 1, BatchNo: 3, Item: *grpc.ClientConn}
{Id: 2, BatchNo: 3, Item: *grpc.ClientConn}
{Id: 3, BatchNo: 3, Item: *grpc.ClientConn}
{Id: 4, BatchNo: 3, Item: *grpc.ClientConn}

So, the connection pool stores the connection instances as a batch processing array []batch.BatchItems{}.
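
The layout in the scenario above can be reproduced with a short sketch. splitIntoBatches is a hypothetical helper written for illustration; the go-batch library may assign ids and batch numbers differently internally, and strings stand in for the *grpc.ClientConn items.

```go
package main

import "fmt"

// BatchItems mirrors the structure shown above.
type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}

// splitIntoBatches groups items into batches of at most batchSize entries.
// Id restarts at 1 in every batch and BatchNo counts batches from 1.
func splitIntoBatches(items []interface{}, batchSize int) [][]BatchItems {
	if batchSize <= 0 {
		return nil
	}
	var batches [][]BatchItems
	for i, item := range items {
		if i%batchSize == 0 {
			batches = append(batches, nil) // start a new batch
		}
		last := len(batches) - 1
		batches[last] = append(batches[last], BatchItems{
			Id:      i%batchSize + 1,
			BatchNo: last + 1,
			Item:    item,
		})
	}
	return batches
}

func main() {
	conns := make([]interface{}, 12)
	for i := range conns {
		conns[i] = fmt.Sprintf("conn-%d", i+1) // placeholder for *grpc.ClientConn
	}
	for _, b := range splitIntoBatches(conns, 4) {
		fmt.Println(b)
	}
}
```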

gRPC Connection Pool Selection Process

A connection instance is selected from the pool using the reflect.SelectCase type from the reflect package. The connection instances are stored as channel cases in []reflect.SelectCase{}, and the search for an available connection instance works as a pseudo-random case selector.

Store conn instances into reflect.SelectCase

// enqueCh stores the batchItems channels; each channel is added to the cases as a receive case.
   enqueCh    []chan batch.BatchItems
   
   q.itemSelect[i] = reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(q.enqueCh[i]),
   }

Retrieve conn instance from pool of cases

for {
	chosen, rcv, ok := reflect.Select(q.itemSelect)
	if !ok {
		// ok is false when the chosen channel has been closed, so no item was received.
		q.log.Infoln("Conn Batch Instance Not Chosen = ", chosen)
		continue
	}
	q.log.Infoln("SelectCase", "Batch Conn : chosen = ", chosen)

	// Remove the selected case from the array to avoid the duplicate choosing of the cases.
	q.itemSelect = append(q.itemSelect[:chosen], q.itemSelect[chosen+1:]...)

	return rcv.Interface().(batch.BatchItems)
}
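
The select-and-remove technique can be tried in isolation with the standard library only. In this sketch, drainWithSelect is a hypothetical function and integers stand in for the batch items; unlike the loop above, it also drops a case whose channel was closed, which is an assumption made here to guarantee termination.

```go
package main

import (
	"fmt"
	"reflect"
)

// drainWithSelect receives one value from each channel using reflect.Select,
// removing a case once it has been chosen so it is never picked twice.
func drainWithSelect(chans []chan int) []int {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}
	var got []int
	for len(cases) > 0 {
		chosen, rcv, ok := reflect.Select(cases)
		// Drop the chosen case whether it delivered a value or was closed.
		cases = append(cases[:chosen], cases[chosen+1:]...)
		if ok {
			got = append(got, int(rcv.Int()))
		}
	}
	return got
}

func main() {
	chans := make([]chan int, 4)
	for i := range chans {
		chans[i] = make(chan int, 1)
		chans[i] <- i + 1
	}
	fmt.Println(drainWithSelect(chans)) // order varies between runs
}
```

Because reflect.Select chooses uniformly among the cases that are ready, the order of the returned values is not deterministic, but every value is returned exactly once.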

License

This project is licensed under the MIT License

References

  1. Go gRPC Middleware
  2. Concurrency Pipeline Patterns
