samuelyao314 / blog Goto Github PK
View Code? Open in Web Editor NEW个人技术博客 https://github.com/samuelyao314/blog/issues
License: MIT License
个人技术博客 https://github.com/samuelyao314/blog/issues
License: MIT License
Skynet 是一个为网络游戏服务器设计的轻量框架。
这个游戏框架的特点是:
Skyent 核心部分是一个消息调度机制。示意图如下
说明:
我会用单独的篇幅来分析各个重点
首先是消息队列
Skynet 维护了两级消息队列。
下面,主要内容是
struct message_queue {
uint32_t handle; // 服务地址
int cap; // 数组
int head; // queue 实现了循环队列。 head 和 tail 分别是头和尾
int tail;
struct skynet_message *queue; // 保存消息的数组.
struct message_queue *next;
...
};
由于服务队列是属于服务的,所以服务队列的生命周期和服务一致:载入服务的时候生成,卸载服务的时候删除。
服务是通过 skynet_context_new 载入的,在此函数中,可以找到对应的服务队列的生成语句:
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
struct message_queue *
skynet_mq_create(uint32_t handle) {
struct message_queue *q = skynet_malloc(sizeof(*q));
q->handle = handle;
q->cap = DEFAULT_QUEUE_SIZE; // 队列大小
q->head = 0;
q->tail = 0;
// ...
q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
q->next = NULL;
return q;
}
handle 就是队列所属服务的地址。 通过 handle, 就可以找到服务对应的结构体。
可以看到,queue 是个数组,可以存放 DEFAULT_QUEUE_SIZE 个消息, 默认大小是 1024个。
实际上 queue 是用数组实现了一个循环队列。
服务队列主要支持2个操作
添加消息到队列中,如果队列满了,会触发自动扩容的操作 expand_queue . 新扩容的数组大小是原先的2倍。
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
q->queue[q->tail] = *message;
if (++ q->tail >= q->cap) {
q->tail = 0;
}
if (q->head == q->tail) {
expand_queue(q);
}
...
}
那么,取出消息后,数组占有空间少的时候,会收缩吗 ? 答案是不。
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
// ...
if (q->head != q->tail) {
// 这个就是取出的消息
*message = q->queue[q->head++];
int head = q->head;
int tail = q->tail;
int cap = q->cap;
// head 重新指向数组头
if (head >= cap) {
q->head = head = 0;
}
int length = tail - head;
if (length < 0) {
length += cap;
}
}
// ...
}
Skynet 进程只有1个全局消息队列。 在Skynet 启动的时候会进行初始化。
skynet_mq_init();
struct global_queue {
struct message_queue *head; // 指向第一个服务队列
struct message_queue *tail; // 指向最后一个服务队列
struct spinlock lock; // 并发同步
};
你可能很好奇, 这个链表,如何指向下一个成员。 其实之前已经提到了
struct message_queue {
...
struct message_queue *next;
}
为了效率,并不是简单的把所有的服务队列都塞到全局队列中,而是只塞入非空的服务队列,
这样worker线程就不会得到空的服务队列而浪费CPU。
全局消息队列主要支持2个操作
添加服务队列
void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q= Q;
SPIN_LOCK(q)
assert(queue->next == NULL);
if(q->tail) {
q->tail->next = queue;
q->tail = queue;
} else {
q->head = q->tail = queue;
}
SPIN_UNLOCK(q)
}
取出服务队列
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
SPIN_LOCK(q)
struct message_queue *mq = q->head;
if(mq) {
q->head = mq->next;
if(q->head == NULL) {
assert(mq == q->tail);
q->tail = NULL;
}
mq->next = NULL;
}
SPIN_UNLOCK(q)
return mq;
}
这些操作都使用了锁进行保护。早期版本,采用无锁机制,结果引入了并发的BUG 。
Skynet 启动多个worker线程进行消息分发,线程个数是可以设置的,官方建议配置为cpu核数。
每个 worker 线程执行的入口函数是 thread_worker 。
// skynet_start.c#start
for (i=0;i<thread;i++) {
// ...
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
忽略枝叶, thread_worker 会不停地调用 skynet_context_message_dispatch
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
// 从全局队列中取出一个服务队列
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
// 找到服务队列所属的服务上下文
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
// ...
// 为了调度公平,每次只弹出一个消息
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
// 检查服务是否过载
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
// 进行消息分发
dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
// ...
return q;
}
函数 _dispatch_message 会调用这个服务的 callback 。
static void
_dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
int type = msg->sz >> HANDLE_REMOTE_SHIFT;
size_t sz = msg->sz & HANDLE_MASK;
// ...
if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz))
skynet_free(msg->data);
}
那么,如何设置这个 callback ? 下一篇再来回答。
上文详细讲了沙盒服务 snlua 的启动过程。本文内容是snlua 服务收到消息后,如何进行调度。
snlua 服务通过 skynet.start 进行启动。会触发绑定C消息回调函数
// 来自lua-skynet.c
static int
lcallback(lua_State *L) {
// 省略...
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);
skynet_callback(context, gL, _cb);
}
注意: callback 绑定的ud 是Lua VM的主协程。
因此,服务收到的消息,会驱动主协程执行 skynet.raw_dispatch_message.
那么,Lua VM 只用一个协程,来处理所有的消息吗?
不是的。实际上新的请求会对应一个协程。 协程创建的代码是
-- 来自 skynet.lua
local coroutine_pool = setmetatable({}, { __mode = "kv" })
local function co_create(f)
local co = table.remove(coroutine_pool)
if co == nil then
co = coroutine.create(function(...)
f(...)
while true do
f = nil
coroutine_pool[#coroutine_pool+1] = co
f = coroutine_yield "EXIT"
f(coroutine_yield())
end
end)
else
coroutine_resume(co, f)
end
return co
end
为提高性能,采用了协程池。每个新请求,先从协程池中取,如果没有,就重新创建一个。处理完请求后,重新放入协程池。
在实际使用 skynet 时,你可以直接使用 rpc 的语法,向外部服务发起一个远程调用,等对方发送了回应消息后,逻辑接着走下去。
那么,框架是如何把回调函数的模式转换为阻塞 API 调用的形式呢?
用官方的例子来解释
-- 来自 skynet/example/agent.lua
local r = skynet.call("SIMPLEDB", "lua", "get", self.what)
每一个客户端连接上skynet后,会有一个对应的服务 agent.
服务 simpledb 收到 agent 消息后,执行过程如下
-- 主协程执行
raw_dispatch_message(prototype, msg, sz, session, source)
-- 请求类型 prototype ~= PTYPE_RESPONSE。执行新请求分支
local p = proto[prototype]
-- 这里对应 simpledb.lua 注册的分发函数
local f = p.dispatch
-- 假设刚开始池子是空的。创建一个新的
local co = co_create(f)
-- 记录session 和消息发来的服务地址
session_coroutine_id[co] = session
session_coroutine_address[co] = source
-- 协程切换, 子协程开始执行
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
-- 子协程执行
-- 入口函数是 co_create 定义的传入 coroutine.create 的闭包
-- 先执行分发函数, 定义在 simpledb.lua 里
f(...)
local f = command[string.upper(cmd)]
-- f 这里对应于 command.GET
skynet.ret(skynet.pack(f(...)))
-- 协程切换,回主协程
return coroutine_yield("RETURN", msg, sz)
-- 主协程执行
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
-- suspend(co, true, "RETURN", msg, sz)
-- 进入 command == "RETURN" 分支
-- 回复消息
ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil
-- 协程切换,子协程开始执行
return suspend(co, coroutine_resume(co, ret))
-- 子协程执行
skynet.ret(skynet.pack(f(...)))
-- ret 正常返回
-- 分发函数正常返回, f(...)
-- 重新返回 co_create 定义的闭包
-- 这个协程被放入池中,可以被复用
coroutine_pool[#coroutine_pool+1] = co
-- 协程切换,返回主协程
f = coroutine_yield "EXIT"
-- 主协程
-- command == "RETURN" 分支
return suspend(co, coroutine_resume(co, ret)) -- suspend(co, true, "EXIT")
-- 这时候调用栈 raw_dispatch_message --> suspend --> suspend
-- "EXIT" 做清理工作,内层 suspend 返回
-- 从 raw_dispatch_message 正常返回,主协程退出
上面子协程是新创建的,如果是复用协程
-- 主协程执行
raw_dispatch_message(prototype, msg, sz, session, source)
-- 假设刚开始池子是空的。创建一个新的
local co = co_create(f)
-- 返回的不是nil,说明协程池中还有协程可以复用。
local co = table.remove(coroutine_pool)
-- 进入 else 分支
-- 切换协程
coroutine_resume(co, f)
-- 子协程执行
-- 之前挂起在这个地方,f 就是对应的分发函数
f = coroutine_yield "EXIT"
-- 协程切换,返回主协程
f(coroutine_yield())
-- 主协程
-- 从这个函数挂起的地方返回
local co = co_create(f)
-- 协程切换, 子协程开始执行
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
-- 子协程
-- 从这里返回。等同于 f(...)
f(coroutine_yield())
-- 下面调度过程跟新创建的情况一样
-- 执行完后,最终会回收此协程,并调用 coroutine_yield “EXIT”等待下一次的复用
服务 agent 执行过程
-- 子协程
skynet.call("SIMPLEDB", "lua", "get", self.what)
-- 发送消息,得到唯一的session id
local session = c.send(addr, p.id , nil , p.pack(...))
-- 执行 yield_call
return p.unpack(yield_call(addr, session))
-- 这里当前子线程先挂起,等待收到回包。具体细节再看下面
local succ, msg, sz = coroutine_yield("CALL", session)
-- 主协程执行
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
-- suspend(co, true, "CALL", session)
-- 进入 "CALL" 分支。关联 session --> co
-- 主协程从 raw_dispatch_message 正常返回
-- 当收到回复包的时候,主协程执行
-- raw_dispatch_message 进入 PTYPE_RESPONSE 分支
-- 协程切换,执行之前挂起的子协程
local co = session_id_coroutine[session]
suspend(co, coroutine_resume(co, true, msg, sz))
-- 子协程
local succ, msg, sz = coroutine_yield("CALL", session)
-- skynet.call 返回,一次RPC调用已经完成
总结:
skynet 框架下不能直接使用 Lua coroutine库。
如果你需要创建用户级线程,可以使用 skynet.fork ,skynet.wait,skynet.wakeup。
如果你有其它原因想使用 coroutine, 请考虑清楚为什么需要,然后参考这篇文章Skynet Coroutinue.
游戏服务器开发中的其中一个难点:隔离性。在C/C++写的服务器中,一行代码中的空指针访问,就会导致整个服务器进程crash。
解决方式是:沙盒机制。
Skynet 的沙盒是利用Lua 实现的, 称为服务 snlua 。
下面重点讲这个沙盒是如何实现的
Skynet 启动过程, 主要是启动了一些沙盒服务。
Skynet 配置文件一般是 Config 文件。
按照默认配置,启动时,部分日志如下:
$ ./skynet examples/config
[:01000001] LAUNCH logger
[:01000002] LAUNCH snlua bootstrap
[:01000003] LAUNCH snlua launcher
[:01000004] LAUNCH snlua cmaster
[:01000004] master listen socket 0.0.0.0:2013
[:01000005] LAUNCH snlua cslave
[:01000005] slave connect to master 127.0.0.1:2013
[:01000004] connect from 127.0.0.1:55126 4
[:01000006] LAUNCH harbor 1 16777221
[:01000004] Harbor 1 (fd=4) report 127.0.0.1:2526
[:01000005] Waiting for 0 harbors
[:01000005] Shakehand ready
[:01000007] LAUNCH snlua datacenterd
[:01000008] LAUNCH snlua service_mgr
[:01000009] LAUNCH snlua main
...
第一个启动的服务是 logger ,这个服务在之前已经介绍过了,是用C语言实现的。用来打印日志。
bootstrap 这个配置项关系着 skynet 运行的第二个服务。默认的 bootstrap 配置项为
snlua bootstrap
这意味着,skynet 会启动一个 snlua 沙盒服务,并将 bootstrap 作为参数传给它。
按默认配置,服务会加载 service/bootstrap.lua 作为入口脚本。启动后,这个 snlua 服务同样可以称为 bootstrap 服务。
bootstrap 服务, 会根据配置启动其他系统服务, 其中启动了 launcher 服务。更多细节可以见Bootstrap 。
最后,它启动了 main 服务。 main.lua 就是业务逻辑的入口。
Lua代码里, 启动其他沙盒服务有2个API
例如,服务 bootstrap 启动服务 launcher
-- bootstrap.lua
local launcher = assert(skynet.launch("snlua","launcher"))
代码跟踪:
最终载入了一个 snlua 服务,用 launcher.lua 作为入口脚本。
那么, skynet.newservice 有什么不同那 ?
这个函数跟 launch 的区别是: 通过发送消息给服务 launcher, 由 launcher 来统一启动指定服务。
代码跟踪:
下面讲沙盒具体的启动过程
启动服务 launcher 取例
skynet_context_new("snlua", "launcher")
服务的创建函数
struct snlua {
lua_State * L;
struct skynet_context * ctx; // 服务的句柄
size_t mem; // 当前使用的内存量,单位是byte
size_t mem_report; // 每次超过这个值,会产生日志告警
size_t mem_limit; // 内存上限
};
struct snlua *
snlua_create(void) {
struct snlua * l = skynet_malloc(sizeof(*l));
memset(l,0,sizeof(*l));
l->mem_report = MEMORY_WARNING_REPORT;
l->mem_limit = 0;
l->L = lua_newstate(lalloc, l);
return l;
}
每一个 snlua 服务都绑定了一个Lua VM。 Lua VM实现是线程安全的。
既然可以限制每个VM的内存,那么应该限制多少?
官方的建议:
玩家代理服务,可以设置上限到 128 M 左右。当然以过往经验,在正常情况通常应保持在 10M 以下。
读者可能还有一个疑问:每个服务一个 Lua VM, 函数字节码在进程里不是有很多份吗?
针对这个问题,云风大牛已经解决了:对Lua源码做了修改,可以支持多个Lua VM 共用函数字节码。
下面,看初始化函数
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
int sz = strlen(args);
char * tmp = skynet_malloc(sz);
memcpy(tmp, args, sz);
skynet_callback(ctx, l , launch_cb);
const char * self = skynet_command(ctx, "REG", NULL);
uint32_t handle_id = strtoul(self+1, NULL, 16);
skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
return 0;
}
消息触发执行 launch_cb
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
assert(type == 0 && session == 0);
struct snlua *l = ud;
skynet_callback(context, NULL, NULL);
init_cb(l, context, msg, sz);
...
}
函数 init_cb
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
lua_State *L = l->L;
l->ctx = ctx;
// 省略 ...
lua_pushlightuserdata(L, ctx);
lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
// 省略 ...
lua_pushcfunction(L, traceback);
assert(lua_gettop(L) == 1);
const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
int r = luaL_loadfile(L,loader);
if (r != LUA_OK) {
skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_pushlstring(L, args, sz);
r = lua_pcall(L,1,0,1);
if (r != LUA_OK) {
skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
// 省略 ...
}
Lua 加载 lancher.lua 。 最重要的是在Lua代码中注册了服务的消息分发函数
skynet.register_protocol {
name = "text",
id = skynet.PTYPE_TEXT,
unpack = skynet.tostring,
dispatch = function(session, address , cmd)
if cmd == "" then
command.LAUNCHOK(address)
elseif cmd == "ERROR" then
command.ERROR(address)
else
error ("Invalid text command " .. cmd)
end
end,
}
skynet.dispatch("lua", function(session, address, cmd , ...)
cmd = string.upper(cmd)
local f = command[cmd]
if f then
local ret = f(address, ...)
if ret ~= NORET then
skynet.ret(skynet.pack(ret))
end
else
skynet.ret(skynet.pack {"Unknown command"} )
end
end)
对每一种类型的消息,都需要注册一个Lua 分发函数。
function skynet.register_protocol(class)
local name = class.name
local id = class.id
assert(proto[name] == nil)
assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
proto[name] = class
proto[id] = class
end
那么,服务收到一个消息后,又是如何执行这个Lua 分发函数的?
lancher.lua 最后一行
skyent.start(function () end)
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
-- 这里可以理解为,直接执行初始化函数 start_func
skynet.timeout(0, function()
skynet.init_service(start_func)
end)
end
static int
lcallback(lua_State *L) {
// 取出服务的上下文
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION);
lua_settop(L,1);
// 寄存器中保存分发函数,register[&_cb] = skynet.dispatch_message
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
// 取出状态机的主线程,注意:snlua 沙盒是由主线程进行调度
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
// 主线程
lua_State *gL = lua_tothread(L,-1);
// forward 模式下,这个消息处理完,并不释放内存
if (forward) {
skynet_callback(context, gL, forward_cb);
} else {
skynet_callback(context, gL, _cb);
}
return 0;
}
-- 代码片段来自 skynet.lua@raw_dispatch_message
local p = proto[prototype]
local f = p.dispatch
-- 针对这个新请求,创建出一个线程。切换协程
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
通过 proto,消息就能被之前 skynet.register_protocol 注册的分发函数进行处理了。
总结:
参考资料:
源码: include/phenom/refcnt.h
typedef int ph_refcnt_t;
void ph_refcnt_add(ph_refcnt_t *ref)
// Returns true if we just released the final reference
bool ph_refcnt_del(ph_refcnt_t *ref)
引用计数管理对象的生命期
void ph_string_delref(ph_string_t *str)
{
if (!ph_refcnt_del(&str->ref)) {
return;
}
if (str->mt >= 0) {
ph_mem_free(str->mt, str->buf);
str->mt = PH_MEMTYPE_INVALID;
}
if (str->slice) {
ph_string_delref(str->slice);
str->slice = 0;
}
str->buf = 0;
if (!str->onstack) {
ph_mem_free(mt_string, str);
}
}
源码: corelib/counter.c; include/phenom/counter.h; tests/counter.c; corelib/debug_console.c
计数器。 用来了解事物发生的频率。实际用在memory, job子系统中。
scope就是一系列在逻辑上处于同一组的counter的集合概念。
在使用counter的时候最初就需要创建scope。
在定义scope的时候需要确定该scope内最多能有多少个counter注册进去,这个叫slot。
scope互相之间可以有父子继承关系。
我们要创建block的scenario只有两个:
当你在同一个线程内需要频繁进行计数器更新的时候;
当你在一个线程内对多个计数器进行更新,并期望这个操作尽可能快的时候;
开启debug-console, 可以输出系统的计数。
~$> echo counters | nc -UC /tmp/phenom-debug-console
iosched/dispatched 5144
iosched/timer_busy 0
iosched/timer_ticks 5035
memory.ares.channel/allocs 1
memory.ares.channel/bytes 104
memory.ares.channel/frees 0
memory.ares.channel/oom 0
上面的最高层的scope是memory和iosched。 memory的子scope是area, area的子scope是channel.
channel里面有4个slots, 分别记录了4个counter. name分别是alloc, bytes, frees, oom.
对应的counter分别是1, 104, 0, 0.
源码:corelib/memory.c; include/phenom/memory; tests/memory.c; corelib/debug_console.c
基于counter子系统的内存分配器。
通过下面2个函数注册新的memtype。
ph_memtype_t ph_memtype_register(const ph_memtype_def_t *def);
ph_memtype_t ph_memtype_register_block(
uint8_t num_types,
const ph_memtype_def_t *defs,
ph_memtype_t *types);
memtype支持的操作,malloc, realloc, free
void *ph_mem_alloc(ph_memtype_t memtype)
void *ph_mem_alloc_size(ph_memtype_t memtype, uint64_t size)
void *ph_mem_realloc(ph_memtype_t memtype, void *ptr, uint64_t size)
void ph_mem_free(ph_memtype_t memtype, void *ptr)
通过下面函数就可以了解内存的分配情况
void ph_mem_stat(ph_memtype_t memtype, ph_mem_stats_t *stats);
struct ph_mem_stats {
/* the definition */
const ph_memtype_def_t *def;
/* current amount of allocated memory in bytes */
uint64_t bytes;
/* total number of out-of-memory events (allocation failures) */
uint64_t oom;
/* total number of successful allocation events */
uint64_t allocs;
/* total number of calls to free */
uint64_t frees;
/* total number of calls to realloc (that are not themselves
* equivalent to an alloc or free) */
uint64_t reallocs;
};
开启debug-console, 可以输出内存使用情况 (非常酷)
$> echo memory | nc -UC /tmp/phenom-debug-console
WHAT BYTES OOM ALLOCS FREES REALLOC
threadpool/pool 832 0 1 0 0
threadpool/ringbuf 8480 0 2 0 0
hashtable/table 3136 0 3 0 0
hook/hook 8 0 1 0 0
hook/head 0 0 0 0 0
hook/string 19 0 1 0 0
hook/unreg 0 0 0 0 0
stream/stream 272 0 2 0 0
buffer/object 120 0 3 0 0
buffer/8k 16384 0 2 0 0
buffer/16k 0 0 0 0 0
buffer/32k 0 0 0 0 0
buffer/64k 0 0 0 0 0
buffer/vsize 0 0 0 0 0
buffer/queue 48 0 2 0 0
buffer/queue_ent 64 0 2 0 0
源码: corelib/string.c; include/phenom/string.c; tests/string.c;
设计目标: http://facebook.github.io/libphenom/#string
实现
typedef struct ph_string ph_string_t;
struct ph_string {
ph_refcnt_t ref; // 引用计数
ph_memtype_t mt;
uint32_t len, alloc; // 使用字节数,总字节数
char *buf; // 指向实际的存储
ph_string_t *slice;
bool onstack; // 是否在stack上
};
其中参数mt的值, 用负的表示stack-based growable,正的表示heap-allocated growable
ph_result_t ph_string_append_buf(ph_string_t *str,
const char *buf, uint32_t len)
{
if (len + str->len > str->alloc) {
// Not enough room
if (str->mt == PH_STRING_STATIC) {
// Just clamp to the available space
len = str->alloc - str->len;
} else {
// Grow it
uint32_t nsize = ph_power_2(str->len + len);
char *nbuf;
// Negative memtypes encode the desired memtype as the negative
// value. Allocate a buffer from scratch using the desired memtype
if (str->mt < 0) {
nbuf = ph_mem_alloc_size(-str->mt, nsize);
} else {
nbuf = ph_mem_realloc(str->mt, str->buf, nsize);
}
if (nbuf == NULL) {
return PH_NOMEM;
}
if (str->mt < 0) {
// Promote from static growable to heap allocated growable
memcpy(nbuf, str->buf, str->len);
str->mt = -str->mt;
}
str->buf = nbuf;
str->alloc = nsize;
}
}
memcpy(str->buf + str->len, buf, len);
str->len += len;
return PH_OK;
}
slice的创建
ph_string_t *ph_string_make_slice(ph_string_t *str,
uint32_t start, uint32_t len)
{
ph_string_t *slice;
if (start == 0 && len == str->len) {
ph_string_addref(str);
return str;
}
slice = ph_mem_alloc(mt_string);
if (!slice) {
return NULL;
}
ph_string_init_slice(slice, str, start, len);
return slice;
}
例如 memory.c 里有以下指令
PH_LIBRARY_INIT_PRI(memory_init, memory_destroy, 3)
include/phenom/defs.h定义
void ph_library_init_register(struct ph_library_init_entry *ent);
#define PH_LIBRARY_INIT_PRI(initfn, finifn, pri) \
static __attribute__((constructor)) \
void ph_defs_gen_symbol(ph__lib__init__)(void) { \
static struct ph_library_init_entry ent = { \
__FILE__, __LINE__, pri, initfn, finifn, 0 \
};
ph_library_init_register(&ent); \
}
attribute((constructor)), 使的函数体在main开始运行前,自动调用;
具体见 http://gcc.gnu.org/onlinedocs/gcc/Function-Attributes.html;
所以meory_init, memory_destroy被注册
每1个使用libphenom的程序都要求,先调用ph_library_init,每1个注册init函数都被执行。
for (i = 0; i < num_init_ents; i++) {
struct ph_library_init_entry *ent = init_funcs[i];
if (ent->init) {
ent->init();
}
}
源码: 目录 corelib/streams; include/phenom/stream.h; tests/stream.c
libPhenom provides a portable layer over streaming IO.
CSAPP解释了标准IO为什么不能使用在socket上。
stream支持socket, ssl, fd, string.
实现
/** Represents a stream
*
* Streams maintain a buffer for read/write operations.
*/
struct ph_stream {
const struct ph_stream_funcs *funcs;
void *cookie;
unsigned flags;
pthread_mutex_t lock;
// if data is in the read buffer, these are non-NULL
unsigned char *rpos, *rend;
// if data is in the write buffer, these are non-NULL
unsigned char *wpos, *wend;
unsigned char *wbase;
// associated buffer. It can be either used in read mode
// or write mode, but not both
unsigned char *buf;
uint32_t bufsize;
int last_err;
ph_iomask_t need_mask;
};
/** Defines a stream implementation.
*
* If any of these return false, it indicates an error.
* The implementation must set stm->last_err to the corresponding
* errno value in that case (and only in the failure case).
*/
struct ph_stream_funcs {
bool (*close)(ph_stream_t *stm);
bool (*readv)(ph_stream_t *stm, const struct iovec *iov,
int iovcnt, uint64_t *nread);
bool (*writev)(ph_stream_t *stm, const struct iovec *iov,
int iovcnt, uint64_t *nwrote);
bool (*seek)(ph_stream_t *stm, int64_t delta,
int whence, uint64_t *newpos);
};
读写公用1个缓存区。 通过定义 struct ph_stream_funcs, 来支持不同流类型。
源码: corelib/buf.c; include/phenom/buffer.h; tests/buf.c
设计目标: http://facebook.github.io/libphenom/index.html#buffer;
ph_buf_t作为ph_bufq_t的底层实现,并没有单独使用
struct ph_buf {
ph_refcnt_t ref;
ph_buf_t *slice;
uint8_t *buf;
uint64_t size;
ph_memtype_t memtype;
};
ph_buf_t *ph_buf_new(uint64_t size);
ph_buf_t *ph_buf_slice(ph_buf_t *buf, uint64_t start, uint64_t len);
ph_buf_new 创建1个新的buffer, 新的buffer的大小,调用函数select_size。主要分为8192,16k, 32k等。
ph_buf_slice 创建1个slice, slice实际上没有分配内存。
特殊情况:start=0, len等于buf的长度,只是ph_buf_addref(buf).
buf子系统中不同的内存分配,都分别区分开。
static ph_memtype_def_t defs[] = {
{ "buffer", "object", sizeof(ph_buf_t), PH_MEM_FLAGS_ZERO },
{ "buffer", "8k", 8*1024, 0 },
{ "buffer", "16k", 16*1024, 0 },
{ "buffer", "32k", 32*1024, 0 },
{ "buffer", "64k", 64*1024, 0 },
{ "buffer", "vsize", 0, 0 },
{ "buffer", "queue", sizeof(ph_bufq_t), PH_MEM_FLAGS_ZERO },
{ "buffer", "queue_ent", sizeof(struct ph_bufq_ent), PH_MEM_FLAGS_ZERO },
};
ph_bufq_t,用作socket的用户层buffer。
struct ph_bufq_ent {
PH_STAILQ_ENTRY(ph_bufq_ent) ent;
ph_buf_t *buf;
// Offset into the buf of the data that is yet to be consumed
uint64_t rpos;
// Offset at which to append further data
uint64_t wpos;
};
struct ph_bufq {
PH_STAILQ_HEAD(bufqhead, ph_bufq_ent) fifo;
// Maximum amount of storage to allow
uint64_t max_size; // 现在好像没有用? 20131114
};
ph_bufq_t *ph_bufq_new(uint64_t max_size);
ph_result_t ph_bufq_append(ph_bufq_t *q, const void *buf, uint64_t len,
uint64_t *added_bytes);
ph_buf_t *ph_bufq_consume_bytes(ph_bufq_t *q, uint64_t len);
ph_buf_t *ph_bufq_consume_record(ph_bufq_t *q, const char *delim,
uint32_t delim_len);
ph_bufq_new 创建出1个定长buffer的fifo. 默认会在fifo里放1个8192长度的buffer.
ph_bufq_append 对ph_bufq_t插入数据. 如果最后1个buffer容量不够,就会创建出1个新的buffer, 放到fifo里.
ph_bufq_consume_bytes 从ph_bufq_t读出数据。gc_bufq用来释放资源。 返回的ph_buf_t是重新创建的。
ph_bufq_consume_record 读取数据到指定的record. 例如读取到"\r\n". 调用函数find_record,需要很有耐心的实现。
源码:include/phenom/json.h; 目录 corelib/variant/
提供了json的encoding,decoding的功能。
源码: phenom/configuration.h; corelib/config.c
程序启动时有全局的配置文件(json格式), 修改程序一些行为。
该文件可以通过ph_config_load_config_file或者 getenv("PHENOM_CONFIG_FILE")来指定。
例如job.c 里, 可以设置下面的参数来指定sleep时间
int max_sleep = ph_config_query_int("$.nbio.max_sleep", 5000);
建议应用自己的配置在路径 "$.app."下
源码: include/phenom/timerwheel.h, corelib/timerwheel.c
timer wheel, 是一种定时器实现机制。概念来自"Hashed and Hierarchical Timing Wheels".
用来管理大量的定时器。Linux内核中也用这种实现。
定时轮的工作原理可以类比于时钟,如上图; 指针按某一个方向按固定频率轮动,每一次跳动称为一个tick。
这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)
以及 timeUnit(时间单位),例如 当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
实现
PH_LIST_HEAD( // 双向的循环链表head, 具体见phenom/queue.h
ph_timerwheel_list,
ph_timerwheel_timer);
struct ph_timerwheel_timer {
PH_LIST_ENTRY(ph_timerwheel_timer) t;
struct ph_timerwheel_list *list;
struct timeval due;
int enable;
#define PH_TIMER_DISABLED 0
#define PH_TIMER_ENABLED 1
#define PH_TIMER_LOCKED 2
};
#define PHENOM_WHEEL_BITS 8
#define PHENOM_WHEEL_SIZE (1 << PHENOM_WHEEL_BITS) // 256
struct ph_timerwheel {
struct timeval next_run; // 下1个tick的实际时间
uint32_t tick_resolution; // 每个tick的时间间隔
ck_rwlock_t lock;
struct {
struct ph_timerwheel_list lists[PHENOM_WHEEL_SIZE];
} buckets[4];
};
Bucket 0 represents those events that are due the soonest.
Each tick causes us to look at the next list in a bucket.
The 0th list in a bucket is special; it means that it is time to
flush the timers from the next higher bucket and schedule them
into a different bucket.
ph_timerwheel提供了4个buckets, buckets存在着类似时分秒的进位关系;
下面用TV1标识buckets[0], 以此类推, TV4标识buckets[3];
TV1为第1个表,所表示的计时是 1 ~ 255 tick.
因为在一个tick上可能同时有多个timer等待超时处理,
使用ph_timerwheel_list将所有timer 串成一个链表,以便在超时时顺序处理;
TV2为第2个表, 所表示的计时是 256 ~ 65535 tick.
以此类推TV3, TV4;
在nbio子系统中,tick_resolution=100ms,每过100ms, 每1个事件循环会触发ph_timerwheel_tick函数。
用来处理下一个tick所在的所有timer.
ph_timerwheel_tick(ph_timerwheel_t *wheel,
struct timeval now,
ph_timerwheel_should_dispatch_func_t should_dispatch,
ph_timerwheel_dispatch_func_t dispatch,
void *arg)
idx 是用来遍历 TV1 的索引。每一次循环idx会定位一个当前待处理的 tick,并处理这个tick下所有超时的timer。
wheel->next_run会在每次循环后增加一个 tick_resolution,index也会随之向前移动。当index变为0时表示TV1完成了一次完整的遍历,
此时所有在 TV1 中的 timer 都被处理了,因此需要通过 cascade 将后面 TV2,TV3 等 timer list 中的timer向前移动,类似于分转成秒的操作。
这种层叠的 timer list 实现机制可以大大降低每次检查超时, timer的时间,每次中断只需要针对 TV1 进行检查,只有必要时才进行cascade。
timer wheel一个弊端就是 cascade 开销过大。 在极端的条件下,同时会有多个TV需要进行cascade处理,会产生很大的时延。
这也是为什么说timeout类型的定时器是timer wheel 的主要应用环境,或者说timer wheel 是为 timeout 类型的定时器优化的。
因为timeout类型的定时器的应用场景多是错误条件的检测,这类错误发生的机率很小,通常不到超时就被删除了,因此不会产生cascade的开销。
nbio子系统,
初始化过程,ph_nbio_init--> ph_timerwheel_init(&emitters[i].wheel, me->now, WHEEL_INTERVAL_MS);
函数ph_nbio_emitter_init中, 每1个emitter,创建1个timerfd,100ms后,定时器超时,timefd成为可读,触发回调函数tick_epoll;
emitter->timer_fd = timerfd_create(
CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
if (emitter->timer_fd == -1) {
ph_panic("timerfd_create(CLOCK_MONOTONIC) failed: `Pe%d", errno);
}
memset(&ts, 0, sizeof(ts));
ts.it_interval.tv_nsec = WHEEL_INTERVAL_MS * 1000000;
ts.it_value.tv_nsec = ts.it_interval.tv_nsec;
timerfd_settime(emitter->timer_fd, 0, &ts, NULL);
ph_job_init(&emitter->timer_job);
emitter->timer_job.callback = tick_epoll;
emitter->timer_job.fd = emitter->timer_fd;
emitter->timer_job.data = emitter;
emitter->timer_job.emitter_affinity = emitter->emitter_id;
ph_job_set_nbio(&emitter->timer_job, PH_IOMASK_READ, 0);
调用顺序: ph_nbio_emitter_init -> ph_job_set_nbio -> tick_epoll -> ph_nbio_emitter_timer_tick -> ph_timerwheel_tick
源码: include/phenom/hashtable.h;corelib/hash; tests/hashtable.c;
struct ph_ht {
uint32_t nelems;
uint64_t table_size, elem_size, mask;
const struct ph_ht_key_def *kdef;
const struct ph_ht_val_def *vdef;
/* points to the table, an array of table_size elements */
char *table;
};
ph_result_t ph_ht_init(ph_ht_t *ht, uint32_t size_hint,
const struct ph_ht_key_def *kdef,
const struct ph_ht_val_def *vdef)
{
ht->kdef = kdef;
ht->vdef = vdef;
ht->nelems = 0;
ht->table_size = ph_power_2(size_hint * 2);
ht->elem_size = sizeof(struct ph_ht_elem) + kdef->ksize + vdef->vsize;
ht->mask = ht->table_size - 1;
ht->table = ph_mem_alloc_size(mt_table, ht->elem_size * ht->table_size);
if (!ht->table) {
return PH_NOMEM;
}
return PH_OK;
}
采用的是linear probing的实现。 hash桶的大小在ph_ht_init的时候传入。
如果桶满了, insert就会失败。 需要显性地调用。
ph_ht_grow来手动建立hash表, 没有rehash的过程。
ph_hash_bytes_murmur函数实现了Murmur Hash算法。
源码: include/phenom/thread.h; corelib/thread.c
struct ph_thread {
bool refresh_time;
// internal monotonic thread id
uint32_t tid;
PH_STAILQ_HEAD(pdisp, ph_job) pending_nbio, pending_pool;
struct ph_nbio_emitter *is_emitter;
int is_worker;
struct timeval now;
ck_epoch_record_t epoch_record;
ck_hs_t counter_hs;
// linkage so that a stat reader can find all counters
ck_stack_entry_t thread_linkage;
// OS level representation
pthread_t thr;
// If part of a pool, linkage in that pool
CK_LIST_ENTRY(ph_thread) pool_ent;
pid_t lwpid;
#ifdef HAVE_STRERROR_R
char strerror_buf[128];
#endif
// Name for debugging purposes
char name[16];
};
Phenom线程上记录了
每个phenom线程分配一个全局唯一的id,对应一个pthread线程。 如注释所说,tid < MAX_RINGS的phenom线程称为preferred thread, 拥有自己专用的job队列,其他线程竞争共享队列,用spinlock同步。
全局的pools将所有线程池保存在链表中。其中包含用于consumer和producer等待/唤醒的结构(futex或condition variable), 保存job的ring buffer、worker线程的指针等等信息。
ph_thread_spawn(func, arg)创建一个ph_thread_t线程。 实际上是调用pthread_create(),让其执行ph_thread_boot(),将实际要执行的函数func() 和参数arg等信息传入。ph_thread_boot()会分配内存并创建一个新的ph_thread_t结构, 执行一些初始化,然后调用传入的那个func()。
此外,封装了join、self、setaffinity等等pthread操作。
1个进程中线程直接除了线程自己的栈和寄存器之外,其他几乎都是共享的,如果线程想维护一个只属于线程自己的全局变量怎么办?
线程的私有存储解决了这个问题。
ph_thread_self函数就用这个方式取得线程自己的句柄
job有3类
源码
ph_nbio_init()初始化NBIO。
每个emitter绑定了1个事件循环
struct ph_nbio_emitter {
ph_timerwheel_t wheel; // 时间轮
ph_job_t timer_job;
uint32_t emitter_id;
struct timeval last_dispatch;
int io_fd, timer_fd;
ph_nbio_affine_job_stailq_t affine_jobs; // typedef PH_STAILQ_HEAD(affine_ent, ph_nbio_affine_job)
ph_job_t affine_job;
ph_pingfd_t affine_ping; // 用来唤醒epoll
ph_thread_t *thread; // 跟thread绑定在一起
ph_counter_block_t *cblock; // 计数器
};
struct ph_job {
// data associated with job
void *data;
// the callback to run when the job is dispatched
ph_job_func_t callback;
// deferred apply list
PH_STAILQ_ENTRY(ph_job) q_ent;
// whether we're in a deferred apply
bool in_apply;
// for PH_RUNCLASS_NBIO, trigger mask */
ph_iomask_t mask;
// use ph_job_get_kmask() to interpret
int kmask;
// Hashed over the scheduler threads; two jobs with
// the same emitter hash will run serially wrt. each other
uint32_t emitter_affinity;
// For nbio, the socket we're bound to for IO events
ph_socket_t fd;
// Holds timeout state
struct ph_timerwheel_timer timer;
// When targeting a thread pool, which pool
ph_thread_pool_t *pool;
// for SMR
ck_epoch_entry_t epoch_entry;
struct ph_job_def *def;
};
ph_sched_run调度NBIO
job加入NBIO
通过ph_job_set_nbio加入JOB (ph_job_set_nbio_timeout_in实际调用ph_job_set_nbio
放入pending_nbio队列的job, 通过ph_sched_run --> process_deferred --> ph_nbio_emitter_apply_io_mask;
加入事件循环中
sched_run开始后,新加入job; ph_nbio_emitter_run --> ph_job_pool_apply_deferred_items --> process_deferred
源码: include/phenom/thread.h; corelib/job.h; corelib/job.c; tests/tpool.c
struct ph_thread_pool {
struct ph_thread_pool_wait consumer CK_CC_CACHELINE;
uint32_t max_queue_len;
ck_ring_t *rings[MAX_RINGS+1];
intptr_t used_rings;
ck_spinlock_t lock CK_CC_CACHELINE;
char pad1[CK_MD_CACHELINE - sizeof(ck_spinlock_t)];
struct ph_thread_pool_wait producer CK_CC_CACHELINE;
int stop;
char *name;
ph_counter_scope_t *counters;
CK_LIST_ENTRY(ph_thread_pool) plink;
ph_thread_t **threads;
uint32_t max_workers;
uint32_t num_workers;
ph_variant_t *config;
};
job分发的过程
job的处理
ph_thread_pool_signal_stop函数用来终止
源码:
libphenom对socket io进行了封装。包括描述符ph_socket_t, 通用的地址结构phenom_sockaddr,
ph_sock_t封装了读写buffer、用于NBIO的job结构、超时时长、事件发生后的callback等信息。
ph_sock_t由NBIO pool管理。
解析域名并发起连接的过程:
struct resolve_and_connect {
ph_sockaddr_t addr;
ph_socket_t s;
int resolve_status;
int connect_status;
uint16_t port;
struct timeval start, timeout, elapsed;
void *arg;
ph_sock_connect_func func;
};
def ph_sock_resolve_and_connect(name, port, timeout, resolver, func,, args):
rac = ph_mem_alloc(mt.resolve_and_connect)
rac.func = func;
rac.arg = arg;
rac.start = ph_time_now();
rac.port = port;
if timeout:
rac.timeout = timeout
else:
rac.timeout = 60 # 默认60s超时
if ph_sockaddr_set_v4(rac.addr, name, port) == PH_OK: # 如果name是IP地址
attempt_connect(rac)
return
# 根据resolver采用不同的解析域名的方式,
rac.addr = dns_getaddrinfo(resolver)
attempt_connect(rac)
def attempt_connect(rac):
# 建立socket对象
rac.s = ph_socket_for_addr(rac.addr, SOCK_STREAM, PH_SOCK_CLOEXEC|PH_SOCK_NONBLOCK)
ph_socket_connect(rac.s, rac.addr, rac.timeout, connected_sock, rac)
struct connect_job {
ph_job_t job;
ph_socket_t s;
ph_sockaddr_t addr;
int status;
struct timeval start;
void *arg;
ph_socket_connect_func func;
};
def ph_socket_connect(s, addr, timeout, func, arg):
# connect_job_template = { callback = connect_complete, memtype = mt.connect_job}
job = (struct connect_job*)ph_job_alloc(connect_job_template)
job.s, job.addr, job.func, job.arg = s, addr, func, arg
job.start = ph_time_now();
res = connect(s, job.addr ...) # man 2 connect
if (...) #
# 如果s对应fd是异步方式,使用事件回调机制, 回调函数是connect_complete
job.job.fd = s
job.job.callback = connect_complete
job.job.data = job
ph_job_set_nbio_timeout_in(&job->job, PH_IOMASK_WRITE,
timeout ? *timeout : default_timeout);
return;
# 同步IO, 直接调用connected_sock
done = job.stat - now
func(s, addr, res == 0 ? 0 : errno, done, arg);
def connect_complete(ph_job_t *j, ph_iomask_t why, void *data):
struct connect_job *job = data
if why == PH_IOMASK_TIME:
status = ETIMEDOUT
# 回调之前注册的函数, connected_sock
job.func(job.s, job.addr, status, done, job.arg)
def connected_sock(s, addr, status, elapsed, arg):
struct resolve_and_connect *rac = arg;
sock = ph_sock_new_from_socket(s, NULL, addr)
calc_elapsed(rac)
# 回调用户定义的函数 , 类型是ph_sock_connect_func
rac.func(sock, PH_SOCK_CONNECT_SUCCESS, 0, addr, rac.elapsed, rac.arg);
ph_sock_t, 对1个socket连接的抽象:
struct ph_sock {
// Embedded job so we can participate in NBIO
ph_job_t job;
// Buffers for output, input
ph_bufq_t *wbuf, *rbuf;
// The per IO operation timeout duration
struct timeval timeout_duration;
// A stream for writing to the underlying connection
ph_stream_t *conn;
// A stream representation of myself. Writing bytes into the
// stream causes the data to be buffered in wbuf
ph_stream_t *stream;
// Dispatcher
ph_sock_func callback;
bool enabled;
// sockname, peername as seen from this host.
// These correspond to the raw connection we see; if we are
// proxied, these are the names of our connection to the proxy.
// If we are not proxied, these are the same as the equivalents below
ph_sockaddr_t via_sockname, via_peername;
// sockname, peername as seen from the connected peer
// These are the actual outgoing address endpoints, independent of
// any proxying that may be employed
ph_sockaddr_t sockname, peername;
// If we've switched up to SSL, holds our SSL context
SSL *ssl;
ph_stream_t *ssl_stream;
ph_sock_openssl_handshake_func handshake_cb;
ph_bufq_t *sslwbuf;
};
// 创建ph_sock_t
// connected_sock, accept_dispatch函数调用
def ph_sock_new_from_socket(ph_socket_t s, ph_sockaddr_t *sockname, ph_sockaddr_t *peername):
# sock_job_template = {sock_dispatch, mt.sock}, 分配的结构体是ph_sock_t
sock = (ph_sock_t*)ph_job_alloc(&sock_job_template)
# 读写buf默认大小为128k
max_buf = ph_config_query_int("$.socket.max_buffer_size", MAX_SOCK_BUFFER_SIZE)
sock->wbuf = ph_bufq_new(max_buf)
sock->rbuf = ph_bufq_new(max_buf)
sock->conn = ph_stm_fd_open(s, 0, 0)
sock->stream = ph_stm_make(&sock_stm_funcs, sock, 0, 0)
# sockname记录本地地址, peer记录对端地址
sock->sockname = *sockname
sock->peername = *peername
# 默认60s超时
sock->timeout_duration.tv_sec = 60
return sock
// 加入nbio的方式, 以ph_sock_connect_func回调取例
def connect_cb(ph_sock_t *sock, ...):
# 设置回调函数, 并开启
sock->callback = remote_cb
ph_sock_enable(sock, true);
// 当ph_sock对应的fd有event发生时,nbio回调的入口函数是
def sock_dispatch(j, why, data):
# SSL暂时不关心, 先skip这些代码
ph_sock_t *sock = (ph_sock_t*)j;
sock->conn->need_mask = 0;
// 把wbuf里缓存的数据写入fd
try_send(sock)
// 从系统中读取数据到rbuf
try_read(sock)
// 设置对应的mask, 回调用户注册的函数
// ....
sock->callback(sock, why, data);
// 释放ph_socket_t,当发现需要关闭连接时
ph_sock_shutdown(sock, PH_SOCK_SHUT_RDWR);
// 如果sock->job.data之前有malloc数据,这里需要释放
ph_mem_free(mt_state, state);
ph_sock_free(sock);
sock的读写:
本章的内容是
在业务Lua代码里设置定时器的接口是
-- 参数 ti: number
-- 参数 func: function
-- 框架在 ti 个单位时间后,调用 func 这个函数。
skynet.timeout(ti, func)
定时器实现的非常高效,一般不用太担心性能问题。
如果你的服务想大量使用定时器的话,可以考虑:在一个服务里,只使用一个 skynet.timeout,用它来触发自己的定时事件模块。
skynet.timeout 实现
function skynet.timeout(ti, func)
local session = c.intcommand("TIMEOUT",ti)
assert(session)
local co = co_create(func)
assert(session_id_coroutine[session] == nil)
session_id_coroutine[session] = co
end
再看看 c.intcommand("TIMEOUT",ti) 的实现
cmd_timeout 实现
static const char *
cmd_timeout(struct skynet_context * context, const char * param) {
char * session_ptr = NULL;
int ti = strtol(param, &session_ptr, 10);
int session = skynet_context_newsession(context);
skynet_timeout(context->handle, ti, session);
sprintf(context->result, "%d", session);
return context->result;
}
skynet 启动后,有1个线程执行
create_thread(&pid[1], thread_timer, m);
忽略很多细节,这个函数主要是不停地调用
for (;;) {
skynet_updatetime();
// ...
}
忽略细节,随着时间变化,对应的定时器到期,会触发
static inline void
dispatch_list(struct timer_node *current) {
do {
struct timer_event * event = (struct timer_event *)(current+1); //取出event,然后对skynet消息赋值
struct skynet_message message;
message.source = 0;
message.session = event->session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
skynet_context_push(event->handle, &message); //将消息压入相应的服务
struct timer_node * temp = current;
current=current->next;
skynet_free(temp); //处理完之后才释放内存
} while (current);
}
dispatch_list 根据定时事件event 关联的source, session, 发送一个消息给对应的服务。
那么,服务收到后,就可以根据session 找到之前分配的协程了。
定时器实现方案,从原理来说,非常简单。 伪代码如下
def update_timer():
current = get_cur_time()
for timer in reg_timer_list:
if timer.expeires > current:
timer.callback()
由时钟来驱动 update_timer。检查到期的定时事件,进行回调。
云大在skynet 里采用了时间轮的实现。按8/6/6/6/6/分成5个部分,也就是有5个时钟,这种分层方法的空间复杂度变为 256+64+64+64+64= 512个槽,支持注册最长时间的tick是 25664646464=2^32。每个ticket 时长是 1/100秒
个人觉得:针对这个精度,时间轮太复杂了,采用更简单的实现,例如优先级队列。
更多关于定时器,可以看我之前的文章: 各种定时器的实现
skynet 在机制上不支持取消之前注册的定时器。
当倒计时结束以后,执行的不是调用定时器时注册的回调,而是更改了的回调。
如果这个回调不会做任何事情,这就是伪取消。
实现代码
local function remove_timeout_cb(...)
end
function skynet.remove_timeout(session)
local co = co_create(remove_timeout_cb)
assert(session_id_coroutine[session] ~= nil)
session_id_coroutine[session] = co
end
这篇文章,讲Skynet 调度机制中另外一个重点是:服务。
把一个符合规范的 C 模块,从动态库(so 文件)中加载进来。绑定一个永不重复的数字 id 做为其 handle 。
这个运行的模块就是服务。 服务本质上是C语言的一个结构体。
模块和服务的关系,可以理解为程序和进程。
struct skynet_context {
void * instance; // 指针,指向模块create的结构体
struct skynet_module * mod; // 对动态库 so 的封装
void * cb_ud; // 消息回调的上下文
skynet_cb cb; // 消息回调
struct message_queue *queue; // 服务所属的消息队列
FILE * logfile; // 如果存在,会通过 skynet_log_output 记录消息到这个文件中
char result[32]; // 保存相关函数的临时结果
uint32_t handle; // 服务的 handle, 具有唯一性
int session_id; // 请求回应模式
int ref; // 引用计数
int message_count; // 记录服务总共处理了多少个消息
...
};
通过下面函数,可以创建一个服务。
struct skynet_context *
skynet_context_new(const char * name, const char *param);
skynet_context_new 主要的操作是
每一个实现C 服务的模块, 约定实现以下4个接口
struct skynet_module {
const char * name; // so 文件名称
void * module; // dlopen 返回的指针
skynet_dl_create create; // 创建XX服务。 必须
skynet_dl_init init; // 初始化XX服务实例。 可选
skynet_dl_release release; // 释放XX服务实例。 可选
skynet_dl_signal signal; // XX服务信号处理器。 可选
};
每个服务可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。
每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对CPU资源零消耗。
可以通过这个函数设置回调
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb;
context->cb_ud = ud;
}
callback 函数的定义
typedef int (*skynet_cb)(
struct skynet_context * context,
void *ud,
int type,
int session,
uint32_t source,
const void * msg,
size_t sz
);
对指定服务发送消息
int skynet_send(
struct skynet_context * context,
uint32_t source,
uint32_t destination,
int type,
int session,
void * msg,
size_t sz
);
日志服务 logger, 是启动的第一个服务。 它负责记其他服务的日志输出。
例如,下面的Lua代码,通过日志服务打印了一行日志
skynet.error("Watchdog listen on", 8888)
skynet.error 实现是这样的
那么, logger 服务是如何实现的 ?
struct logger {
FILE * handle;
char * filename;
int close;
};
struct logger *
logger_create(void) {
struct logger * inst = skynet_malloc(sizeof(*inst));
inst->handle = NULL;
inst->close = 0;
inst->filename = NULL;
return inst;
}
logger 服务创建的时候, 会附带一个数据结构 struct logger . 从这个结构知道,日志可以输出到指定的一个文件。
int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
if (parm) {
inst->handle = fopen(parm,"w");
if (inst->handle == NULL) {
return 1;
}
inst->filename = skynet_malloc(strlen(parm)+1);
strcpy(inst->filename, parm);
inst->close = 1;
} else {
inst->handle = stdout;
}
if (inst->handle) {
skynet_callback(ctx, inst, logger_cb);
skynet_command(ctx, "REG", ".logger");
return 0;
}
return 1;
}
初始化时,init 根据是否传入参数,决定了日志是输出到文件或者标准输出。
再看看回调函数 logger_cb
static int
logger_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
struct logger * inst = ud;
switch (type) {
case PTYPE_SYSTEM:
if (inst->filename) {
inst->handle = freopen(inst->filename, "a", inst->handle);
}
break;
case PTYPE_TEXT:
fprintf(inst->handle, "[:%08x] ",source);
fwrite(msg, sz , 1, inst->handle);
fprintf(inst->handle, "\n");
fflush(inst->handle);
break;
}
return 0;
}
下面,我们会讲一讲,使用最多的 Lua 沙盒服务。
skynet 核心机制,是没有网络层的。对网络层的处理,是由单独的线程来完成的。
网络功能的核心结构
struct socket_server {
int recvctrl_fd; // 接收管道消息的文件描述
int sendctrl_fd; // 发送管道消息的文件描述
int checkctrl; // 判断是否有其他线程通过管道,向socket线程发送消息的标记变量
poll_fd event_fd; // epoll实例id
int alloc_id; // 已经分配的socket slot列表id
int event_n; // 标记本次epoll事件的数量
int event_index; // 下一个未处理的epoll事件索引
struct socket_object_interface soi;
struct event ev[MAX_EVENT]; // epoll事件列表
struct socket slot[MAX_SOCKET]; // socket 列表
};
服务所在的工作线程,通过管道的 sendctrl_fd 写入数据。 socket 线程,通过 recvctrl_fd进行读取网络操作请求。
操作系统的fd, 都会在 slot 里分配一个对应的 socket 结构。
struct socket {
uintptr_t opaque; // 与本socket关联的服务地址,socket接收到的消息,最后将会传送到这个服务商
struct wb_list high; // 高优先级发送队列
struct wb_list low; // 低优先级发送队列
int64_t wb_size; // 发送字节大小
int fd; // socket文件描述符
int id; // 位于socket_server的slot列表中的位置
uint16_t protocol; // 使用的协议tcp or udp
uint16_t type; // epoll事件触发时,会根据type来选择处理事件的逻辑
};
其中 type 会表示当前 socket 的状态,会根据这个状态触发不同的逻辑。
监停指定端口,并发处理客户端的请求。代码如下
-- 来着 skynet/service/debug_console.lua
local socket = require "skynet.socket"
local listen_socket = socket.listen (ip, port)
socket.start(listen_socket , function(id, addr)
local function print(...)
local t = { ... }
for k,v in ipairs(t) do
t[k] = tostring(v)
end
socket.write(id, table.concat(t,"\t"))
socket.write(id, "\n")
end
socket.start(id)
skynet.fork(console_main_loop, id , print)
end)
注意点:
Lua 执行
local listen_socket = socket.listen (ip, port)
调用过程如下:
接着执行
socket.start(listen_socket, function ()... end)
socket 线程,是并发执行C 代码的。 不停地在执行 skynet_socket_poll
// skynet_socket_poll
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
forward_message,封装了一个类别是 PTYPE_SOCKET 的请求,数据是 struct skynet_socket_message的消息,
放入到服务opaque 的消息队列里。
debug_console.lua. 对这个消息的处理,是定义在
// lualib/skynet/socket.lua
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = driver.unpack,
dispatch = function (_, _, t, ...)
socket_message[t](...)
end
}
数据unpack 执行的是 lunpack @ lua-socket.c. 接下去执行 dispatch, 传入的参数是
socket_message[SKYNET_SOCKET_TYPE_CONNECT = 2](result->id, result->ud, "start")
执行的回调函数就是
-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
local s = socket_pool[id]
if s == nil then
return
end
-- log remote addr
if not s.connected then -- resume may also post connect message
s.connected = true # 表示成功连接
wakeup(s)
end
end
local function wakeup(s)
local co = s.co
if co then
s.co = nil
-- 这里并没有 yield。 只是把 co 放入 wakeup_queue.
-- 等到下一个调度,重新 resume co
skynet.wakeup(co)
end
end
这样,找到之前挂起的那个协程,重新唤醒它。 Lua 之后执行的代码,会跳转到 这里
socket 线程,轮循 socket_server_poll。 检测到读时间
// skynet_socket_poll
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
服务 debug_console,触发逻辑是
socket.start(listen_socket , function(id, addr)
// ...
socket.start(id)
skynet.fork(console_main_loop, id , print)
end)
调用 socket.start(id, func) @socket.lua , 协程挂起。 参考上面的流程,这个挂起,之后会被唤醒。
skynet.fork 出一个新的协程,并发地处理每个客户端的读写操作。
针对其中一个客户端,执行 console_main_loop ,执行
local cmdline = socket.readline(stdin, "\n")v
如果 s.buffer 读缓冲区里有 “\n", 就直接读取并返回。 否则就会挂起
local ret = driver.readline(s.buffer, buffer_pool, sep)
if ret then
return ret
end
assert(not s.read_required)
s.read_required = sep
-- 在这里挂起
suspend(s)
socket 线程里,客户端每次发过来新的数据,最终会触发服务执行代码. socket_message[1] @socket.lua
local s = socket_pool[id]
local sz = driver.push(s.buffer, buffer_pool, data, size)
local rr = s.read_required -- 现在这里应该是 "\n"
local rrt = type(rr)
-- 下面逻辑走 else
if rrt == "string" then
-- read line
if driver.readline(s.buffer,nil,rr) then
s.read_required = nil
wakeup(s)
wakeup 后, 最终框架会继续驱动上面挂起的 socket.readline 返回,继续执行之前的逻辑。
Go语言, 号称互联网时代的C语言。 它从一开始,就适合服务端的开发。
下面具体叙述Go语言为什么适合服务端的开发。
从 2005 年开始,时钟速率的增长和晶体管数量的增长已不再同步。由于处理器材料的物理性质限制,时钟速率已停止增长(甚至下降),处理器制造商开始将更多执行单元(核心)封装到单个芯片(插槽)中。这一趋势(似乎能够在可预见的未来继续保持)已开始给应用程序开发和编程语言开发社区带来越来越大的压力. 高效地利用可用 CPU 核心的惟一方法就是使用并行性。
在应用程序开发方面,基于线程的并发编程是实现并行性的主导机制。
但线程编程存在本质上的困难:
所有,基于线程的编程模型不是多核时代的最佳选择.
我们需要更合适的并发模型。
语言层面支持并发模型的, 暂时只有GO和Erlang.
许式伟认为
Erlang的困难之处在于它是FP语言. 我们缺乏深入人心的FP编程理论。我们并不了解FP“数据结构”学。这是Erlang语言无法逾越的门槛,决定了它只能是小众化语言。
在Baidu上搜索“招聘+erlang”, 相关结果约20,300个。
搜索”招聘+go“, 相关结果约1,240,000个。
我个人认为,许式伟的看法是对的。
Go语言之父Rob Pike 这样说
在那一个小时的演讲中,我们大概听到了约35种计划中的新特性。当然实际上还有更多,
这时候,我问了自己一个问题:C++委员会真的认为C++的特性还不够多?当然,不同于Ron的玩笑,简化这门语言必是一门更大的成就!
Go语言社区吸引了大量程序员的参与, 截至2015-10-29日
https://golang.org/AUTHORS 统计,核心提交者有556人。
有人用BigQuery查询github的结果:
【图2】统计从2014年05月1日起到2015年8月8号,各种语言创建的repos数排名。
限定条件:fork数>3;repos大于20MB
从搜索引擎结果看,国内已经有很多公司在使用Go。 包括百度,阿里,豆瓣,陌陌等
Rob Pike 说
在那一个小时的演讲中,我们大概听到了约35种计划中的新特性。当然实际上还有更多。这时候,我问了自己一个问题:C++委员会真的认为C++的特性还不够多?当然,不同于Ron的玩笑,简化这门语言必是一门更大的成就!也许这很可笑,但是请把这个想法记在心里。
slice 和 array 的基本操作
arr := [5]int{1, 2, 3, 4, 5}
slice := arr[3 : 5] // slice:[4, 5]
slice[0] = 0 // slice:[0, 5]
fmt.Println(slice)
fmt.Println(arr)
输出结果是
[0 5]
[1 2 3 0 5]
对Go的Slice进行Append的一个坑
func main() {
arr1 := [5]int{1, 2, 3, 4, 5}
slice1 := arr1[1:2]
slice1 = append(slice1, 6, 7, 8)
fmt.Println("slice1:", slice1)
fmt.Println("arr1:", arr1)
arr2 := [5]int{1, 2, 3, 4, 5}
slice2 := arr2[1:3]
slice2 = append(slice2, 6, 7, 8)
fmt.Println("slice2:", slice2)
fmt.Println("arr2:", arr2)
}
输出结果是
slice1: [2 6 7 8]
arr1: [1 2 6 7 8] //神奇地,原数组被改变了
slice2: [2 3 6 7 8]
arr2: [1 2 3 4 5] //一切正常
type geometry interface {
area() float64
perim() float64
}
type rect struct {
width, height float64
}
type circle struct {
radius float64
}
func (r rect) area() float64 {
return r.width * r.height
}
func (r rect) perim() float64 {
return 2*r.width + 2*r.height
}
func (c circle) area() float64 {
return math.Pi * c.radius * c.radius
}
func (c circle) perim() float64 {
return 2 * math.Pi * c.radius
}
func measure(g geometry) {
fmt.Println(g)
fmt.Println(g.area())
fmt.Println(g.perim())
}
func main() {
r := rect{width: 3, height: 4}
c := circle{radius: 5}
measure(r)
measure(c)
}
Go语言采用的是“非侵入式接口".
一个Go例程就是一个和其它Go例程在同一地址空间里但却独立运行的函数。
f("hello", "world") // f runs; we wait
go f("hello", "world") // f starts running
g() // does not wait for f to return
就像是在shell里使用 & 标记启动一个命令。
Go例程不是线程, 很像线程,但比线程更轻量。
一个程序里产生成千上万个Go例程很正常。
多个例程可以在系统线程上做多路通信。
当一个Go例程阻塞时,所在的线程会阻塞,但其它Go例程不受影响。
Do not communicate by sharing memory; instead, share memory by communicating
通道是类型化的值,能够被Go例程用来做同步或交互信息。
timerChan := make(chan time.Time)
go func() {
time.Sleep(deltaT)
timerChan <- time.Now() // send time on timerChan
}()
// Do something else; when ready, receive.
// Receive will block until timerChan delivers.
// Value sent is other goroutine's completion time.
completedAt := <-timerChan
这select语句很像switch,但它的判断条件是基于通信,而不是基于值的等量匹配。
select {
case v := <-ch1:
fmt.Println("channel 1 sends", v)
case v := <-ch2:
fmt.Println("channel 2 sends", v)
default: // optional
fmt.Println("neither channel was ready")
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.