Giter Club home page Giter Club logo

sofa-bolt-node's Introduction


Bolt 协议 Nodejs 实现版本

NPM version build status Test coverage David deps Known Vulnerabilities npm download


SOFABoltNode 是 SOFABolt 的 Nodejs 实现,它包含了 Bolt 通讯层协议框架,以及 RPC 应用层协议定制。和 Java 版本略有不同的是,它并不包含基础通讯功能(连接管理、心跳、自动重连等等),这些功能会放到专门的 RPC 模块里实现。

二、Bolt 通信层协议设计

Bolt 协议是一个标准的通讯层协议,目前包含两个大版本,定义如下:

V1 版本

Request command protocol for v1
0     1     2           4           6           8          10           12          14         16
|proto| type| cmdcode   |ver2 |   requestId           |codec|        timeout        |  classLen |
|headerLen  | contentLen            |                             ... ...                       |
+-----------+-----------+-----------+                                                                                               +
|               className + header  + content  bytes                                            |
+                                                                                               +
|                               ... ...                                                         |

Response command protocol for v1
0     1     2     3     4           6           8          10           12          14         16
|proto| type| cmdcode   |ver2 |   requestId           |codec|respstatus |  classLen |headerLen  |
| contentLen            |                  ... ...                                              |
+-----------------------+                                                                       +
|                          header  + content  bytes                                             |
+                                                                                               +
|                               ... ...                                                         |

V2 版本

Request command protocol for v2
0     1     2           4           6           8          10     11     12          14         16
|proto| ver1|type | cmdcode   |ver2 |   requestId           |codec|switch|   timeout             |
|classLen   |headerLen  |contentLen             |           ...                                  |
+-----------+-----------+-----------+-----------+                                                +
|               className + header  + content  bytes                                             |
+                                                                                                +
|                               ... ...                                  | CRC32(optional)       |

Response command protocol for v2
0     1     2     3     4           6           8          10     11    12          14          16
|proto| ver1| type| cmdcode   |ver2 |   requestId           |codec|switch|respstatus |  classLen |
|headerLen  | contentLen            |                      ...                                   |
+-----------------------------------+                                                            +
|               className + header  + content  bytes                                             |
+                                                                                                +
|                               ... ...                                  | CRC32(optional)       |

V2 相比 V1 版本,主要两点改进:

  1. 增加了协议版本号(ver1)
  2. 协议层面支持了数据包的 CRC32 校验(后面详细介绍)


  • proto: 协议标识位,bolt v1 是 0x01,bolt v2 是 0x02
  • ver1: bolt 协议版本,从 v2 开始 proto 不会再变,升级只变这个版本号
  • type: request/response/request oneway
  • cmdcode: request/response/heartbeat,和 type 有交叉
  • ver2: 应用层协议的版本(暂时没用)
  • requestId: 数据包唯一标识 id
  • codec: body 序列化方式,目前支持 hessian/hessian2/protobuf
  • switch: 是否开启 crc32 校验
  • headerLen: 自定义头部长度
  • contentLen: 内容长度
  • CRC32: 整个数据包通过计算出的 crc32 值(ver1 > 1 时支持)


基本 RPC 调用功能


'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const options = {
  sentReqs: new Map(),
const socket = net.connect(12200, '');
const encoder = protocol.encoder(options);
const decoder = protocol.decoder(options);

socket.once('connect', () => {
socket.once('close', () => {
socket.once('error', err => {

// 流式 API
pump(encoder, socket, decoder, err => {

// 监听 response / heartbeat_acl
decoder.on('response', res => {
decoder.on('heartbeat_ack', res => {

// 发送 RPC 请求
encoder.writeRequest(1, {
  args: [{
    $class: 'java.lang.String',
    $: 'peter',
  serverSignature: '',
  methodName: 'sayHello',
  timeout: 3000,

// 发送心跳包
encoder.writeHeartbeat(2, { clientUrl: 'xxx' });


'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const server = net.createServer(socket => {
  const options = {
    sentReqs: new Map(),
  const encoder = protocol.encoder(options);
  const decoder = protocol.decoder(options);
  pump(encoder, socket, decoder, err => {

  decoder.on('request', req => {
    encoder.writeResponse(req, {
      isError: false,
      appResponse: {
        $class: 'java.lang.String',
        $: `hello ${[0]} !`,
  decoder.on('heartbeat', hb => {



目前推荐的序列化方式是 protobuf,因为它跨语言性做得比较好。在蚂蚁内部其实我们主要使用的是 hessian 序列化,后面我们会陆续开源关于它的一系列最佳实践,尽请期待。下面我们演示一个 pb 的 demo

通过 *.proto 文件定义接口

syntax = "proto3";


// 可选
option java_multiple_files = false;

service ProtoService {
  rpc echoObj (EchoRequest) returns (EchoResponse) {}

message EchoRequest {
  string name = 1;
  Group group = 2;

message EchoResponse {
  int32 code = 1;
  string message = 2;

enum Group {
  A = 0;
  B = 1;

客户端使用 protobuf

'use strict';

const net = require('net');
const path = require('path');
const pump = require('pump');
const protocol = require('sofa-bolt-node');
const protobuf = require('antpb');

// 存放 *.proto 文件的目录,加载 proto
const protoPath = path.join(__dirname, 'proto');
const proto = protobuf.loadAll(protoPath);

// 将 proto 作为参数传入 encoder/decoder
const sentReqs = new Map();
const encoder = protocol.encoder({ sentReqs, proto });
const decoder = protocol.decoder({ sentReqs, proto });

const socket = net.connect(12200, '');
socket.once('connect', () => {
socket.once('close', () => {
socket.once('error', err => {
pump(encoder, socket, decoder, err => {

// 指定序列化方式为 protobuf
encoder.codecType = 'protobuf';

const req = {
  serverSignature: '',
  methodName: 'echoObj',
  args: [{
    name: 'peter',
    group: 'B',
  timeout: 3000,

decoder.on('response', res => {

// 记录请求、发送请求
sentReqs.set(1, { req });
encoder.writeRequest(1, req);

服务端使用 protobuf

'use strict';

const net = require('net');
const path = require('path');
const pump = require('pump');
const protocol = require('sofa-bolt-node');
const protobuf = require('antpb');

const protoPath = path.join(__dirname, 'proto');
const proto = protobuf.loadAll(protoPath);

const server = net.createServer(socket => {
  const options = {
    sentReqs: new Map(),
  const encoder = protocol.encoder(options);
  const decoder = protocol.decoder(options);
  pump(encoder, socket, decoder, err => {

  decoder.on('request', req => {
    const reqData =[0].toObject({ enums: String });;
    encoder.writeResponse(req, {
      isError: false,
      appResponse: {
        code: 200,
        message: 'hello ' + + ', you are in ' +,

  decoder.on('heartbeat', hb => {


CRC32 校验

RPC 在网络传输过程中可能会遇到各种各样奇葩的问题,导致二进制被篡改,如果这个接口是和钱有关的,就可能导致资损,所以 Bolt 协议层面引入了一个校验功能,当开启时会在整个数据包后面额外传输 4 个 bytes 是数据包计算出来的 CRC32 值,接收端收到数据包以后先在本地重新计算 CRC32 值然后和附带的值比对,一致继续处理,不一致则直接报错

该功能由客户端开启,但是开启之前一般有一个协商的过程,服务端通过协商告诉客户端它支持 crc32 校验

'use strict';

const net = require('net');
const pump = require('pump');
const protocol = require('sofa-bolt-node');

const options = {
  sentReqs: new Map(),

const socket = net.connect(12200, '');
const encoder = protocol.encoder(options);
const decoder = protocol.decoder(options);
pump(encoder, socket, decoder);

// 客户端开启 crc 校验
encoder.protocolType = 'bolt2'; // v2 版本以上才支持 crc 校验
encoder.boltVersion = 2;
encoder.crcEnable = true;

// 发送
encoder.writeRequest(1, {
  args: [{
    $class: 'java.lang.String',
    $: 'peter',
  serverSignature: '',
  methodName: 'sayHello',
  timeout: 3000,



  • encoder(options) 创建一个 ProtocolEncoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
    • @param {Url} [address] - TCP socket 地址
    • @param {String} [codecType] - 序列化方式
  • decoder(options) 创建一个 ProtocolDecoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
  • setOptions(options) 设置一些全局的参数

ProtocolEncoder 接口

  • protocolType 设置协议,bolt/bolt2
  • codecType 设置序列化方式,hessian/hessian2/protobuf
  • boltVersion 设置 bolt 的版本
  • crcEnable 是否开启 crc 校验
  • writeRequest(id, req, [callback]) 发送请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} req - 请求对象
      • @param {String} serverSignature - 服务的唯一标识
      • @param {String} methodName - 方法名
      • @param {Array} args - 参数列表
      • @param {Number} timeout - 超时时间
      • @param {Object} requestProps - 额外传递的 kv 参数
  • writeResponse(req, res, [callback]) 发送响应
    • @param {Object} req - 请求对象,有请求才有响应
    • @parma {Object} res - 响应对象
      • @param {Boolean} isError - 是否成功
      • @param {String} errorMsg - 异常信息,isError=false 的话为 null
      • @param {Object} appResponse - 响应对象
      • @param {Object} responseProps - 额外传递的 kv 参数
  • writeHeartbeat(id, hb, [callback]) 发送心跳请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} hb - 心跳对象
      • @param {String} clientUrl - 客户端 url
  • writeHeartbeatAck(hb, [callback]) 发送心跳响应
    • @parma {Object} hb - 心跳对象


从上面的介绍和接口定义看,我们对协议的实现核心就是 Encoder 和 Decoder 两个类,并且采用了 Nodejs 里流(Stream)的风格

+---------+  pipe  +---------+  pipe  +---------+    response
| Encoder |  --->  | Socket  |  --->  | Decoder |    ...
+---------+        +---------+        +---------+
                      |  ^
                      |  |
                      |  |
                      v  |
+---------+  pipe  +---------+  pipe  +---------+    request
| Encoder |  --->  | Socket  |  --->  | Decoder |    ...
+---------+        +---------+        +---------+

所有的协议细节,数据的切分都封装在 Encoder/Decoder 两个类中,并且提供标准的 API,所以以后我们要替换其他的通讯层协议(比如:dubbo),那么只需要直接替换就好了






sofa-bolt-node's People


gxcsoccer avatar killagu avatar duan-0916 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.