一文让你掌握skynet,后端开发原来如此简单【建议收藏】

一、skynet是什么

云风的skynet,定义为一个游戏服务器框架,用c + lua基于Actor模型实现。代码极其精简,c部分的代码只有三千行左右。

整个skynet框架要解决的核心问题是:把一个消息(数据包)从一个服务(Actor)发送给另一个服务(Actor),并接收其返回。也就是在同一进程内(作者也强调并非只限于同一进程,因为可能会有集群间的通讯)的一个服务通过类似rpc之类的调用同一进程内的另外一个服务,并接收处理结果。而skynet就是处理这些服务间发送数据包的规则和正确性。

skynet的核心层全部是c来实现

当系统启动的时候,会得到一个提前分配好的节点id,我们称之为harbor id,这个id是集群用的,一个集群内可以启动很多个skynet节点,每个节点都会分配到唯一的id,一个节点(即一个进程)内有很多个服务,服务可以狭义地暂且理解为功能模块。

当初始化一个服务的时候,会生成一个skynet_context来作为服务的实例;一个唯一(即使是在集群里也是唯一)的服务handle,即服务的唯一id,用来识别服务;一个消息队列message_queue;还要向框架注册一个callback,当服务收到有发送来的消息时,通过这个方法传入。

初始化一个服务的代码如下:

struct skynet_context *
skynet_context_new(const char * name, const char *param) {
     // 装载模块
     struct skynet_module * mod = skynet_module_query(name);

     if (mod == NULL)
          return NULL;

     void *inst = skynet_module_instance_create(mod);
     if (inst == NULL)
          return NULL;
     // 初始化skynet_context实例
     struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
     CHECKCALLING_INIT(ctx)

     ctx->mod = mod;
     ctx->instance = inst;
     ctx->ref = 2;
     ctx->cb = NULL;
     ctx->cb_ud = NULL;
     ctx->session_id = 0;
     ctx->logfile = NULL;

     ctx->init = false;
     ctx->endless = false;
     // 初始化服务handle
     // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
     ctx->handle = 0;    
     ctx->handle = skynet_handle_register(ctx);
     // 初始化消息队列
     struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
     // init function maybe use ctx->handle, so it must init at last
     context_inc();

     CHECKCALLING_BEGIN(ctx)
     int r = skynet_module_instance_init(mod, inst, ctx, param);
     CHECKCALLING_END(ctx)
     if (r == 0) {
          struct skynet_context * ret = skynet_context_release(ctx);
          if (ret) {
               ctx->init = true;
          }
          skynet_globalmq_push(queue);
          if (ret) {
               skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
          }
          return ret;
     } else {
          skynet_error(ctx, "FAILED launch %s", name);
          uint32_t handle = ctx->handle;
          skynet_context_release(ctx);
          skynet_handle_retire(handle);
          struct drop_t d = { handle };
          skynet_mq_release(queue, drop_message, &d);
          return NULL;
     }
}

在skynet_handle_register方法中生成一个服务handle,handle是一个32位的整数,在生成handle的时候,是把该节点的harbor id写到了handle的高8位里面,所以一个服务的handle,就可以知道这个服务是哪个节点的。

s->handle_index = handle + 1;
          rwlock_wunlock(&s->lock);
          handle |= s->harbor;
          return handle;

所以说,harbor id最高也只有256个,也就意味着skynet集群最多只能有256个节点,而一个节点里最多也只能有24位个服务,即1.6M个。因为一个handle是32位的整数,高8位用来存储harbor id,只有低的24位用来分配给本节点的handle。

消息队列message_queue是用来存储发送给该服务的消息的。所有发送给该服务的消息,都要先压到该服务的消息队列中。

文章福利】小编推荐自己的C/C++、Linux技术交流群:【1106675687】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!



服务启动起来了,来看看数据包是如何从一个服务发送给另一个服务的。来看看 skynet_send 和 callback 函数的定义:

int skynet_send(
  struct skynet_context * context,
  uint32_t source,
  uint32_t destination,
  int type,
  int session,
  void * msg,
  size_t sz
);

typedef int (*skynet_cb)(
  struct skynet_context * context,
  void *ud,
  int type,
  int session,
  uint32_t source ,
  const void * msg,
  size_t sz
);

ource和destination分别是发送方和接收方的handle。
type是发送方和接收方处理数据包的协议、session识别本次调用的口令,发送方发送一个消息后,保留该session,以便收到回应数据包时,能识别出是哪一次调用。

msg/sz是数据包的内容和长度,成对使用

  • skynet 的消息调度
  • skynet 维护了两级消息队列。

每个服务实体有一个私有的消息队列,队列中是一个个发送给它的消息。消息由四部分构成:

struct skynet_message {
    uint32_t source;
    int session;
    void * data;
    size_t sz;
};

向一个服务发送一个消息,就是把这样一个消息体压入这个服务的私有消息队列中。这个结构的值复制进消息队列的,但消息内容本身不做复制。

Skynet 维护了一个全局消息队列,里面放的是诸个不为空的次级消息队列。

在 Skynet 启动时,建立了若干工作线程(数量可配置),它们不断的从主消息列队中取出一个次级消息队列来,再从次级队列中取去一条消息,调用对应的服务的 callback 函数进行出来。为了调用公平,一次仅处理一条消息,而不是耗净所有消息(虽然那样的局部效率更高,因为减少了查询服务实体的次数,以及主消息队列进出的次数),这样可以保证没有服务会被饿死。这样,skynet就实现了把一个消息(数据包)从一个服务发送给另一个服务。

二、消息派发

这里云大的skynet已经很详细的介绍了,我就仅仅在这里略提一下。skynet 把消息分为不同的类别,不同类别的消息有不同的编码方式,若编写一个服务,你需要为此服务关注的消息类型注册 dispatch 函数用来接收此类别的消息。skynet 注册类别消息的 dispatch 函数有两种方式。

调用 skynet.register_protocol 注册。函数的参数是一个 table ,以"lua"类消息为例,里面有若干字段含义如下:

{
     name = "lua", -- 消息组的字符串名称
     id = skynet.PTYPE_LUA, -- 消息组的数字 id
     pack = skynet.pack, -- 打包消息
     unpack = skynet.unpack, -- 解包消息
     dispatch = function(session, source, cmd, ...) ... end -- 消息回调/分发函数
}

指定了 table 中的 dispatch 字段,以后"lua"类消息到达时便会调用此函数。

调用 skynet.dispatch 函数注册。为此,云大给出了一个惯用写法,以"lua"类消息为例,如下:

local CMD = {}
skynet.dispatch("lua", function(session, source, cmd, ...)
    local f = assert(CMD[cmd])
    f(...)
end)

两种方式可以根据喜好选择,毕竟一个服务可能需要处理多种类型的消息,需要注册多个 dispatch 函数。

在 skynet 中用 Lua 编写一个服务必须调用 skynet.start 启动函数启动此服务:

function skynet.start(start_func)
    c.callback(dispatch_message)
    skynet.timeout(0, function()
        init_service(start_func)
    end)
end

skynet.start 其中在一个作用是调用 c.callback 函数把 skynet 框架的消息派发与你自定义的 dispatch 函数联系起来,这个联系的纽带就是 dispatch_message(skynet.lua) 函数。当服务的消息队列有消息到达时,框架从消息队列中取出消息经过一些转换调用到 dispatch_message 函数,然后 dispatch_message 函数根据协议类型调用相应的 dispatch 函数,最终到具体某条消息的处理函数。

三、消息执行

skynet 是基于服务的,服务间通过消息进行通信。实现方面 skynet 为每个服务创建一个 lua_State ,不同的服务 lua_State 是不同的,因此服务是相互独立互不影响的。对于消息,“skynet 的 lua 层会为每个请求创建一个独立的 coroutine”。经过上面一节,了解到消息会到达我们自定义的 dispatch 函数,此时进入了业务相关的代码逻辑中,我们只关注业务的逻辑而不关注底层消息如何到达这儿的。于是猜测应该是在 dispatch_message 函数中 skynet 会创建 coroutine 来具体处理某个消息。

然后,我们猜想消息执行流程大概应该是这样的:

  • 一条消息到达,服务的主线程创建 coroutine 处理此消息,处理完后执行权回到主线程,继续下一条消息处理。
  • 一条消息到达,服务的主线程创建 coroutine 处理此消息,假设此服务是 A ,此时创建的 coroutine 是 coA。A向另一个服务 B 发送一条消息并等待 B 的返回结果,A 才继续执行。这时最好的方式是对 coA做出标记让出执行,主线程继续处理其他消息,并根据标记判断接收的消息是不是派发到 coA 的,若是则再唤醒 coA 继续执行。

对于单个服务来说,弄清楚一条消息执行流程是这篇笔记的主要内容。

此外由于每条消息都运行在一个 coroutine 中,云大根据反馈对 coroutine 进行了回收再利用以此提升效率。

skynet 接口有非阻塞 API (如 skynet.ret)也有阻塞 API (如 skynet.call)。阻塞 API 也仅仅是阻塞调用此 API 的 coroutine ,服务本身并没有阻塞。这两个 API 刚好与上面猜测的消息执行流程相呼应,接下来以这两个 API 为例子来说明。顺便提一点,调用阻塞 API 时要防止一些问题。

上面提到 dispatch_message 会创建 coroutine 把消息派发到我们的自定义 dispatch 函数中。实际上完成任务是在函数 raw_dispatch_message 函数中。下面是简化版的函数实现:

local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then -- response 类型消息,skynet 已自动处理
        local co = session_id_coroutine[session]
        session_id_coroutine[session] = nil
        suspend(co, coroutine.resume(co, true, msg, sz))
    else -- 其他类型消息派发到相应的 dispatch 函数
        local p = assert(proto[prototype], prototype)
        local f = p.dispatch -- 我们自定义的 dispatch 函数
        if f then
            local co = co_create(f) -- 创建 coroutine
            session_coroutine_id[co] = session
            session_coroutine_address[co] = source
            suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
        end
    end
end

下面以 skynet 自带的例子 agent.lua 和 simpledb.lua 为例来进行说明,以 agent 服务 和 simpledb 服务分别指代这两个服务。agent 服务通过"client"类型协议处理客户端发送过来的请求,然后 agent 服务和 simpledb 服务通信获得结果,最后把结果发送到客户端。simpledb 服务最简单,接收消息计算结果并返回结果。

先以 simpledb 服务为例进行说明

local skynet = require "skynet"
local db = {}

local command = {}

function command.GET(key)
    return db[key]
end

function command.SET(key, value)
    local last = db[key]
    db[key] = value
    return last
end

skynet.start(function()
    skynet.dispatch("lua", function(session, address, cmd, ...)
        local f = command[string.upper(cmd)]
        if f then
            skynet.ret(skynet.pack(f(...)))
        else
            error(string.format("Unknown command %s", tostring(cmd)))
        end
    end)
    skynet.register "SIMPLEDB" -- 注册名称,其他服务可以直接向此名称发送协议
end)

simpledb(line 17) 调用 skynet.dispatch 注册"lua"类型消息的 dispatch 函数,假设这个匿名函数叫 db_dispatch 。

假设 simpledb 接收到 agent 发送过来的"SET"消息。框架从 simpledb 消息队列中取出消息,经过一些调用代码执行到 raw_dispatch_message 函数。在 raw_dispatch_message(line 3) 进行 if 条件判断,这条"SET"消息的消息类型是"lua",因此 prototype 是 10 ,代码这时执行到 else 分支,目的是为了创建 coroutine 调用 db_dispatch 函数。代码走到 raw_dispatch_message(line 11) 调用 co_create 函数,在能回收 coroutine 的情况下创建一个 coroutine ,让我们看看 co_create 实现。

local coroutine_pool = {} -- 存放 coroutine 对象的数组
local coroutine_yield = coroutine.yield -- 让出函数

local function co_create(f)
    local co = table.remove(coroutine_pool) -- 先从数组中取出 coroutine ,从数组中删除是禁止此 coroutine 被其他消息使用
    if co == nil then
        co = coroutine.create(function(...)
            f(...) -- 执行我们传入的函数
            while true do
            -- 执行完后回收 coroutine
            f = nil
            coroutine_pool[#coroutine_pool+1] = co
            -- 让出执行,通知 main_thread 做些清理工作
            -- coroutine 被唤醒后,代码会从下面的调用中返回并赋值 f 为我们需要执行的函数,然后继续执行
            f = coroutine_yield "EXIT"
            f(coroutine_yield()) -- 这里再次调用让出函数,是为了接收参数传递给 f
            end
        end)
    else
        coroutine.resume(co, f) -- 唤醒一个 coroutine ,并传入参数 f f 是我们想要执行的函数
    end
    return co
end

从使用的理念上,调用函数创建一个 coroutine 对象后,再调用 resume 函数,coroutine 便会执行,调用者无需关注这个 coroutine 是新创建的还是回收利用之前已经创建的。代码继续执行,走到raw_dispatch_message(line 14) ,正如预想的那样,代码先调用 coroutine.resume 启动 coroutine ,于是 dispatch 函数变得以执行。由于 coroutine 是回收利用的,实际在 raw_dispatch_message(line 14) 调用 coroutine.resume 时,coroutine 是分两种情况执行的,让我们回到 co_create 函数实现。

  • 当调用 co_create ,coroutine_pool 没有 coroutine 时(此时有可能是服务刚启动数组中还没有coroutine ,也有可能创建的 coroutine 已经被用完了)此时会走到 co_create(line 7) ,创建一个新的coroutine 。然后调用 coroutine.resume 时,co_create(line 8)的代码会被执行,函数执行完后就要回收这个新创建的 co ,然后调用 coroutine_yield “EXIT” 让出执行,此时raw_dispatch_message(line 14) 调用的 coroutine.resume 函数返回,代码回到主线程,调用suspend 函数处理"EXIT"命令,suspend 函数执行完后,raw_dispatch_message函数也执行完毕,本次消息也就执行完毕。
  • 当调用 co_create ,coroutine_pool 中有剩余的 coroutine 时,此时便会利用这个 coroutine 。代码执行到 co_create(line 20) ,这里调用 coroutine.resume 唤醒这个之前已经让出执行的coroutine ,然后在 co_create(line 15) 对 coroutine_yield 的调用会返回,并赋值 f,这样做的目的是为了传入我们要执行的函数 f 。然后执行到 line 16 再次调用 coroutine_yield,这次目的是为了接收函数参数。最后在 raw_dispatch_message(line 14) 调用 coroutine.resume时,coroutine 第二次被唤醒,在 co_create(line 16) coroutine_yield 会返回并返回 resume传入的参数,这样我们想要执行的函数便得到执行。执行后这是一个 while 死循环,代码走到 co_create(11) 开始回收这个coroutine ,然后调用 coroutine_yield “EXIT” 让出执行(接下来的执行同上),消息执行完毕。

分析了 co_create 函数,让我们回到正题。此时是 simpledb 服务,代码执行raw_dispatch_message(line 14) ,coroutine 被执行,db_dispatch 函数被调用,此时代码走到 simpledb(line 18) 然后 command.SET 函数被调用,紧接着调用 skynet.ret 返回结果。

skynet.ret 实现如下:

function skynet.ret(msg, sz)
2     msg = msg or ""
3     return coroutine_yield("RETURN", msg, sz)
4 end

在 skynet.ret 函数中会调用 coroutine_yield ,此时 coroutine 会让出执行,执行权回到主线程 main_thread 。不要晕,千万不要晕:)现在代码再次回到 raw_dispatch_message(line 14) ,此时 coroutine.resume 函数返回并返回了 4 个参数:true, “RETURN”, msg, sz ,其中 msg, sz 是要发送回去的消息。接着便调用 suspend 函数处理"RETURN"命令。下面看一下简化版的 suspend 代码。

function suspend(co, result, command, param, size)
    if command == "CALL" then
        session_id_coroutine[param] = co -- 记录下此 coroutine ,接收到"response"消息时获取
    elseif command == "RETURN" then
        local co_session = session_coroutine_id[co]
        local co_address = session_coroutine_address[co]
        ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
        return suspend(co, coroutine.resume(co, ret))
    elseif command == "EXIT" then
        -- coroutine exit
        local address = session_coroutine_address[co]
        release_watching(address)
        session_coroutine_id[co] = nil
        session_coroutine_address[co] = nil
        session_response[co] = nil
    end
end

再次强调一下,此时代码走到 suspend ,这是在主线程执行的,然后处理"RETURN"命令,发送消息到 agent 服务。这里发现原来调用 skynet.ret 返回消息时实际的消息发送是在主线程执行的。紧接着代码走到 suspend(line 8) ,再次调用 coroutine.resume ,此时执行权回到 coroutine ,回到 skynet.ret 函数中,在 skynet.ret(line 3) coroutine_yield 返回后,skynet.ret 函数也已经返回,执行权还是在 coroutine ,代码此时走到 simpledb(line 20) skynet.ret 的返回,db_dispatch 函数也已经执行完并返回,此时 simpledb 已经对"SET"消息处理完毕,这时就相当于 co_create 中的 f 函数执行完毕,下面就是 coroutine 的回收,参考 co_create 说明。OK ,到了这里 simpledb 处理"SET"消息,我们已经分析完毕,看起来很绕,其实也蛮清晰的。我们来总结一下 simpledb 处理"SET"消息在主线程和 coroutine 经历了哪些切换(忽略 co_create 利用回收的 cocoutine 时做的切换):

(raw_dispatch_message 函数) 主线程 -> (db_dispatch 函数) coroutine: skynet.ret 调用 coroutine_yield"RETURN" 让出执行 -> (suspend 函数,在 raw_dispatch_message(line 14) 被调用) 主线程: 处理"RETURN",并再次 resume -> coroutine: skynet.ret 返回,db_dispatch 函数返回,调用coroutine_yield"EXIT" 让出执行 -> (suspend 函数,在suspend(line 8) 被调用) 主线程: 处理"EXIT",suspend 函数返回,raw_dispatch_message 函数返回 -> 消息执行完毕。

以 agent 服务为例说明 skynet.call 调用。

上面解释了 simpledb 处理"SET"消息的流程,这条消息实际上是 agent 服务发送过去的,agent 也是接收到"client"类型的"set"。

agent 简化版代码如下:

local skynet = require "skynet"
local netpack = require "netpack"
local socket = require "socket"
local sproto = require "sproto"
local sprotoloader = require "sprotoloader"

local host
local send_request

local CMD = {}
local REQUEST = {}
local client_fd

function REQUEST:set()
    print("set", self.what, self.value)
    local r = skynet.call("SIMPLEDB", "lua", "set", self.what, self.value)
end

local function request(name, args, response)
    local f = assert(REQUEST[name])
    local r = f(args)
    if response then
        return response(r)
    end
end

local function send_package(pack)
    local package = string.pack(">s2", pack)
    socket.write(client_fd, package)
end

skynet.register_protocol {
    name = "client",
    id = skynet.PTYPE_CLIENT,
    unpack = function (msg, sz)
        return host:dispatch(msg, sz)
    end,
    dispatch = function (_, _, type, ...) -- "client" 类型消息 dispatch 函数
        if type == "REQUEST" then
            local ok, result  = pcall(request, ...)
            if ok then
                if result then
                    send_package(result)
                end
            else
                skynet.error(result)
            end
        else
            assert(type == "RESPONSE")
            error "This example doesn't support request client"
        end
    end
}

skynet.start(function()
    skynet.dispatch("lua", function(_,_, command, ...)
        local f = CMD[command]
        skynet.ret(skynet.pack(f(...)))
    end)
end)

看见代码发现 agent 服务处理两种类型的消息:“lua"和"client”。这里我们关注的是"client"消息,"client"消息的 dispatch 函数是调用 skynet.register_protocol 设置的,赋值给 dispatch 一个匿名函数,假设这个匿名函数叫 ag_client_dispatch 。当接收到客户端发送来的"set"消息(这里先不管那些不懂的函数,我们此时只关注执行流程),便会调用 REQUEST:set 函数,然后调用 skynet.call 向 simpledb 发送"set"消息,阅读 skynet 文档说 skynet.call 是阻塞的(阻塞调用 skynet.call 的coroutine),我们来看一下是如何阻塞的。

先看一下简化的 skynet.call 代码:

local function yield_call(service, session)
    local succ, msg, sz = coroutine_yield("CALL", session)
    return msg,sz
end

function skynet.call(addr, typename, ...)
    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))
end

阅读发现 skynet.call 和 skynet.ret 有一些相似,不同的是 skynet.call 调用 coroutine_yield 传入的是"CALL",然后执行权回到主线程 suspend 函数,阅读 suspend 函数(千万别晕)代码发现此时仅仅是记录了 coroutine ,然后就返回了。 神马?神马?神马?suspend 函数没有做其他的事情就返回了,我们的 agent 服务对"set"消息的处理追踪定格在了 skynet.call(line 2) 行,当前这个 coroutine 未被回收,而是被标记了,然后本次 agent 对"set"消息的处理也就完毕了。

当 simpledb 接收到"set"消息并处理完,然后调用 skynet.ret 返回结果时,阅读 suspend(line 7) 此时给 agent 服务发送了一个类型为 1 的"lua"类型的消息。之后 agent 服务接收到此消息时,agent 服务主线程执行到函数 raw_dispatch_message ,由于 prototype 为 1 ,此时走到了 raw_dispatch_message(line 6) ,找到了上次标记的 coroutine ,并调用 resume 唤醒这个 coroutine 并传入了接收到的 msg 和 sz(这实际是 simpledb 服务发送来的),接着代码执行权来到 coroutine ,来到 skynet.call(line 2) ,coroutine_yield 函数返回并返回了接收到的消息。然后 skynet.call 函数执行完毕,执行权依旧是在 coroutine 中,然后回到 agent(line 16) ,接着继续执行,ag_client_dispatch 执行完毕,然后进行 coroutine 的回收,调用 coroutine_yield"EXIT" ,coroutine 也就执行完毕,执行权回到主线程,raw_dispatch_message(line 6) 行继续调用suspend 并传入"EXIT"命令,suspend 执行完后,raw_dispatch_message 也就执行完毕了,agent 对"set"消息的处理也终于结束了。总结一下:调用 skynet.call 导致 coroutine 被中间执行中断,等结果到达时(框架从 agent 服务消息队列取得相应的消息)才会从中断处继续执行。流程是这样的:

1) 第一次。

(raw_dispatch_message 函数) 主线程 -> (ag_client_dispatch 函数) coroutine: skynet.call 调用 coroutine_yield"CALL" 让出执行 -> (suspend 函数,在 raw_dispatch_message(line 14) 被调用) 主线程: 处理"CALL",suspend 函数返回,raw_dispatch_message 函数返回 -> 消息执行完毕。

2)第二次。

(raw_dispatch_message 函数) 主线程 -> coroutine: skynet.call(line 2) ,skynet.call 函数返回,ag_client_dispatch 函数返回,调用coroutine_yield"EXIT" 让出执行 -> (suspend 函数,在 raw_dispatch_message(line 6) 被调用) 主线程: 处理"EXIT",suspend 函数返回,raw_dispatch_message 函数返回 -> 消息执行完毕。

四、总结

大体上 coroutine 的执行流程就是这样的。我们始终保持一个理念:skynet 为每个服务创建一个 lua_State ,skynet 为每个消息的执行创建一个 coroutine ,阻塞 API 阻塞的是当前 coroutine ,服务本身不会被阻塞,可以继续处理其他消息。

代码取自skynet-v1.0.0-alpha,因为代码以后有可能变动,这里是以 1.0-alpha 为基准分析的。一次性打字描述好多函数调用有可能会描述错误(而且打字并没有那么直观:)),有错误的话,欢迎评论指出,我来修改。


发布于 2020-12-19 11:52