sensai2.0

326
15

1. libuv异步编程

node.js基于libuv实现了基本的事件驱动模式。

libuv和大多数事件驱动的库类似:一个主线程(loop thread)进行阻塞轮询,n个子线程(worker threads)处理各种事件;一旦事件处理完成,worker线程便设置相应标志位,然后主线程在下一周期轮询时收到消息,执行回调。

在类unix系统下,libuv的loop thread在轮询网络I/O事件时会使用IO复用API(http://sensai.powerpigger.cc/sensai/site/article/282)来保证高效的网络连接处理。与此同时,libuv也能以线程池的方式处理file io和cpu密集相关的任务:在worker threads中执行任务,完成之后设置标志位,loop thread轮询获取状态并执行回调方法。

可惜的是,node.js没有提供异步处理自定义操作的方法,这时即需要通过编写addon来实现异步回调(不阻塞js主流程)的自定义方法。


2. uv_work_t方法简介

​libuv中有两种线程对象:uv_thread_t和uv_work_t,uv_thread_t简单的封装了原生线程对象,不多展开。

uv_work_t是libuv封装的专门用于执行后台任务的方法,内部通过线程池来管理所有的work,线程池大小默认为4个,并且可以通过系统变量UV_THREADPOOL_SIZE设置

使用uv_queue_work来初始化,结构如下:


uv_work_s是uv_work_t的别名,而uv_work_t继承于uv_req_t

(由于uv是纯C语言写的,所以使用宏定义来实现的继承)


UV_REQ_FIELDS是一个宏定义,包含了uv_req_t中所有的字段。

uv_req_t定义了libuv所有异步对象的基类,uv_req_t的子类都可以使用异步回调的处理方式。

uv_queue_work方法第一个参数是libuv的事件循环体,第二个参数为新建的uv_work_t,后两个参数work_cb和after_work_cb分别为work的执行函数和work执行完成的回调函数。

根据Node.js的使用经验,很容易判断到传入的work_cb函数是在独立线程中跑的,而想要传的数据也可以放在继承的void * data的字段中。

利用uv_work_t实现一个多线程模拟js的setTimeout方法demo如下:

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <uv.h>

typedef unsigned long TIME_US;

void setTimeout(uv_work_t *req) {
    TIME_US time = *(TIME_US *)req->data;
    usleep(time);
}

void callback(uv_work_t *req, int status) {
    printf("delay %ldms", (*(TIME_US *)req->data / 1000));
}

unsigned long getSleepTime(int argc, char **argv) {
    if (argc == 1) {
        return 1000000;
    }
    return atoi(argv[0]);
}

int main(int argc, char **argv) {

    TIME_US sleepTime = getSleepTime(argc, argv);

    uv_loop_t *loop = uv_default_loop();
    uv_work_t req;
    req.data = (TIME_US *)malloc(sizeof(TIME_US));
    *(TIME_US *)req.data = sleepTime;

    uv_queue_work(loop, &req, setTimeout, callback);

    printf("start loop...\n");
    fflush(stdout);
    return uv_run(loop, UV_RUN_DEFAULT);
}

uv线程相关文档: http://docs.libuv.org/en/v1.x/guide/threads.html#id1


​3. 在v8中内嵌uv_work_t

在nodejs启动的时候已经执行过了uv_run的方法,所以在写nodejs异步方法的时候只需要调用uv_queue_work,并将nodejs层相关的上下文需要封装在data字段中即可。

考虑js异步方法的一般写法如下:

asyncTask(params, function (error, data) {
  ...
})

​data字段中会放置输入、输出和回调方法等相关变量,在work_cb方法中算出输出,最后在after_work_cb回调方法中执行js的回调方法,并吐出输出变量。

除此之外,data最好包含uv_work_t本身,因为uv_work_t是定义在v8::Local的方法中,如果单独定义在方法中,那么方法结束后uv_work_t也会被销毁;如果用new方法定义,那么还需要在after_work_cb中执行delete。

需要注意的是,这里的js回调方法需要以v8::Persistent来定义;原因很简单,回调方法与调用方法不在一个上下文作用域中,这里借用网上找的一幅图说明异步方法的整个调用流程:


依然考虑setTimeout的实现。

将v8与uv_work_t结合,实现模拟原生的setTimeout的方法实现如下:​

#include <v8.h>
#include <node.h>
#include <uv.h>

#include "sleep.h"

using namespace v8;

typedef struct {
    uv_work_t req;
    v8::Persistent<Function> callback;
    unsigned int timeout;
} TimeoutStruct;

static void asyncSleep(uv_work_t *req) {
    unsigned int timeout = static_cast<TimeoutStruct *>(req->data)->timeout;
    commonSleep(timeout);
}

static void afterAsyncSleep(uv_work_t *req, int status) {
    v8::HandleScope HandleScope(Isolate::GetCurrent());
    TimeoutStruct *timeoutWorker = static_cast<TimeoutStruct *>(req->data);

    v8::Local<Function> callback = v8::Local<Function>::New(Isolate::GetCurrent(),
            timeoutWorker->callback);

    callback->Call(Isolate::GetCurrent()->GetCurrentContext()->Global(), 0, NULL);
    timeoutWorker->callback.Reset();
    delete timeoutWorker;
}

void AsyncSleep(const FunctionCallbackInfo<Value> &args) {
    Isolate *isolate = args.GetIsolate();
    TimeoutStruct *timeoutWorker = new TimeoutStruct();

    if (1  == args.Length()) {
        timeoutWorker->timeout = 1;
        timeoutWorker->callback.Reset(isolate, v8::Local<Function>::Cast(args[0]));
    } else {
        timeoutWorker->timeout = args[0]->Uint32Value();
        timeoutWorker->callback.Reset(isolate, v8::Local<Function>::Cast(args[1]));
    }

    timeoutWorker->req.data = static_cast<void *>(timeoutWorker);

    uv_queue_work(uv_default_loop(), &timeoutWorker->req, asyncSleep, afterAsyncSleep);
    args.GetReturnValue().Set(Undefined(isolate));
}

void init(Local<Object> target) {
    NODE_SET_METHOD(target, "setTimeout", AsyncSleep);
}


NODE_MODULE(exports, init);

其中commonSleep方法可参考前一篇文章

和nodejs的setTimeout不同的是,这里实现的“setTimeout”占用一个uv线程池中的一个线程,而node自带的setTimeout是基于uv_timer_t实现的,所以同时setTimeout超过4个(UV_THREAD_POOL的默认大小)的时候后面的定时器就会开始阻塞。

如下所示:

const sleep = require('./build/Debug/async_uv.node')

for (let i = 0; i < 10; i++) {
  sleep.setTimeout(3, () => {
    console.log(new Date(), `timer: ${i} callback`)
  })
}
console.log('continue...')

可以看到,只有前4个定时器延时了3s,而后续的都是顺延6s和9s


当然,可以在js脚本内通过设置环境变量来加大线程池size:

process.env.UV_THREADPOOL_SIZE = 10
加上这句话,就可以同时10个setTimeout同时在3s内返回了。


4. 基于nan的AsyncWorker

​nan中的AsyncWorker类封装了uv_work_t相关的一系列方法,便于开发人员使用。

AsyncWorker类基本结构如下:

class AsyncWorker {
 public:
  explicit AsyncWorker(Callback *callback_);

  virtual ~AsyncWorker();

  virtual void WorkComplete();

  void SaveToPersistent(const char *key, const v8::Local<v8::Value> &value);

  void SaveToPersistent(const v8::Local<v8::String> &key,
                        const v8::Local<v8::Value> &value);

  void SaveToPersistent(uint32_t index,
                        const v8::Local<v8::Value> &value);

  v8::Local<v8::Value> GetFromPersistent(const char *key) const;

  v8::Local<v8::Value> GetFromPersistent(const v8::Local<v8::String> &key) const;

  v8::Local<v8::Value> GetFromPersistent(uint32_t index) const;

  virtual void Execute() = 0;

  uv_work_t request;

  virtual void Destroy();

 protected:
  Persistent<v8::Object> persistentHandle;

  Callback *callback;

  virtual void HandleOKCallback();

  virtual void HandleErrorCallback();

  void SetErrorMessage(const char *msg);

  const char* ErrorMessage();
};

使用时只需要继承之,加上输入输出的字段,然后override Execute和WorkComplete方法即可

最后通过方法Nan::AsyncQueueWorker来初始化worker。

setTimeout的示例如下:

#include <node.h>
#include <nan.h>
#include "sleep.h"
using namespace Nan;

class TimerWorker: public Nan::AsyncWorker {

public: 
    TimerWorker(Nan::Callback *callback, unsigned int seconds):
        Nan::AsyncWorker(callback), seconds(seconds) {}

    void Execute() {
        commonSleep(this->seconds);
    }

    void WorkComplete() {
        Nan::HandleScope handleScope;
        this->callback->Call(0, NULL);
    }

private:
    unsigned int seconds;
};

NAN_METHOD(setTimeout) {
    unsigned int seconds;
    Nan::Callback *callback;

    if (1 == info.Length()) {
        seconds = 1;
        callback = new Callback(info[0].As<v8::Function>());
    } else {
        seconds = info[0]->Uint32Value();
        callback = new Callback(info[1].As<v8::Function>());
    }
    Nan::AsyncQueueWorker(new TimerWorker(callback, seconds));
};

NAN_MODULE_INIT(init) {
    Nan::Set(
        target,
        Nan::New<v8::String>("setTimeout").ToLocalChecked(),
        Nan::GetFunction(Nan::New<v8::FunctionTemplate>(setTimeout)).ToLocalChecked()
    );
};

NODE_MODULE(exports, init)
与直接用libuv原生方法相比,nan隐藏了更多的细节,接口格式对人更加友好,兼容性也更强,适合生产环境下使用。

具体参考: https://github.com/nodejs/nan/blob/master/doc/asyncworker.md

回复

对话列表

×