perrynzhou

专注于系统组件研发

0%

TCP协议初探

  • 发送方对一个ACK应该等待多长时间?

  • 如果ACK丢失怎么办?

    • 如果一个ack丢失了,发送方式是不能轻易的识别ACK丢失和原分组的丢失的情况,所以发送方会再次发送原分组
  • 如果分组被接受了,但是分组有错误怎么办?

    • 使用编码技术检测一个大的分组的差错一般都是简单,仅仅使用比自身小很多的一些比特即可纠正。当接受方接受到有差错的分组,接收方是不能发送ACK,最后发送方重新发送无差错的分组
  • 如果接受方接收到了重复分组怎么办?

    • 发送方针对每个分组会有一个序列号,这个序列号是有分组自身携带,接受方可以使用这个序列号来判断它是否已经接受到这个分组,如果是则丢弃
  • 什么是分组窗口和滑动窗口?

    • 定义一个分组窗口作为已经被发送方发送但还没完成收到ACK确认的分组的集合,这是分组窗口;这个分组窗口的数量叫做窗口大小;发送方和接受方都存在滑动窗口,对于发送方,滑动窗口记录发送已确认的分组(可释放)、发送未收到确认的分组、即将发送的分组。对于接收方,滑动窗口记录已接受和确认的分组、期望接受分组、即将接受可能因为内存限制而被丢弃的分组。
  • 什么是流量控制和拥塞控制?

    • 在接受方接受分组的速度跟不上发送方发分送分组的速度,会强迫发送方把发送分组的速度降下来,这个称谓流量控制;流量控制有2种方式,一种是基于速率,它给发送方指定某个发送的速率,同时确保数据永远不会超过这个发送速率发送,这个仅仅适合流应用程序,可用于广播和组播。另外一种是基于窗口流量控制,是使用滑动窗口,在这个方法里,滑动窗口大小不固定,随着时间而变化。必须有一种方法让接受方可以通知发送方应该使用多大的滑动窗口,这个接受方通知发送方的窗口叫做窗口更新。
    • 在发送方和接受方之间可能会有因为有限内存的路由器,它们和低速网络链路抗争着,当这种情况发生时候,发送方发送分组的速率可能超过某个路由器的能力,从而导致丢包,这种情况是有拥塞控制的流量控制方式来处理。拥塞控制涉及发送方降低发送速度,不至于压垮其和接受方之间的网络。
  • 如果设置重传超时?

    • 发送方在重发一个分组之前等待的时间包括:发送分组所用时间、接受方处理分组时间、接受方发送ACK的时间、发送方接受ACK所用时间。在这些时间都是不确定的。比较好的策略让协议实现尝试去评估它们,这称为往返时间评估,这是一个统计过程。
  • TCP服务有什么特点?

    • TCCP虽然TCP和UDP都使用相同的网络层(IP层),但是TCP给应用程序提供一种和UDP完全不同的服务,TCP提供一种面向连接、可靠的字节流的服务,面向连接是指TCP的两个应用程序必须在他们可交换数据之前,通过相互联系来建立一个TCP连接。最经典的比喻,拨打一个电话号码,等待另外一方接电话并说“喂”,然后说“找谁?”,这正是一个TCP连接的两个端点之间的通信,广播和组播不存在于TCP,存在于UDP。
    • TCP没有消息边界,是一种流式服务。
  • TCP如何保证可靠性?

    • TCP提供一个字节流接口,TCP必须把一个发送应用程序的数据转换为一组IP可以携带的分组,这个分组叫做组包,这些分组包含序列号,这个序列号实际代表每个分组的第一个字节在整个应用程序数据中的字节偏移量,而不是分组号。
    • TCP传给IP层的块叫做报文段,应用程序数据被打散成TCP人为最佳的大小块来发送,使得每个报文段按照不会被IP层数据报的大小来划分
    • TCP发送一组报文段时会设置一个重传计时器,等待对方的确认接收,TCP不会为每个报文段设置一个不同的重传计时器,相反,发送一个窗口的数据,它仅仅设置一个计时器,当ACK到达时候在更新计时器;如果ACK么有及时接收,这个报文段会被重传。
    • 当TCP接收到连接的另一端数据时候,它会发送一个ACK确认给另一端,告知另一端数据已经接受。这个ACK确认可能不会立即发送,一般会延迟发送。
    • TCP给应用程序提供一种双工服务,数据可以在两个方向上流动,两个方向互相独立,因此连接的每个端点必须对每个方向维持一个数据流的一个序列号,一旦建立一个连接,这连接的一个方向上的包含数据流的每个TCP报文段也包含了相反方向上的报文段的一个ACK。
    • TCP接受端使用序列号来丢弃重复的报文段和记录杂乱次序到达的报文段,TCP使用IP来传递它的报文段,IP不提供重复消除、保证次序正确的功能。然而,因为TCP是一个字节流协议,TCP绝不会以杂乱次序给接收应用程序发送数据,因此TCP接收端可能会被迫先保持大序列号的数据不交给应用程序,知道缺失的小序列号的报文段被填满。
  • TCP头部和封装是什么样的?

    • 每个TCP头部都包含源和目标的端口,这两个值和IP头部的源和目标IP地址一起唯一的标识每个连接,TCP术语中,一个IP地址和一个端口的组合有时被称为套接字或者端点。每个TCP连接由一对套接字或端点(四元组,客户端IP、客户端端口、服务器IP,服务器端口唯一标识。
    • 确认字段(ack)包含德玛值是该确认号发送方期待接受下一个序列号,即最后发送成功的数据字节的序列号加1
    • 当建立一个新连接时,从客户机发送至服务器的第一个报文段的SYN位字段被启用,这个报文段称为SYNC报文段

nginx基本配置说明

nginx信号控制

信号 作用 用法
SIGTERN或者SIGINT 快速停止nginx服务 kill -SIGTERM {nginx_pid} 或者 kill -SIGINT {nginx_pid}
SIGQUIT 平缓停止nginx服务 kill -SIGTQUIT {nginx_pid}
SIGHUP 平缓重启nginx服务 kill -SIGHUP {nginx_pid}
  • 快速停止nginx:停止当前nginx服务正在处理的所有网路请求,立即丢弃连接,停止工作
  • 平缓停止nginx:允许nginx服务将当前正在处理网络连接请求处理完毕,但是不在接受新的请求,之后关闭连接、停止工作
  • 平缓重启nginx:平缓停止所有进程,使用新的配置启动nginx进程

nginx配置结构

  • 全局块:默认是从配置文件开始到events的一部分内容,主要影响nginx服务器整体运行的参数,包括用户运行的组、最大的worker进程数
  • events块:主要影响nginx服务器与用户的网络连接,比如是否允许同时接受多个连接请求、选择哪一种事件驱动模型处理请求(select/epoll)
  • http块:这是nginx服务器配置代理、缓存、日志定义的大部分的功能,http块可以包括server块,server块可以包括localtion块
  • server块:主要用于配置虚拟主机
  • localtion块:每个server块可以包括多个location块,严格来说,location是server的一个指令,location块主要作用基于nginx服务器接收到请求的字符串(server_name/uri-string),除对虚拟主机名称之外的字符串进行匹配,对特定的请求进行处理

nginx核心配置

  • worker_process {num} | auto:这是非常核心的参数,也是nginx的实现并发服务的核心,该参数配置nginx的worker进程的数量
  • pid {pid_path}:nginx是以daemon的方式运行,master进程的pid的值会写入到nginx配置文件中 pid {path}指定的路径中。
  • error_log {file_path} {log_level}:配置nginx的日志
  • accept_mutex on|off:防止nginx的惊群配置,默认开启
  • use {method}:选择事件驱动模型
  • worker_connetions {number}:用来设置允许每一个worker_process同时开启的最大连接数
  • sendfile on|off:零拷贝方式的文件传输
  • keepalive_timeout {timeout} {header_timeout}:nginx可以保持连接打开一段时间
  • keepalive_requests {number}:nginx服务器和用户建立回话连接后,用户端通过此连接发出请求,这个参数用于限制用户通过某一连接向nginx服务器发送请求的次数

memcached 有几种类型的线程?

memcached有2类线程,一类是main线程,一类是worker线程,main线程负责监听listenfd放到main_base中进行监听,一旦listenfd中有IO时间,也就是有新的连接进来,则转发通过dispatch_conn_new函数初始化一个CQ_ITEM,把对应的clientfd封装在CQ_ITEM中,同时把item放到线程的CQ_ITEM队列中,然后写入标记数据到线程的notify_send_fd中,worker线程则是监听工作线程中的notify_recv_fd中的IO事件,根据读取到的标记数据来初始化一个新连接,接着处理这个链接,释放CQ_ITEM

memcached中工作线程如何和main线程交互的

工作线程会有一个管道来实现和main线程进行通信的,工作线程通过pipe系统调用初始化一对FD。

  • memcached中的工作线程的结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    typedef struct {
    pthread_t thread_id; /* 工作线程ID*/
    struct event_base *base; /* libevent的句柄 */
    struct event notify_event; /* listen event */
    int notify_receive_fd; /* 客户端线程监听这个套接字接受来自服务端通知,比如新连接 */
    int notify_send_fd; /* 主线程有消息,会往这个fd中进行写入数据 */
    struct thread_stats stats; /* 线程的状态 */
    struct conn_queue *new_conn_queue; /* 该队列存储CQ_ITEM把服务端接受到的客户端fd状态为CQ_ITEM,放到这个队列中 */
    cache_t *resp_cache; /* response objects */
    cache_t *rbuf_cache; /* static-sized read buffers */
    cache_t *io_cache; /* IO objects */
    void *storage; /* data object for storage system */
    logger *l; /* logger buffer */
    void *lru_bump_buf; /* async LRU bump buffer */
    char *ssl_wbuf;
    } LIBEVENT_THREAD;
  • memcached的工作线程的初始化函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    	//memcached的工作线程的初始化函数
    void memcached_thread_init(int nthreads, void *arg) {
    //每个工作线程的初始化
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
    perror("Can't allocate thread descriptors");
    exit(1);
    }

    for (i = 0; i < nthreads; i++) {
    int fds[2];
    if (pipe(fds)) {
    perror("Can't create notify pipe");
    exit(1);
    }
    //当有新连接时候,主线程会选择性的写数据到线程的notify_send_fd,然后工作线程在从notify_receive_fd读取数据,然后取出连接开始处理
    //工作线程从notify_receive_fd读取数据
    threads[i].notify_receive_fd = fds[0];
    //主线程从notify_send_fd写入数据
    threads[i].notify_send_fd = fds[1];
    //设置每个线程中event监听notify_receive_fd中的数据
    setup_thread(&threads[i]);
    /* Reserve three fds for the libevent base, and two for the pipe */
    stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    //启动每个工作线程,每个线程执行event_base_loop
    for (i = 0; i < nthreads; i++) {
    create_worker(worker_libevent, &threads[i]);
    }
    }

memcached主线程如何初始化服务端的socket?

在这里我们需要大概了解下main函数的执行逻辑前基本介绍全局的数据结构如下:

  • listen_conn:全局的监听套接字的连接

  • max_fds:默认最大支持的连接数

  • main_base:主线程的libevent的核心结构体

  • conns:全局的连接数组,类型是conn指针类型

  • 基本的执行逻辑如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //函数入口,在memcached的入口
    main
    //初始化全局的conns,max_fds = settings.maxconns + headroom + next_fd;
    conn_init();
    //main线程的event初始化
    main_base = event_init();
    //和server_socket类型,不同的仅仅是制定网卡绑定
    server_sockets
    //main线程通过socket/bind/listen来初始化listen_conn,设置该连接的类型,listen_conn中的sfd放到main_base监听
    server_socket
    //根据sfd初始化一个conn放到main_base中,设置回函数event_handler
    conn_new
    //初始化工作线程
    memcached_thread_init
    //设置工作线程,把notify_receive_fd放到工作线程的event_base中监听
    setup_thread
    //设置notify_receive_fd中IO事件处理函数
    thread_libevent_process
    //启动单独线程开始监听notify_receive_fd的IO时间
    worker_libevent
    event_base_loop
    event_base_free

memcached主线程如何处理一个新来的连接

  • 服务端的socket(listenfd)监听套接字封装成conn,设置该连接状态为conn_listening,放到main_base中,一旦有listenfd有事件,event_handler就会被触发,则调用drive_machine函数接受客户单请求套接字
    1
    2
    3
    4
    //针对main线程,把服务端的listenfd放到main_base中,针对工作线程则注册
    event_handler notify_receive_fd注册到event_base中
    drive_machine //接受来自客户端IO请求,这是最重要的函数
    dispatch_conn_new //把请求的FD封装到CQ_ITEM中,然后放到工作线程的队列中,然后通知工作线程有新连接进去
  • 服务端调用drive_machine接受客户端fd后,调用dispatch_conn_new函数客户端的FD封装成CQ_ITEM
  • 客户端的FD封装成CQ_ITEM后,根据哈希取模策略选择一个线程,把CQ_ITEM放到该工作线程的new_conn_queue中,然后往该线程的notify_send_fd写入数据
  • 由于工作线程在memcached启动后就已经初始化几个工作线程,然后每个线程中调用thread_libevent_process监听该线程中的notify_recv_fd中的IO事件,该工作线程从notify_recv_fd中读取到了数据,从自己线程队列中取出一个CQ_ITEM,初始化conn,放到全局conns数组中,这个数据的下表就是客户请求的FD,然后释放CQ_ITEM资源,到此整个memcached的从主线程接受请求,到工作线程接受和初始化conn结束,后面就是处理这个客户端请求

memcached注释

背景

  • 目的是分析nginx的内存池模块,了解内存池相关知识
  • nginx为什么要这样设计内存池

基本数据结构介绍

  • ngx_pool_s:nginx的内存池结构,定义如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    struct ngx_pool_s {
    //d:保存可用内存地址信息
    ngx_pool_data_t d;
    //max:申请大块的的基本条件是申请大小超过max
    size_t max;
    //current:指向当前内存池结构,初始化时候指向它自己
    ngx_pool_t *current;
    ngx_chain_t *chain;
    //large:表示大块内存,通过单向链表表示
    ngx_pool_large_t *large;
    ngx_pool_cleanup_t *cleanup;
    ngx_log_t *log;
    };
  • ngx_pool_data_t:存储内存小于(4k-1)的内存块,其结构定义如下
1
2
3
4
5
6
7
8
9
typedef struct {
//可用内存的开始地址
u_char *last;
//可用内存的末尾地址
u_char *end;
//指向下一个内存pool
ngx_pool_t *next;
ngx_uint_t failed;
} ngx_pool_data_t;
  • ngx_pool_large_s:存储内存超过(4k-1)字节的内存块,其结构定义如下
1
2
3
4
struct ngx_pool_large_s {
ngx_pool_large_t *next;
void *alloc;
};

基本函数介绍

  • ngx_create_pool:nginx的内存池创建
  • ngx_destroy_pool:销毁一个内存池
  • ngx_reset_pool:重置一个内存池中的小块内存,同时释放大块内存
  • ngx_palloc:通过系统调用函数申请内存
  • ngx_pnalloc:从内存池中申请内存
  • ngx_pmemalign:按照对齐方式方式申请large结构体内存
  • ngx_pfree:释放大块内存

内存池函数注释

ngx_palloc.h
ngx_palloc.c

简单的网络并发服务器

并发服务基本介绍

  • 在网络服务器设计中,当用户请求达到10W以及更多时候,服务端处理客户端请求能力就非常重要,在业界一般是采用异步网络IO+多线程方式来做这种方案。本例中仅仅是采用多进程的方式模拟这种场景,主要的目的是搞清楚涉及到网络编程中的函数基本原理

基本函数介绍

  • socket函数:指明需要绑定什么版本的协议和具体协议类型,比如协议版本ipv4或者ipv6,具体的协议是tcp/udp/sctp协议
  • bind函数:设置服务器端的地址,和socket函数返回的套接字绑定
  • listen函数:监听这个套接字,同时设置内核的连接的最大队列,一个是未完成队列,一个是完成队列,未完成队列是程序发起syn之后和未完成三次握手之前,此时连接会被放到服务器端的未完成队列中;当完成三次握手,为被应用程序使用之前,此时这种连接会被放到服务器端TCP的完成队列中。
  • accept函数:该函数就是从服务端的TCP的完成队列中取出一个可用连接以供应用程序使用
  • connect函数: 程序发起syn包到服务端,之后的所有操作都是有tcp协议栈来做

样例代码中的疑惑

fork后为啥需要关闭listenfd?
讲道理父进程fork之后,在子进程中关闭了listenfd,子进程不需要读写整个listenfd的任何资源,子进程仅仅是一个工作进程。这个时候不是应该触发四次挥手(发送fin)断开的操作吗?在这里需要说清楚,fork之后,父子进程会共享listenfd和connfd,此时在内核看来,listenfd和connfd的文件描述符的引用计数在fork之前是1,fork后父子进程共享这2个文件描述符,这2个文件描述符的应用计数为2,在子进程中close(listenfd)不会触发发送fin包,仅仅是listenfd的引用计数从2减到了1而已。
那如果要真正来发送fin包该如何做?可以调用shutdown函数实现发送fin包,断开连接
父进程中为什么也要关闭connfd?
比如子进程处理connfd时间很长,在实例代码中,fork后返回2次,返回到父进程时候已经执行了close(connfd),那子进程不是读取数据失败?其实不是这样的,connfd和上面描述一样,都是引用计数,connfd的引用计数由原来的2减到1而已。这里关闭也有另外一个原因,父进程仅仅复杂监听来自客户单请求,不需要处理客户单请求。

基本实例代码

  • 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    /*************************************************************************
    > File Name: concurrent_tcp_server.c
    > Author:perrynzhou
    > Mail:perrynzhou@gmail.com
    > Created Time: Wednesday, July 29, 2020 AM09:06:42
    ************************************************************************/

    #include <stdio.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>
    #include <assert.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <netdb.h>
    #include <ifaddrs.h>
    #include <unistd.h>
    #define TCP_MIN_BACKLOG (2048)
    static void fetch_ip_from_host(char *buf, size_t buf_size)
    {
    struct ifaddrs *ifaddr, *ifa;
    int family, s;
    if (getifaddrs(&ifaddr) != -1)
    {
    const char *local_address = "127.0.0.1";
    size_t local_address_len = strlen(local_address);
    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
    {
    if (ifa->ifa_addr == NULL)
    {
    continue;
    }
    s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), buf, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
    if (s != 0)
    {
    continue;
    }
    if (ifa->ifa_addr->sa_family == AF_INET && strncmp(buf, local_address, local_address_len) != 0)
    {
    break;
    }
    bzero(buf, buf_size);
    }
    freeifaddrs(ifaddr);
    }
    }
    void fetch_ip_from_fd(int client_fd, char *address, size_t address_size)
    {
    struct sockaddr_in addr;
    socklen_t addr_size = sizeof(struct sockaddr_in);
    int res = getpeername(client_fd, (struct sockaddr *)&addr, &addr_size);
    strncpy(address, inet_ntoa(addr.sin_addr), address_size);
    size_t alen = strlen((char *)&address);
    snprintf((char *)&address + alen, address_size - alen, ":%d", htons(addr.sin_port));
    }
    static int init_socket(int domain, int type, int protocol, int backlog, const char *addr, int port)
    {
    int sock = socket(domain, type, protocol);
    assert(sock != -1);
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = domain;
    serveraddr.sin_port = htons(port);
    assert(inet_pton(AF_INET, addr, &serveraddr.sin_addr) > 0);
    assert(bind(sock, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) != -1);
    if (type == SOCK_STREAM)
    {
    int opt = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    listen(sock, backlog);
    }
    return sock;
    }
    inline static int init_tcp_server_socket(const char *addr, int port, int backlog)
    {
    int real_backlog = (backlog < TCP_MIN_BACKLOG) ? TCP_MIN_BACKLOG : backlog;
    return init_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, real_backlog, addr, port);
    }
    inline static int init_tcp_client_socket(const char *addr, int port)
    {
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    assert(sock != 0);
    struct sockaddr_in srvaddr;
    srvaddr.sin_family = AF_INET;
    srvaddr.sin_port = htons(port);
    assert(inet_pton(AF_INET, addr, &srvaddr.sin_addr) > 0);
    return sock;
    }
    int main(int argc, char *argv[])
    {
    int port = atoi(argv[1]);
    char addr[128] = {'\0'};
    fetch_ip_from_host((char *)&addr, 128);
    int listenfd = init_tcp_server_socket((char *)&addr, port, 1024);
    fprintf(stdout, "::server run on %s:%d\n", (char *)&addr, port);
    int connfd = -1;
    pid_t pid;
    for (;;)
    {
    struct sockaddr_in cliaddr;
    socklen_t len = sizeof(cliaddr);
    connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &len);
    fflush(stdout);
    if ((pid = fork()) == 0)
    {
    fetch_ip_from_fd(connfd, (char *)&addr, 128);
    close(listenfd);
    fprintf(stdout, "%d process handle %s connection\n", getpid(), (char *)&addr);
    exit(0);
    }
    close(connfd);
    }
    }

容器网络分析

容器二层网络分析

特点

  • 容器IP和节点IP在同一个网段
  • 二层协议通过ARP广播找到对方mac地址进行通信

基本环境

  • 物理节点上会有一个虚拟网卡对(veth-pair)、虚拟网桥。虚拟网卡对,一个在容器内(veth0),另外一个虚拟网卡(veth1)在物理节点的连接虚拟网桥上的veth1.
  • contain1的ip和node1的IP是同一个网段的
  • container1的默认网关是node1的IP地址
  • 两个节点之间是二层的交换机,通过ARP广播进行通信
  • 容器启动时候,会在容器内设置该节点的mac地址

通信过程

  • container-1和container-2进行通信,首先container-1的发起ARP广播,因为container-1是默认网关是node-1,container-1通过虚拟网桥把数据包发到了node-1上
  • node-1 发现container-1 的目标IP不是自己的IP地址(这里涉及到了netfileter,这个模块是iptables的基础)container-1的目标IP是container-2的IP,在同一个网段可是认为是局域网,但是container-1的目标IP是容器,没有直接连接二层交换机。于是node-1进行ARP广播,询问谁是192.168.1.102,node-2接受到广播,发现不是自己的,node-2不会应答。但是node-2上有一个container-2的容器的IP却是192.168.1.102.
  • 基于二层网络的容器必须有sdn的功能,二层网络的交换机上代答(伪造arp应答),有这个基础,node-1进行ARP广播后,二层交换机进行代答,告诉node-1的container-2的mac地址是node-2.
  • container-1的数据,到了node-1,交换机告诉node-1,container-2的mac是node-2,于是node-1通过交换机把数据送到了node-2.在这里container-1发送给container-2的包的mac地址已经变更为node-2的mac地址。
  • node-2接受到数据,发现数据不是自己的,走自己节点的路由表,发现自己有一条路由规则,凡是发往container-2的IP的数据都是走容器的虚拟网桥,虚拟网桥(docker-bridge)连接虚拟网卡对,通过这对虚拟网卡通过网桥把node-2的数据送到container-2里面,至此2个容器完成二层的通信。

性能

  • 走二层没有包的封装,所以性能非常好
  • 容器和节点之间可以直通

容器三层网络分析

特点

  • 容器IP和节点IP在不同网段
  • 三层协议通过IP层寻找对方节点,大部分基于BGP协议来做,BGP协议就是一个同步路由规则的广播协议

通信过程

  • container-1发送数据到容器的默认网关(node-1的IP),node-1会收到数据包,此时经过netfilter模块。
  • 此时calico bgp会下发一条理由规则,凡是发送container-2的IP数据包,通过某个网卡(eth0)发往一个网关(网关地址是node-2的IP),例如:172.138.1.102 –eth0 –>192.168.1.202 ,这样的规则
  • 因为192.168.1.201和192.168.1.201分别是node1和node2的IP,此时数据包通过路由器,把数据发送给node-2
  • node-2收到数据后,查找node-2的路由表,发现有一个条路由规则,凡是发送给72.138.1.102的数据都发往容器的网桥,然后经过容器网桥把数据送到container-2.至此2个容器通信结束

Linux网络内核概述

Linux网络栈

  • 物理层(L1):提供电信号和一些底层细节
  • 数据链路层(L2):处理端点间的数据传输,最常见的数据链层的标准是以太网,Linux以太网设备驱动运行在这一层
  • 网络层(L3):负责数据包转发和主机编址,Linux内核网络子系统中最常用的是IPv4和IPv6协议
  • 传输层(L4): 完成节点间的数据发送,TCP和UDP是最常见的网络协议
  • 应用层:应用层协议,比如http/https、ftp协议

Linux网络栈的本质

  • Linux内核协议栈任务就是将接收到的数据数据包从数据链路层(L2层,,网络设备驱动)传递给网络层(L3层,IPv4或者IPv6),如果数据包的目的地是当前设备,Linux网络协议栈将数据包传递给传输层(L4层,同时是TCP/UDP);如果数据包需要转发,网络协议栈交给L2层进行传输。对于本设备需要发送的数据包,将从L4依次传递给L2,再有网络驱动程序进行传输,在传输节点可能会发生如下节点:
1
2
3
4
5
6
1.根据协议规则(如IPsec规则或者NAT规则),可能需要对数据包进行修改
2.数据包可能会被丢弃
3.数据包可能导致设备发送错误消息
4.可能会对数据包进行分段
5.可能会对数据包进行重组
6.需要计算数据包的校验和

网络设备

  • 混杂模式:在linux 网络协议栈实现中,有一个net_device表示设备的数据结构,其中有一个promsicuity字段,该字段值大于0,网络协议栈不会丢失那些目的地并非本地主机的数据包,这种模式就是混杂模式,通常该模式用户网络调试。

  • 网络设备中的API:老的网络设备驱动是在中断模式下进行工作的,没接受一个数据包,就需要中断一次,大量实践表明这种中断模式在负载很高的情况下效率非常低下。为了解决这个问题引入了NAPI,当前几乎所有的Linux都支持这种技术,NAPI是在linux kernel 2.5/2.6引入,采用该NAPI,如果负载很高,网络设备驱动将以轮询模式工作,而不是中断模式工作,这就意味着不会再每次接受数据包时候触发中断,而是将数据包放到内核缓存区中,由kernel不断的轮询来取数据包

  • 网络驱动的工作

    • 数据包接受目的地为当前主机,将其传递给网络层(L3),然后传递给传输层(L4)
    • 传输当前主机发送出去的数据包或者转发当前主机接受的数据包
    • 每个数据包不管是接受还是发送出去的,都需要路由子系统执行一次查找,根据查找结果来决定数据包的处理。
    • 每个数据包在经过路由子系统之前会经过netfilter子系统的处理,netfilter子系统在网络栈中5个位置会注册回调函数,数据包经过netfileter的第一个回调函数处理,结果用verdict表示,这个值为NF_DROP,则数据包将会被丢弃;如果为NF_ACCEPT,则数据包继续传输到其他层。netfilter为用户态的iptables提供的基础的架构。
  • 套接字缓冲区

    • linux 网络内核使用sk_buff结构表示一个包含报头的入站(接受路径)或者出站(传输路径)的数据包。
    • 使用skb时候必须遵循skb api,比如需要skb->data指针向前移动,必须通过skb_pull_inline或者skb_pull函数。要冲skb_buff中取回传输层报头,需要调用skb_transport_header函数,同样的,要取回网络层的报头,必须调用skb_network_header函数;取回数据链路层的报头,必须调用skb_mac_header函数,这三个方法都是入参都是skb_buff的结构体。
    • 从物理层接收到数据包后,网络设备驱动会分配一个skb(通过netdev_alloc_skb函数),在数据传输过程中,有时候需要丢失数据包则调用kfree_skb函数。skb_buff中的某些成员是有数据链路层决定的,比如IPv4数据包由14个字节的以太网报头、20到60个字节的IPv4报头、8个字节的UDPv4的报头,最后是数据包的有效载荷。每个skb_buffer的实例都包含了一个net_device的实例,对于接受到来的数据包,这个成员表示接受它的网络设备;对于发送出去的数据包,这个成员则表示发送它的网络设备。

nginx 如何worker进程的请求处理的负载均衡

  • nginx 自身实现避免了进群效应,每一个请求上来仅仅只会有一个worker_process处理,nginx 作为静态资源服务器,最大请求连接数为:worker_process * connections;如果作为反向代理,则支撑的最大连接数为worker_process * conenctions/2,因为反向代理需要连接后端的服务,同时也会连接前端请求的。那nginx如何做到请求处理的负载均衡?
  • nginx 核心逻辑调试
1
2
3
4
5
6
7
$ ./configure --with-debug --with-cc-opt='-g -O0' --prefix=/usr/local/nginx
$ make -j4
$ make install

//在配置文件中添加调试配置
daemon off;
master_process off;
  • ngx_worker_process_cycle是每个worker进程核心执行的函数,所有的请求处理都是通过这个函数入口进行
    1
    2
    3
    4
    5
    6
    7
    8
    9

    static void
    ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
    {
    ngx_worker_process_init(cycle, worker);
    for ( ;; ) {
    ngx_process_events_and_timers(cycle);
    }
    }
  • ngx_process_events_and_timers是接受客户请求的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;

} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
}
//这里ngx_process_events宏定义的是ngx_epoll_process_events函数
(void) ngx_process_events(cycle, timer, flags);
}
  • ngx_epoll_process_events 通过多层次的请求最终函数调用会落在ngx_event_accept,最终处理来自客户端的请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
events = epoll_wait(ep, event_list, (int) nevents, timer);
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;

instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

rev = c->read;
//(gdb) p rev->handler
//$3 = (ngx_event_handler_pt) 0x5555555cb7f6 <ngx_http_wait_request_handler>
rev->handler(rev);
}

}

void ngx_event_accept(ngx_event_t *ev)
{
ngx_accept_disabled = ngx_cycle->connection_n / 8- ngx_cycle->free_connection_n;
ngx_listening_t *ls;
/*
(gdb) p ls->handler
$5 = (ngx_connection_handler_pt) 0x5555555cb32d
<ngx_http_init_connection>
*/
ls->handler(c);

}
  • nginx 通过ngx_accept_disabled来协调当前worker是否应该接受处理来自客户端请求吗,在ngx_event_accept中,设置ngx_accept_disabled为当前worker进程的连接数的1/8 减去当前空间连接,这个ngx_accept_disabled越大,空闲连接就越小,说明当前worker进程比较繁忙,让出请求处理的特权,其他的worker可以去处理。

操作系统-用户接口篇

用户接口

  • 为了方便用户使用计算机系同,操作系统向用户提供了直接使用计算系统的手段,这个就叫做用户接口

接口类型

  • 交互接口:用户提供各种命令控制界面,用户利用这些操作命令来阻止和控制程序的执行或者管理计算机系统,经典的实现有linux shell
  • 程序接口:编程人员在程序中通过程序接口来请求操作系统的提供服务

程序接口

定义
  • 程序接口用于在程序和系统资源以及系统服务之间实现交互的作用,而为了保证系统的安全行,系统提供了若干系统调用(system call)来实现用户程序和内核的交互,因此,系统调用是操作系统为编程人员提供唯一的程序接口。
一般函数和系统调用区别
  • 运行在不同系统状态,一般函数调用运行于用户态,而系统调用运行于内核态
  • 系统调用通过软中断进入,一般的函数调用不涉及软中断,不涉及系统状态的转换;运行系统调用时候,由于调用程序和被调用程序处于不同的系统状态,因此不允许由调用程序直接转向被调用程序,通常是通过软中断机制,先由用户态转换内核态,在转向相应的中断处理子程序。
系统调用类型
  • 设备管理:用来请求和释放有关设备以及启动设备的操作
  • 文件管理:文件的读、写、创建、删除等操作
  • 进程控制:进程的创建、进程执行、进程等待、进程撤销、进程优先级等操作
  • 进程通信:进程之间的传递消息或者信号
  • 内存管理:内存的相关操作
系统调用处理过程
  • 保护进程的现场,这里也叫上下文,系统发生陷入终端时候,需要把当前进程的状态保存起来,才能切换到目标进程的执行状态,这些上下文保存在寄存器中
  • 取得系统调用号并转入相应的处理程序,系统中配置了一个系统调用号和处理子程序的入口地址
  • 返回,在系统调用结束之后,陷入处理还要恢复处理机现场

nginx 的worker_process

nginx的配置worker_process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[perrynzhou@perrynzhou-ubuntu ~/Debug/nginx-1.19.0/conf]$ cat nginx.conf 

#user nobody;
worker_processes 1;

events {
worker_connections 1024;
}


http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;


server {
listen 80;
server_name localhost;

location / {
root html;
index index.html index.htm;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}

}

nginx 如何配置worker_process呢?

nginx中通过static ngx_command_t ngx_core_commands[]数组表示配置文件以及配置文件中的回调函数
1
2
3
4
5
6
{ ngx_string("worker_processes"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_set_worker_processes,
0,
0,
NULL },
worker_process 通过回调函数ngx_set_worker_processes设置nginx的进程数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static char *
ngx_set_worker_processes(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_core_conf_t *ccf;

ccf = (ngx_core_conf_t *) conf;

if (ccf->worker_processes != NGX_CONF_UNSET) {
return "is duplicate";
}

value = (ngx_str_t *) cf->args->elts;

if (ngx_strcmp(value[1].data, "auto") == 0) {
//如果在 conf/nginx.conf 中配置 worker_processes auto;则会走这个逻辑
ccf->worker_processes = ngx_ncpu;
//ngx_cpu:会在 ngx_os_init 函数中初始化这个全局变量,ngx_ncpu = sysconf(_SC_NPROCESSORS_ONLN);获取CPU逻辑核
return NGX_CONF_OK;
}

ccf->worker_processes = ngx_atoi(value[1].data, value[1].len);

if (ccf->worker_processes == NGX_ERROR) {
return "invalid value";
}

return NGX_CONF_OK;
}

nginx 如何启动一个或者多个 worker_process?

  • 调用关系链
1
2
3
main->
ngx_master_process_cycle->
ngx_start_worker_processes
  • main:nginx入口函数
  • ngx_master_process_cycle:依据配置文件初始化worker_process和master process.同时提供nginx -h 列举出所有的命令操作实现,这里仅仅列举出来和master、worker进程相关的实现
    1
    2
    3
    4
    5
    6
    7
    8
    void ngx_master_process_cycle(ngx_cycle_t *cycle)
    {
    //fork进程作为master进程
    ngx_start_worker_processes(cycle, ccf->worker_processes,NGX_PROCESS_JUST_RESPAWN);

    //fork worker进程
    ngx_start_cache_manager_processes(cycle, 1);
    }
  • ngx_start_worker_processes:根据配置文件启动多个work_process.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{

for (i = 0; i < n; i++) {
//ngx_worker_process_cycle 是每个work进程执行的函数
ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type);
}
}
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_int_t worker = (intptr_t) data;

ngx_uint_t i;
ngx_connection_t *c;

ngx_process = NGX_PROCESS_WORKER;
ngx_worker_process_init(cycle, worker);
ngx_setproctitle("worker process");
//忽略了其他的代码
for ( ;; ) {

if (ngx_exiting) {

c = cycle->connections;

for (i = 0; i < cycle->connection_n; i++) {

/* THREAD: lock */

if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}
}

}

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,char *name, ngx_int_t respawn)
{
//
pid = fork();

switch (pid) {

case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;

case 0:
ngx_pid = ngx_getpid();
proc(cycle, data);
break;

default:
break;
}

}