singchia / frontier

The first open source cloud-native TCP gateway for edges: a TCP/UDP long-lived connection gateway supporting RPC, messaging, and streams, so that microservices and edge nodes/clients can reach each other directly.

License: Apache License 2.0

Go 95.76% Makefile 3.24% Shell 0.10% Lua 0.60% Dockerfile 0.30%
bidirectional-rpc edge edge-proxy gateway high-availability k8s messaging multiplexer operator rpc scale stream streaming cloud-native reverse-proxy tcp frontier game-server im tcp-gateway

frontier's Introduction


简体中文 | English

Frontier is a full-duplex, open-source, long-lived connection gateway written in Go. It lets microservices reach edge nodes or clients directly, and likewise lets edge nodes or clients reach microservices directly. For both sides it provides full-duplex one-way and two-way RPC calls, message publishing and receiving, and point-to-point streams. Frontier fits a cloud-native architecture: you can use the Operator to quickly deploy a cluster with high availability and elasticity, easily supporting millions of online edge nodes or clients.

Features

  • RPC: microservices and edges can Call each other's functions (registered in advance), with load balancing supported on the microservice side
  • Messaging: microservices and edges can Publish to each other's Topics, and edges can Publish to Topics on external MQs, with load balancing supported on the microservice side
  • Multiplexing/streams: a microservice can open a stream (connection) directly on an edge node, over which you can wrap file uploads, proxies, and more, turning a chasm into a thoroughfare
  • Online/offline control: microservices can register callbacks for edge node ID allocation and online/offline events; Frontier invokes these functions when the events occur
  • Simple API: the project's api directory provides packaged SDKs for both edges and microservices, making development on top of the SDK very simple
  • Simple deployment: multiple deployment methods (docker, docker-compose, helm, and operator) are supported for deploying Frontier instances or clusters
  • Horizontal scaling: a Frontier + Frontlas cluster is provided; when a single instance hits its performance ceiling, you can scale Frontier instances or the cluster horizontally
  • High availability: cluster deployment is supported, and the microservice and edge node SDKs reconnect persistently; if the current instance goes down, they switch to a new available instance and keep serving
  • Control plane: gRPC and REST interfaces are provided, allowing operators to query or delete microservices and edge nodes; deletion kicks the target offline

Architecture

The Frontier component

  • Service End: the functional entry point on the microservice side
  • Edge End: the functional entry point on the edge node or client side
  • Publish/Receive: publish and receive messages
  • Call/Register: call and register functions
  • OpenStream/AcceptStream: open and accept point-to-point streams (connections)
  • External MQ: Frontier can forward messages Published by edge nodes to external MQs according to the configured Topics

Frontier requires both microservices and edge nodes to actively connect to it. The meta information of a Service or an Edge (Topics to receive, RPCs, Service name, and so on) can be carried along when connecting. The default ports are:

  • :30011 for microservice connections, to obtain a Service
  • :30012 for edge node connections, to obtain an Edge
  • :30010 the control plane for operators or programs

Functionality

  • Messager: Service → Edge via Publish, routed by EdgeID+Topic. You must Publish to a specific EdgeID (the default Topic is empty); the Edge calls Receive, and after processing it must call msg.Done() or msg.Error(err) to guarantee message consistency.
  • Messager: Edge → Service or external MQ via Publish, routed by Topic. You must Publish to a Topic; Frontier picks a Service or MQ according to the Topic.
  • RPCer: Service → Edge via Call, routed by EdgeID+Method. You must Call a specific EdgeID and carry the Method.
  • RPCer: Edge → Service via Call, routed by Method. You must Call a Method; Frontier picks a Service according to the Method.
  • Multiplexer: Service → Edge via OpenStream, routed by EdgeID. You must OpenStream to a specific EdgeID.
  • Multiplexer: Edge → Service via OpenStream, routed by ServiceName. You must OpenStream to a ServiceName, which the Service specifies at initialization via service.OptionServiceName.

The design follows these principles:

  1. All messages, RPCs, and Streams are delivered point to point
    • From a microservice to an edge, the edge node ID must be specified
    • From an edge to a microservice, Frontier routes by Topic and Method and finally hashes to pick one microservice or external MQ; the default hash key is edgeid, but you can also choose random or srcip
  2. Messages must be explicitly finished by the receiver
    • To guarantee delivery semantics, the receiver must call msg.Done() or msg.Error(err) to ensure delivery consistency
  3. A stream opened by the Multiplexer is logically a direct channel between the microservice and the edge node
    • Once the peer accepts the stream, everything on that stream goes straight to the peer and no longer passes through Frontier's routing policies

Usage

Examples

Chatroom

The examples/chatroom directory contains a simple chatroom example that implements a chatroom in only about 100 lines of code. Run

make examples

to get the chatroom_service and chatroom_egent executables in the bin directory. Running the example:

chatroom-min.mov

In this example you can see online/offline notifications, message Publish, and other features.

Live streaming

The examples/rtmp directory contains a simple live-streaming example that implements a live-streaming proxy in only about 80 lines of code. Run

make examples

to get the rtmp_service and rtmp_edge executables in the bin directory. After running them, connect OBS to rtmp_edge to get a live-streaming proxy:

In this example you can see the Multiplexer and Stream features.

How microservices use it

Obtaining a Service on the microservice side

package main

import (
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, err := service.NewService(dialer)
	// start using svc
}

Microservice handles edge ID allocation and online/offline notifications

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	svc.RegisterGetEdgeID(context.TODO(), getID)
	svc.RegisterEdgeOnline(context.TODO(), online)
	svc.RegisterEdgeOffline(context.TODO(), offline)
}

// the service can allocate an ID to the edge based on its meta
func getID(meta []byte) (uint64, error) {
	return 0, nil
}

// edge online callback
func online(edgeID uint64, meta []byte, addr net.Addr) error {
	return nil
}

// edge offline callback
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
	return nil
}

Microservice publishes a message to an edge node

The Edge must be online first; otherwise it will not be found.

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	msg := svc.NewMessage([]byte("test"))
	// publish a message to the edge node with ID 1001
	err := svc.Publish(context.TODO(), 1001, msg)
	// ...
}

Microservice declares Topics to receive

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	// declare the topics to receive when obtaining the svc
	svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
	for {
		// receive messages
		msg, err := svc.Receive(context.TODO())
		if err == io.EOF {
			// io.EOF means the svc lifecycle has ended and it can no longer be used
			return
		}
		if err != nil {
			fmt.Println("receive err:", err)
			continue
		}
		// after processing the msg, notify the caller that it is done
		msg.Done()
	}
}
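
If processing fails, the receiver should reject the message rather than acknowledge it. Below is a minimal sketch of that branch; the handle function is a hypothetical placeholder for your own business logic, while msg.Done() and msg.Error(err) are the calls this README names for keeping delivery consistency.

package main

import (
	"context"
	"errors"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
	for {
		msg, err := svc.Receive(context.TODO())
		if err == io.EOF {
			// the svc lifecycle has ended
			return
		}
		if err != nil {
			continue
		}
		// acknowledge on success, report the error on failure
		if herr := handle(msg); herr != nil {
			msg.Error(herr)
			continue
		}
		msg.Done()
	}
}

// handle is a hypothetical placeholder for application-specific processing.
func handle(msg any) error {
	if msg == nil {
		return errors.New("empty message")
	}
	return nil
}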

Microservice calls an edge node's RPC

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	req := svc.NewRequest([]byte("test"))
	// call the foo method on the edge node with ID 1001; the edge node must have registered this method in advance
	rsp, err := svc.Call(context.TODO(), 1001, "foo", req)
	// ...
}

Microservice registers a method for edge nodes to call

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
	"github.com/singchia/geminio"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	// register an "echo" method
	svc.Register(context.TODO(), "echo", echo)
	// ...
}

func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
	value := req.Data()
	rsp.SetData(value)
}

Microservice opens a point-to-point stream to an edge node

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	// open a new stream to the edge node with ID 1001 (st is also a net.Conn); the edge must AcceptStream to accept it
	st, err := svc.OpenStream(context.TODO(), 1001)
}

You can use this newly opened stream to transfer files, proxy traffic, and so on.
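
For example, here is a minimal sketch of proxying traffic over such a stream by piping it to a local TCP service; the local address 127.0.0.1:8080 is an illustrative assumption, and the stream is used only as the net.Conn described above.

package main

import (
	"context"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	// open a new stream to the edge node with ID 1001
	st, err := svc.OpenStream(context.TODO(), 1001)
	if err != nil {
		return
	}
	// dial a local service and copy bytes in both directions (illustrative address)
	local, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		st.Close()
		return
	}
	go func() {
		io.Copy(local, st) // edge -> local
		local.Close()
	}()
	io.Copy(st, local) // local -> edge
	st.Close()
}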

Microservice accepts streams

package main

import (
	"fmt"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	// declare the service name when obtaining the svc; the edge must specify this name when opening a stream
	svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
	for {
		st, err := svc.AcceptStream()
		if err == io.EOF {
			// io.EOF means the svc lifecycle has ended and it can no longer be used
			return
		} else if err != nil {
			fmt.Println("accept stream err:", err)
			continue
		}
		// use the stream; it is also a net.Conn, so you can Read/Write/Close it, and do RPC and messaging on it as well
	}
}

You can likewise use an accepted stream to transfer files, proxy traffic, and so on.

Messages, RPC, and streams all together!

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
	"github.com/singchia/geminio"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	// declare the service name when obtaining the svc; the edge must specify this name when opening a stream
	svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))

	// accept streams
	go func() {
		for {
			st, err := svc.AcceptStream()
			if err == io.EOF {
				// io.EOF means the svc lifecycle has ended and it can no longer be used
				return
			} else if err != nil {
				fmt.Println("accept stream err:", err)
				continue
			}
			// use the stream; it is also a net.Conn, so you can Read/Write/Close it, and do RPC and messaging on it as well
		}
	}()

	// register an "echo" method
	svc.Register(context.TODO(), "echo", echo)

	// receive messages
	for {
		msg, err := svc.Receive(context.TODO())
		if err == io.EOF {
			// io.EOF means the svc lifecycle has ended and it can no longer be used
			return
		}
		if err != nil {
			fmt.Println("receive err:", err)
			continue
		}
		// after processing the msg, notify the caller that it is done
		msg.Done()
	}
}

func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
	value := req.Data()
	rsp.SetData(value)
}

How edge nodes/clients use it

Obtaining an Edge on the edge node side

package main

import (
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	// start using eg ...
}

Edge node publishes a message to a Topic

A Service must declare that it receives this Topic in advance, or an external MQ must be configured in the configuration file.

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	// start using eg
	msg := eg.NewMessage([]byte("test"))
	err := eg.Publish(context.TODO(), "foo", msg)
	// ...
}

Edge node receives messages

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	for {
		// receive messages
		msg, err := eg.Receive(context.TODO())
		if err == io.EOF {
			// io.EOF means the eg lifecycle has ended and it can no longer be used
			return
		}
		if err != nil {
			fmt.Println("receive err:", err)
			continue
		}
		// after processing the msg, notify the caller that it is done
		msg.Done()
	}
	// ...
}

Edge node calls a microservice RPC

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	// start using eg
	req := eg.NewRequest([]byte("test"))
	// call the echo method; Frontier looks up and forwards the request to the corresponding microservice
	rsp, err := eg.Call(context.TODO(), "echo", req)
}

Edge node registers an RPC

package main

import (
	"context"
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
	"github.com/singchia/geminio"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	// register an "echo" method
	eg.Register(context.TODO(), "echo", echo)
	// ...
}

func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
	value := req.Data()
	rsp.SetData(value)
}

Edge node opens a point-to-point stream to a microservice

package main

import (
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	st, err := eg.OpenStream("service-name")
	// ...
}

You can use this newly opened stream to transfer files, proxy traffic, and so on.
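
As one concrete use, here is a minimal sketch of sending a local file over the stream; the file path /tmp/upload.bin is an illustrative assumption, and the stream is written to as a plain net.Conn.

package main

import (
	"io"
	"net"
	"os"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	// open a stream to the service named "service-name"
	st, err := eg.OpenStream("service-name")
	if err != nil {
		return
	}
	defer st.Close()
	// stream a local file to the peer (illustrative path)
	f, err := os.Open("/tmp/upload.bin")
	if err != nil {
		return
	}
	defer f.Close()
	io.Copy(st, f)
}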

Edge node accepts streams

package main

import (
	"net"
	"fmt"
	"io"
	"github.com/singchia/frontier/api/dataplane/v1/edge"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30012")
	}
	eg, _ := edge.NewEdge(dialer)
	for {
		stream, err := eg.AcceptStream()
		if err == io.EOF {
			// io.EOF means the eg lifecycle has ended and it can no longer be used
			return
		} else if err != nil {
			fmt.Println("accept stream err:", err)
			continue
		}
		// use the stream; it is also a net.Conn, so you can Read/Write/Close it, and do RPC and messaging on it as well
	}
}

Error handling

  • io.EOF: the stream or connection has been closed; exit from Receive, AcceptStream, and similar operations
  • io.ErrShortBuffer: the sender's or receiver's buffer is full; adjust it with OptionServiceBufferSize or OptionEdgeBufferSize
  • apis.ErrEdgeNotOnline: the edge node is not online; check the edge connection
  • apis.ServiceNotOnline: the microservice is not online; check the microservice connection information or connection
  • apis.RPCNotOnline: the RPC being Called is not online
  • apis.TopicOnline: the Topic being Published is not online
  • Other errors: Timeout, BufferFull, and so on also exist

Note that if a stream is closed, all methods blocking on that stream immediately get io.EOF; if the entry point (Service or Edge) is closed, all blocking methods on all streams above it immediately get io.EOF.
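
As a minimal sketch of acting on these errors when publishing from a microservice (the retry policy below is only an illustrative assumption; io.EOF is treated as terminal, matching the note above, while other errors such as apis.ErrEdgeNotOnline may be worth logging or retrying):

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"time"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		return net.Dial("tcp", "127.0.0.1:30011")
	}
	svc, _ := service.NewService(dialer)
	for i := 0; i < 3; i++ {
		msg := svc.NewMessage([]byte("test"))
		err := svc.Publish(context.TODO(), 1001, msg)
		if err == nil {
			return
		}
		if err == io.EOF {
			// the Service has been closed; stop using svc
			return
		}
		// other errors (edge not online, timeout, full buffer) may deserve a retry
		fmt.Println("publish err:", err)
		time.Sleep(time.Second)
	}
}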

Control plane

The Frontier control plane provides gRPC and REST interfaces. Operators can use these APIs to inspect the connections on the instance; both gRPC and REST are served on the default port :30010.

gRPC: see the Protobuf definition

service ControlPlane {
    rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
    rpc GetEdge(GetEdgeRequest) returns (Edge);
    rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse);
    rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse);
    rpc ListServices(ListServicesRequest) returns (ListServicesResponse);
    rpc GetService(GetServiceRequest) returns (Service);
    rpc KickService(KickServiceRequest) returns (KickServiceResponse);
    rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse);
    rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse);
}

REST: see the Swagger definition

For example, you can kick an edge node offline with the following request:

curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id} 

Or check which RPCs a microservice has registered:

curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}

Note: gRPC/REST depend on a dao backend. There are two options, buntdb and sqlite3, both used in in-memory mode. For performance, the default backend is buntdb, and the count field returned by list interfaces is always -1. When you configure the backend as sqlite3, it is assumed you have strong OLTP needs for the microservices and edge nodes connected to Frontier (for example, building a web UI on top of Frontier); only then does count return the real total.

Frontier configuration

If you want to customize your Frontier instance further, this section explains how each configuration option works. After customizing, save the configuration as frontier.yaml and mount it into the container at /usr/conf/frontier.yaml for it to take effect.

Minimal configuration

To keep things simple, you can configure only the listening addresses for microservices and edge nodes:

# servicebound (microservice-facing) configuration
servicebound:
  # listener
  listen:
    network: tcp
    # listen address
    addr: 0.0.0.0:30011
# edgebound (edge-node-facing) configuration
edgebound:
  # listener
  listen:
    network: tcp
    # listen address
    addr: 0.0.0.0:30012
  # whether Frontier may allocate an edgeID when no registered GetEdgeID is found
  edgeid_alloc_when_no_idservice_on: true

TLS

TLS, an important configuration for most users, is supported on the microservice side, the edge node side, and the control plane. mTLS is also supported, in which case Frontier verifies the certificate presented by the client.

servicebound:
  listen:
    addr: 0.0.0.0:30011
    network: tcp
    tls:
      # whether to enable TLS; disabled by default
      enable: false
      # certificates and private keys; multiple pairs may be configured, negotiated with the client
      certs:
      - cert: servicebound.cert
        key: servicebound.key
      # whether to enable mTLS; when enabled, the client certificate is verified against the CAs below
      mtls: false
      # CA certificates used to verify client certificates
      ca_certs:
      - ca1.cert
edgebound:
  listen:
    addr: 0.0.0.0:30012
    network: tcp
    tls:
      # whether to enable TLS; disabled by default
      enable: false
      # certificates and private keys; multiple pairs may be configured, negotiated with the client
      certs:
      - cert: edgebound.cert
        key: edgebound.key
      insecure_skip_verify: false
      # whether to enable mTLS; when enabled, the client certificate is verified against the CAs below
      mtls: false
      # CA certificates used to verify client certificates
      ca_certs:
      - ca1.cert

External MQ

If you configure external MQs, Frontier can also Publish the corresponding Topics to those MQs.

AMQP

mqm:
  amqp:
    # whether to enable
    enable: false
    # AMQP addresses
    addrs: null
    # producer
    producer:
      # exchange name
      exchange: ""
      # equivalent to Topics in Frontier; an array
      routing_keys: null

This is the minimal AMQP configuration. If the Topic of a message Published by an edge node is listed in routing_keys, Frontier Publishes it to the exchange. If a microservice or another external MQ has also declared that Topic, Frontier still picks one target to Publish to according to hashby.

Kafka

mqm:
  kafka:
    # whether to enable
    enable: false
    # kafka addresses
    addrs: null
    # producer
    producer:
      # an array of topics
      topics: null

This is the minimal Kafka configuration. If the Topic of a message Published by an edge node is listed in the topics array above, Frontier Publishes it to Kafka. If a microservice or another external MQ has also declared that Topic, Frontier still picks one target to Publish to according to hashby.

NATS

mqm:
  nats:
    # whether to enable
    enable: false
    # NATS addresses
    addrs: null
    producer:
      # equivalent to Topics in Frontier; an array
      subjects: null
    # if jetstream is enabled, messages are Published to jetstream first
    jetstream:
      enable: false
      # jetstream name
      name: ""
      producer:
        # equivalent to Topics in Frontier; an array
        subjects: null

In the NATS configuration, if Jetstream is enabled, Publishing goes to Jetstream first. If a microservice or another external MQ has also declared that Topic, Frontier still picks one target to Publish to according to hashby.

NSQ

mqm:
  nsq:
    # whether to enable
    enable: false
    # NSQ addresses
    addrs: null
    producer:
      # an array of topics
      topics: null

For NSQ topics, if a microservice or another external MQ has also declared the Topic, Frontier still picks one target to Publish to according to hashby.

Redis

mqm:
  redis:
    # whether to enable
    enable: false
    # Redis addresses
    addrs: null
    # Redis DB
    db: 0
    # password
    password: ""
    producer:
      # equivalent to Topics in Frontier; an array
      channels: null

If a microservice or another external MQ has also declared the Topic, Frontier still picks one target to Publish to according to hashby.

Other configuration

daemon:
  # whether to enable pprof
  pprof:
    addr: 0.0.0.0:6060
    cpu_profile_rate: 0
    enable: true
  # resource limits
  rlimit:
    enable: true
    nofile: 102400
# control plane
controlplane:
  enable: false
  listen:
    network: tcp
    addr: 0.0.0.0:30010
dao:
  # buntdb and sqlite3 are supported, both in in-memory mode to stay stateless
  backend: buntdb
  # enable sqlite debugging
  debug: false
exchange:
  # Frontier forwards edge nodes' messages, RPCs, and opened streams to microservices hashed by edgeid, srcip, or random; the default is edgeid,
  # i.e. the same edge node always reaches the same microservice.
  hashby: edgeid

See frontier_all.yaml for the full configuration.

Deploying Frontier

With a single Frontier instance, you can deploy it in any of the following ways, depending on your environment.

docker

docker run -d --name frontier -p 30011:30011 -p 30012:30012 singchia/frontier:1.1.0

docker-compose

git clone https://github.com/singchia/frontier.git
cd frontier/dist/compose
docker-compose up -d frontier

helm

If you are in a k8s environment, you can use helm to deploy an instance quickly:

git clone https://github.com/singchia/frontier.git
cd frontier/dist/helm
helm install frontier ./ -f values.yaml

Your microservices should connect to service/frontier-servicebound-svc:30011, and your edge nodes can connect to the NodePort exposing :30012.
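
For instance, a microservice Pod can simply dial that Service by name; this sketch assumes the Pod runs in the same namespace so the short in-cluster DNS name resolves.

package main

import (
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	dialer := func() (net.Conn, error) {
		// in-cluster Service created by the helm chart
		return net.Dial("tcp", "frontier-servicebound-svc:30011")
	}
	svc, err := service.NewService(dialer)
	if err != nil {
		return
	}
	_ = svc // start using svc
}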

operator

See the cluster deployment section below.

Cluster

Frontier + Frontlas architecture

A new Frontlas component is added to build the cluster. Frontlas is also stateless and keeps no other information in memory, so it additionally depends on Redis: you need to provide Redis connection information to Frontlas. Standalone redis, redis sentinel, and redis-cluster are supported.

  • Frontier: the data-plane component that microservices and edges communicate through
  • Frontlas: named after Frontier Atlas, the cluster management component; it records the meta and liveness information of microservices and edges in Redis

Frontier actively connects to Frontlas to report its own status as well as the liveness and status of microservices and edges. The default Frontlas ports are:

  • :40011 for microservice connections, replacing the :30011 port that microservices connect to in a single-Frontier-instance setup
  • :40012 for Frontier connections, used to report status

You can deploy as many Frontier instances as needed. For Frontlas, two separate instances are enough for HA (high availability); since no state is stored, there are no consistency issues.

Configuration

Add the following to Frontier's frontier.yaml:

frontlas:
  enable: true
  dial:
    network: tcp
    addr:
      - 127.0.0.1:40012
    tls:
  metrics:
    enable: false
    interval: 0
daemon:
  # the unique ID of this Frontier instance within the cluster
  frontier_id: frontier01

Frontier connects to Frontlas to report its own status and the liveness and status of microservices and edges.

The minimal frontlas.yaml configuration for Frontlas:

control_plane:
  listen:
    # microservices now connect to this address, to discover which Frontier in the cluster an edge node is on
    network: tcp
    addr: 0.0.0.0:40011
frontier_plane:
  # Frontier connects to this address
  listen:
    network: tcp
    addr: 0.0.0.0:40012
  expiration:
    # expiration of microservice meta information in redis
    service_meta: 30
    # expiration of edge node meta information in redis
    edge_meta: 30
redis:
  # standalone, sentinel, and cluster connections are supported
  mode: standalone
  standalone:
    network: tcp
    addr: redis:6379
    db: 0

See frontlas_all.yaml for the full configuration.

Usage

Since Frontlas is used to discover available Frontiers, microservices need the following adjustment:

Microservice obtains a Service

package main

import (
	"net"
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	// use NewClusterService instead to obtain the Service
	svc, err := service.NewClusterService("127.0.0.1:40011")
	// start using service; everything else stays the same
}

Edge nodes obtain a connection address

Edge nodes still connect to Frontier, but they can obtain the addresses of available Frontiers from Frontlas, which provides an interface to list Frontier instances:

curl -X GET http://127.0.0.1:40011/cluster/v1/frontiers

You can wrap this interface to provide load balancing or high availability for edge nodes, or add mTLS and expose it to edge nodes directly (not recommended).
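
A rough sketch of such a wrapper on the edge side is shown below; the response format is not documented here, so parsing it and picking an edgebound address is left as an assumption to fill in against the actual Frontlas API.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// list the Frontier instances known to Frontlas
	rsp, err := http.Get("http://127.0.0.1:40011/cluster/v1/frontiers")
	if err != nil {
		return
	}
	defer rsp.Body.Close()
	body, _ := io.ReadAll(rsp.Body)
	// parse the body, pick an edgebound address, then dial it with edge.NewEdge
	// exactly as in the examples above
	fmt.Println(string(body))
}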

Control plane gRPC: see the Protobuf definition

Unlike Frontier's control plane, the Frontlas control plane is cluster-oriented; currently it only provides read interfaces for the cluster.

service ClusterService {
    rpc GetFrontierByEdge(GetFrontierByEdgeIDRequest) returns (GetFrontierByEdgeIDResponse);
    rpc ListFrontiers(ListFrontiersRequest) returns (ListFrontiersResponse);

    rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
    rpc GetEdgeByID(GetEdgeByIDRequest) returns (GetEdgeByIDResponse);
    rpc GetEdgesCount(GetEdgesCountRequest) returns (GetEdgesCountResponse);

    rpc ListServices(ListServicesRequest) returns (ListServicesResponse) ;
    rpc GetServiceByID(GetServiceByIDRequest) returns (GetServiceByIDResponse) ;
    rpc GetServicesCount(GetServicesCountRequest) returns (GetServicesCountResponse) ;
}

k8s

Operator

Installing the CRD and Operator

Follow these steps to install and deploy the Operator into your .kubeconfig environment:

git clone https://github.com/singchia/frontier.git
cd frontier/dist/crd
kubectl apply -f install.yaml

Check the CRD:

kubectl get crd frontierclusters.frontier.singchia.io

Check the Operator:

kubectl get all -n frontier-system

FrontierCluster cluster

apiVersion: frontier.singchia.io/v1alpha1
kind: FrontierCluster
metadata:
  labels:
    app.kubernetes.io/name: frontiercluster
    app.kubernetes.io/managed-by: kustomize
  name: frontiercluster
spec:
  frontier:
    # number of Frontier replicas
    replicas: 2
    # microservice-side port
    servicebound:
      port: 30011
    # edge-node-side port
    edgebound:
      port: 30012
  frontlas:
    # number of Frontlas replicas
    replicas: 1
    # control plane port
    controlplane:
      port: 40011
    redis:
      # configuration of the Redis dependency
      addrs:
        - rfs-redisfailover:26379
      password: your-password
      masterName: mymaster
      redisType: sentinel

Save this as frontiercluster.yaml and run

kubectl apply -f frontiercluster.yaml

Within about a minute you will have a cluster of 2 Frontier instances and 1 Frontlas instance.

Check the deployed resources with:

kubectl get all -l app=frontiercluster-frontier
kubectl get all -l app=frontiercluster-frontlas

NAME                                           READY   STATUS    RESTARTS   AGE
pod/frontiercluster-frontier-57d565c89-dn6n8   1/1     Running   0          7m22s
pod/frontiercluster-frontier-57d565c89-nmwmt   1/1     Running   0          7m22s
NAME                                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
service/frontiercluster-edgebound-svc      NodePort    10.233.23.174   <none>        30012:30012/TCP  8m7s
service/frontiercluster-servicebound-svc   ClusterIP   10.233.29.156   <none>        30011/TCP        8m7s
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/frontiercluster-frontier   2/2     2            2           7m22s
NAME                                                 DESIRED   CURRENT   READY   AGE
replicaset.apps/frontiercluster-frontier-57d565c89   2         2         2       7m22s
NAME                                            READY   STATUS    RESTARTS   AGE
pod/frontiercluster-frontlas-85c4fb6d9b-5clkh   1/1     Running   0          8m11s
NAME                                   TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)               AGE
service/frontiercluster-frontlas-svc   ClusterIP   10.233.0.23   <none>        40011/TCP,40012/TCP   8m11s
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/frontiercluster-frontlas   1/1     1            1           8m11s
NAME                                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/frontiercluster-frontlas-85c4fb6d9b   1         1         1       8m11s

Your microservices should connect to service/frontiercluster-frontlas-svc:40011, and your edge nodes can connect to the NodePort exposing :30012.
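
In-cluster, that means pointing NewClusterService at the Frontlas Service created by the operator; this sketch assumes the microservice runs in the same namespace so the short DNS name resolves.

package main

import (
	"github.com/singchia/frontier/api/dataplane/v1/service"
)

func main() {
	// discover Frontier instances through the Frontlas control plane Service
	svc, err := service.NewClusterService("frontiercluster-frontlas-svc:40011")
	if err != nil {
		return
	}
	_ = svc // start using svc; everything else stays the same
}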

Development

Roadmap

See ROADMAP

Contributing

If you find any bug, please open an Issue; the project maintainers will respond promptly.

If you would like to contribute a feature or help resolve project issues faster, PRs are welcome provided they meet these simple conditions:

  • keep the code style consistent
  • submit one feature at a time
  • include unit tests with the submitted code

Testing

Stream functionality tests

Groups

Add the author to join the WeChat group

License

Released under the Apache License 2.0


If you have read this far, please give the project a Star ⭐️ ♥️

frontier's Issues

bug: panic in synchub while geminio dialogue_mgr close

The frontier log:

...
I0302 03:13:21.296399       1 service_onoff.go:146] service heartbeat, serviceID: 7341594434138658577, meta: {"Service":"foo","Topics":null}, addr: 192.168.65.1:27945
I0302 03:13:21.296414       1 klog.go:63] send heartbeat ack succeed, clientID: 7341594434138658577, PacketID: 7341598918725857984, packetType: heartbeat ack packet
I0302 03:13:21.296423       1 klog.go:53] conn write down, clientID: 7341594434138658577, packetID: 7341598918725857984, packetType: heartbeat ack packet
I0302 03:13:28.142744       1 klog.go:53] read heartbeat packet , clientID: 1709348163318381, packetID: 7341598945943478461, packetType: heartbeat packet
I0302 03:13:28.142868       1 klog.go:53] conn read in packet, clientID: 1709348163318381, packetID: 7341598945943478461, packetType: heartbeat packet
I0302 03:13:28.142919       1 edge_onoff.go:144] edge heartbeat, edgeID: 1709348163318381, meta: test, addr: 192.168.65.1:22270
I0302 03:13:28.142942       1 klog.go:63] send heartbeat ack succeed, clientID: 1709348163318381, PacketID: 7341598945943478461, packetType: heartbeat ack packet
I0302 03:13:28.142958       1 klog.go:53] conn write down, clientID: 1709348163318381, packetID: 7341598945943478461, packetType: heartbeat ack packet
I0302 03:13:43.495965       1 klog.go:53] read heartbeat packet , clientID: 7341594434138658577, packetID: 7341599013215138498, packetType: heartbeat packet
I0302 03:13:43.496085       1 klog.go:53] conn read in packet, clientID: 7341594434138658577, packetID: 7341599013215138498, packetType: heartbeat packet
I0302 03:13:43.496103       1 service_onoff.go:146] service heartbeat, serviceID: 7341594434138658577, meta: {"Service":"foo","Topics":null}, addr: 192.168.65.1:27945
I0302 03:13:43.496114       1 klog.go:63] send heartbeat ack succeed, clientID: 7341594434138658577, PacketID: 7341599013215138498, packetType: heartbeat ack packet
I0302 03:13:43.496119       1 klog.go:53] conn write down, clientID: 7341594434138658577, packetID: 7341599013215138498, packetType: heartbeat ack packet
I0302 03:13:50.392022       1 klog.go:53] read heartbeat packet , clientID: 1709348163318381, packetID: 7341599040432758975, packetType: heartbeat packet
I0302 03:13:50.392123       1 klog.go:53] conn read in packet, clientID: 1709348163318381, packetID: 7341599040432758975, packetType: heartbeat packet
I0302 03:13:50.392142       1 edge_onoff.go:144] edge heartbeat, edgeID: 1709348163318381, meta: test, addr: 192.168.65.1:22270
I0302 03:13:50.392150       1 klog.go:63] send heartbeat ack succeed, clientID: 1709348163318381, PacketID: 7341599040432758975, packetType: heartbeat ack packet
I0302 03:13:50.392155       1 klog.go:53] conn write down, clientID: 1709348163318381, packetID: 7341599040432758975, packetType: heartbeat ack packet
I0302 03:13:59.422907       1 klog.go:63] conn read down err: EOF, clientID: 1709348163318381
I0302 03:13:59.423105       1 klog.go:63] handle pkt done, clientID: 1709348163318381
I0302 03:13:59.423122       1 edge_onoff.go:121] edge offline, edgeID: 1709348163318381, meta: test, addr: 192.168.65.1:22270
I0302 03:13:59.423142       1 edge_onoff.go:66] edge offline, edgeID: 1709348163318381, remote addr: 192.168.65.1:22270
I0302 03:13:59.424386       1 klog.go:53] stream write in packet, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: request packet
I0302 03:13:59.424413       1 klog.go:53] dialogue write in packet, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: request packet
I0302 03:13:59.424422       1 klog.go:53] dialogue write data down succeed, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490
I0302 03:13:59.424430       1 klog.go:53] dialogue write down, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: request packet
I0302 03:13:59.424441       1 klog.go:53] send data succeed, clientID: 7341594434138658577, packetID: 7341599082294418490, remote: 192.168.65.1:27945, meta: {"Service":"foo","Topics":null}
I0302 03:13:59.424453       1 klog.go:53] conn write down, clientID: 7341594434138658577, packetID: 7341599082294418490, packetType: request packet
I0302 03:13:59.432420       1 klog.go:53] read response packet , clientID: 7341594434138658577, packetID: 7341599082294418490, packetType: response packet
I0302 03:13:59.432940       1 klog.go:53] conn read in packet, clientID: 7341594434138658577, packetID: 7341599082294418490, packetType: response packet
I0302 03:13:59.432962       1 klog.go:53] read to dialogue, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType response packet
I0302 03:13:59.432972       1 klog.go:53] dialogue read in packet, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: response packet
I0302 03:13:59.432980       1 klog.go:53] stream read in packet, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: response packet
I0302 03:13:59.433020       1 klog.go:53] read response packet, clientID: 7341594434138658577, dialogueID: 1, packetID: 7341599082294418490, packetType: response packet
I0302 03:13:59.433037       1 klog.go:63] client finishing, clientID: 1709348163318381, remote: 192.168.65.1:22270, meta: test
I0302 03:13:59.433448       1 klog.go:63] client finished, clientID: 1709348163318381, remote: 192.168.65.1:22270, meta: test
I0302 03:13:59.433496       1 klog.go:63] conn write done, clientID: 1709348163318381
I0302 03:13:59.433521       1 klog.go:63] dialogue mgr read done, clientID: 1709348163318381
I0302 03:13:59.433528       1 klog.go:63] dialogue manager finishing, clientID: 1709348163318381
I0302 03:13:59.433542       1 klog.go:63] dialogue manager finished, clientID: 1709348163318381
I0302 03:13:59.433549       1 klog.go:63] dialogue handle pkt done, clientID: 1709348163318381, dialogueID: 7341594503324617689
I0302 03:13:59.433555       1 klog.go:63] dialogue offline, clientID: 1709348163318381, del dialogueID: 7341594503324617689
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x80 pc=0x7eacc0]

goroutine 45 [running]:
github.com/singchia/geminio/multiplexer.(*dialogue).DialogueID(0x2?)
	github.com/singchia/[email protected]/multiplexer/dialogue.go:183
github.com/singchia/geminio/application.(*End).ClosedDialogue(0xc0001233e0, {0xa93758?, 0x0?})
	github.com/singchia/[email protected]/application/end.go:267 +0x2a
github.com/singchia/geminio/server.new.func2({0xa93758?, 0x0?})
	github.com/singchia/[email protected]/server/end.go:75 +0x3d
github.com/singchia/geminio/multiplexer.(*dialogueMgr).DialogueOffline(0xc0002530a0, {0xa91200, 0xc000310900})
	github.com/singchia/[email protected]/multiplexer/dialogue_mgr.go:219 +0x26b
github.com/singchia/geminio/multiplexer.(*dialogue).handlePkt(0xc000310900)
	github.com/singchia/[email protected]/multiplexer/dialogue.go:380 +0x967
created by github.com/singchia/geminio/multiplexer.NewDialogue in goroutine 42
	github.com/singchia/[email protected]/multiplexer/dialogue.go:165 +0x478

The iclm_service log:

>>>
> online, edgeID: 1709348163318381, addr: 192.168.65.1:22270
>>> help
the command-line protocol
	1. close
	2. quit
	3. open {clientID}
	4. close {streamID}
	5. switch {streamID}
	6. publish {msg} #note to switch to stream first
	7. publish {clientId} {msg}
	8. call {method} {req} #note to switch to stream first
	9. call {clientId} {method} {req}
>>> open 1709348163318381
> open stream success: 1709348163318381 7341594501316978202
>>> switch 7341594501316978202
> swith stream success: 7341594501316978202
[ 7341594501316978202] >>> publish a
> publish err: mismatch streamID
[ 7341594501316978202] >>> publish a
> publish err: mismatch streamID
[ 7341594501316978202] >>>
> offline, edgeID: 1709348163318381, addr: 192.168.65.1:22270
>>> panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x38 pc=0x1292d39]

goroutine 38 [running]:
github.com/jumboframes/armorigo/synchub.(*SyncHub).Add(0x0, {0x1300780?, 0xc00033ce10}, {0xc00004d6e8, 0x1, 0xc00004d6b8?})
	github.com/jumboframes/[email protected]/synchub/synchub.go:190 +0x179
github.com/jumboframes/armorigo/synchub.(*SyncHub).New(...)
	github.com/jumboframes/[email protected]/synchub/synchub.go:175
github.com/singchia/geminio/multiplexer.(*dialogue).CloseWait.func1()
	github.com/singchia/[email protected]/multiplexer/dialogue.go:645 +0xc5
github.com/singchia/geminio/pkg/sync.(*Once).doSlow(0xc00004d768?, 0x1040f20?)
	github.com/singchia/[email protected]/pkg/sync/once.go:78 +0xbf
github.com/singchia/geminio/pkg/sync.(*Once).Do(...)
	github.com/singchia/[email protected]/pkg/sync/once.go:66
github.com/singchia/geminio/multiplexer.(*dialogue).CloseWait(0x12d7130?)
	github.com/singchia/[email protected]/multiplexer/dialogue.go:643 +0x3f
github.com/singchia/geminio/multiplexer.(*dialogueMgr).Close.func1(0xc0003b0000?)
	github.com/singchia/[email protected]/multiplexer/dialogue_mgr.go:428 +0x49
created by github.com/singchia/geminio/multiplexer.(*dialogueMgr).Close in goroutine 53
	github.com/singchia/[email protected]/multiplexer/dialogue_mgr.go:426 +0x1bb
