libuv记录

调整一下学习的顺序,从先看libuv,再看node.js。

  • v8引擎
  • libuv <=
  • node.js

主要参考:官网

基础

libuv不仅仅是一个在不同I/O轮询机制之上的简单抽象,它为sockets提供更高层次的’handles’与’streams’抽象,跨平台的文件I/O,线程、进程等功能。如图:
概览.png

  • 事件循环(event loop)
    事件循环.png

    事件循环是libuv的核心,它与一个独立的线程绑定。所有的网络I/O都在非阻塞的socket上执行。作为循环迭代的一部分,等待socket上I/O活动时,循环会阻塞,回调函数会立即触发,并指示socket的状态(可读、可写等),这样handles可以执行读写操作。

    • 首先执行是到期的times
    • 其次执行的是被挂起的callbacks,这些callback在本次轮询的前面,为什么会再前面呢,因为它们是在一次轮询前,推迟到本次的。
    • 空闲的handles
    • prepare handles、I/O轮询、check handles,这三个可以是轮询的前中后,AOP编程的思维,像是Nest框架中的Interceptors

    在事件驱动的编程中,应用向表示对特定事件的兴趣,当事件发生时进行相应。从操作系统收集事件,监视其他资源的工作由libuv来完成。用户可以注册回调,当事件触发时被调用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #include <stdlib.h>
    #include <uv.h>

    int main() {
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    uv_loop_init(loop);

    printf("Now quitting.\n");
    uv_run(loop, UV_RUN_DEFAULT);

    uv_loop_close(loop);
    free(loop);
    return 0;
    }
  • handles 与 requests

    • 定义
      libuv提供两个抽象:handle与request,它们与event loop结合使用。
      handles: 代表可执行某些操作的持久对象,例如tcp server handle,一有新连接到来,就会执行回调函数。
      requests: 代表可以行某些操作的短期对象,即可以在handle里执行,也可以直接再event loop里执行。例如getaddrinfo request

      前文讲到,需要向libuv表示对某些事件的兴趣,这个怎么表示呢?这通过创建一个相应的handle来完成,handle可以简单表示成:uv_TYPE_t.例如:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      /* Handle types. */
      typedef struct uv_stream_s uv_stream_t;
      typedef struct uv_tcp_s uv_tcp_t;
      typedef struct uv_udp_s uv_udp_t;
      typedef struct uv_pipe_s uv_pipe_t;
      typedef struct uv_tty_s uv_tty_t;
      typedef struct uv_poll_s uv_poll_t;
      typedef struct uv_timer_s uv_timer_t;
      typedef struct uv_prepare_s uv_prepare_t;
      typedef struct uv_check_s uv_check_t;
      typedef struct uv_idle_s uv_idle_t;
    • 示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      #include <stdio.h>
      #include <uv.h>

      int64_t counter = 0;

      void wait_for_a_while(uv_idle_t* handle) {
      counter++;

      if (counter >= 10e6)
      uv_idle_stop(handle);
      }

      int main() {
      uv_idle_t idler;

      uv_idle_init(uv_default_loop(), &idler);
      uv_idle_start(&idler, wait_for_a_while);

      printf("Idling...\n");
      uv_run(uv_default_loop(), UV_RUN_DEFAULT);

      uv_loop_close(uv_default_loop());
      return 0;
      }
  • Streams 与 Buffers

    最基本的handle是stream(uv_stream_t),TCP socket、文件I/O的管道、IPC都是stream的子类。
    初始化之后,stream的基本操作如下:

    1
    2
    3
    4
    int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
    int uv_read_stop(uv_stream_t*);
    int uv_write(uv_write_t* req, uv_stream_t* handle,
    const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);

    当uv_read_start()调用后,libuv自动从stream中持续读取数据,直到uv_read_stop()调用。
    每个单位的数据是buffer- uv_buf_t。两个基本属性:uv_buf_t.base,指向数据的指针;uv_buf_t.len,数据的长度。

Filesystem

在概览图中,文件I/O是建立再线程池基础之上的。Socket操作使用操作系统提供的非阻塞函数,而File操作内部使用是阻塞函数,通过线程池来触发该函数。

  • 主要函数

    1
    2
    3
    4
    5
    int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode, uv_fs_cb cb)
    int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb)
    void callback(uv_fs_t* req);
    int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
    int uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
  • 直接读取方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    void on_open(uv_fs_t *req) {
    // The request passed to the callback is the same as the one the call setup
    // function was passed.
    assert(req == &open_req);
    if (req->result >= 0) {
    iov = uv_buf_init(buffer, sizeof(buffer));
    uv_fs_read(uv_default_loop(), &read_req, req->result,
    &iov, 1, -1, on_read);
    }
    else {
    fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
    }
    }

    void on_read(uv_fs_t *req) {
    if (req->result < 0) {
    fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
    }
    else if (req->result == 0) {
    uv_fs_t close_req;
    // synchronous
    uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
    }
    else if (req->result > 0) {
    iov.len = req->result;
    uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
    }
    }

    void on_write(uv_fs_t *req) {
    if (req->result < 0) {
    fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
    }
    else {
    uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
    }
    }

    int main(int argc, char **argv) {
    uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_fs_req_cleanup(&open_req);
    uv_fs_req_cleanup(&read_req);
    uv_fs_req_cleanup(&write_req);
    return 0;
    }

    这是一个边读边写的示例,再onCreate中,开始读一定的量,然后写到write_req中,再write_req的回调中,再触发读。
    libuv对操作系统的api进行了完整的封装,详细内容查看libuv的api

  • stream方式

    除了直接读取,还可以通过stream的方式来使用,通过将文件fd与pipe绑定来创建一个file pipe如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    int main(int argc, char **argv) {
    loop = uv_default_loop();

    uv_pipe_init(loop, &stdin_pipe, 0);
    uv_pipe_open(&stdin_pipe, 0);

    uv_pipe_init(loop, &stdout_pipe, 0);
    uv_pipe_open(&stdout_pipe, 1);

    uv_fs_t file_req;
    int fd = uv_fs_open(loop, &file_req, argv[1], O_CREAT | O_RDWR, 0644, NULL);
    uv_pipe_init(loop, &file_pipe, 0);
    uv_pipe_open(&file_pipe, fd);

    uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);

    uv_run(loop, UV_RUN_DEFAULT);
    return 0;
    }

    void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    *buf = uv_buf_init((char*) malloc(suggested_size), suggested_size);
    }

    void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread < 0){
    if (nread == UV_EOF){
    // end of file
    uv_close((uv_handle_t *)&stdin_pipe, NULL);
    uv_close((uv_handle_t *)&stdout_pipe, NULL);
    uv_close((uv_handle_t *)&file_pipe, NULL);
    }
    } else if (nread > 0) {
    write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
    write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
    }

    // OK to free buffer as write_data copies it.
    if (buf->base)
    free(buf->base);
    }

Networking

tcp

这个过程与直接使用操作系统的过程类似

  • server

    服务器通过:init、bind、listen、accept来使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
    fprintf(stderr, "Listen error %s\n", uv_strerror(r));
    return 1;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
    }

    void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
    fprintf(stderr, "New connection error %s\n", uv_strerror(status));
    // error!
    return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
    uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }

注意这里read数据时,是使用stream来使用

  • client

    client端通过connect来使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    uv_tcp_t* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, socket);

    uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));

    struct sockaddr_in dest;
    uv_ip4_addr("127.0.0.1", 80, &dest);

    uv_tcp_connect(connect, socket, (const struct sockaddr*)&dest, on_connect);

udp

libuv并不为udp提供stream,使用非阻塞的uv_udp_t handle与 uv_udp_send_t request来使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
uv_loop_t *loop;
uv_udp_t send_socket;
uv_udp_t recv_socket;

int main() {
loop = uv_default_loop();

uv_udp_init(loop, &recv_socket);
struct sockaddr_in recv_addr;
uv_ip4_addr("0.0.0.0", 68, &recv_addr);
uv_udp_bind(&recv_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
uv_udp_recv_start(&recv_socket, alloc_buffer, on_read);

uv_udp_init(loop, &send_socket);
struct sockaddr_in broadcast_addr;
uv_ip4_addr("0.0.0.0", 0, &broadcast_addr);
uv_udp_bind(&send_socket, (const struct sockaddr *)&broadcast_addr, 0);
uv_udp_set_broadcast(&send_socket, 1);

uv_udp_send_t send_req;
uv_buf_t discover_msg = make_discover_msg();

struct sockaddr_in send_addr;
uv_ip4_addr("255.255.255.255", 67, &send_addr);
uv_udp_send(&send_req, &send_socket, &discover_msg, 1, (const struct sockaddr *)&send_addr, on_send);

return uv_run(loop, UV_RUN_DEFAULT);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void on_read(uv_udp_t *req, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
if (nread < 0) {
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) req, NULL);
free(buf->base);
return;
}

char sender[17] = { 0 };
uv_ip4_name((const struct sockaddr_in*) addr, sender, 16);
fprintf(stderr, "Recv from %s\n", sender);

// ... DHCP specific code
unsigned int *as_integer = (unsigned int*)buf->base;
unsigned int ipbin = ntohl(as_integer[4]);
unsigned char ip[4] = {0};
int i;
for (i = 0; i < 4; i++)
ip[i] = (ipbin >> i*8) & 0xff;
fprintf(stderr, "Offered IP %d.%d.%d.%d\n", ip[3], ip[2], ip[1], ip[0]);

free(buf->base);
uv_udp_recv_stop(req);
}

其他

libuv中还对线程、进程、进程通信等进行了封装,我觉得这些不是libuv的核心,这里不做详细的介绍了。

  • 线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int main() {
    int tracklen = 10;
    uv_thread_t hare_id;
    uv_thread_t tortoise_id;
    uv_thread_create(&hare_id, hare, &tracklen);
    uv_thread_create(&tortoise_id, tortoise, &tracklen);

    uv_thread_join(&hare_id);
    uv_thread_join(&tortoise_id);
    return 0;
    }
  • 进程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    uv_loop_t *loop;
    uv_process_t child_req;
    uv_process_options_t options;
    int main() {
    loop = uv_default_loop();

    char* args[3];
    args[0] = "mkdir";
    args[1] = "test-dir";
    args[2] = NULL;

    options.exit_cb = on_exit;
    options.file = "mkdir";
    options.args = args;

    int r;
    if ((r = uv_spawn(loop, &child_req, &options))) {
    fprintf(stderr, "%s\n", uv_strerror(r));
    return 1;
    } else {
    fprintf(stderr, "Launched process with ID %d\n", child_req.pid);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
    }

回顾

  • event loop
  • handle 与 request
  • stream
  • file
  • tcp && udp

问题

  • 为什么文件读取用的是线程池?

  • 为什么tcp是一个stream,而udp不是一个stream?