perrynzhou

专注于系统组件研发

0%

epoll中的惊群效应

多进程(使用epoll/select/poll)监听同一个端口的惊群效应解决方案

1.什么是惊群效应?

  • 多个进程(A、B、C、D的worker进程)使用epoll/poll/select等函数监听同一个端口时候,当有一个TCP请求连接到该端口时候,A、B、C、D进程会被同时唤醒,但是仅仅有一个进程会accept接受来自客户端的连接,其他的进程则会挂起。
  • 举个例子,假设你去银行柜台取钱,银行有4个窗口,每个窗口的营业员都在等待客户来取钱,叫到某个号码时候(就该手持某个号码的人去柜台取钱),这个人就去柜台,在这个时候4个窗口营业员同时叫那个人去自己窗口办理,但是仅仅只会有一个窗口的营业员提供服务给你,但是这4个窗口的营业员会去“招呼”你去她哪里办理业务。4个窗口的营业员同时叫你自己的号码时候,只有一个窗口提供服务,这个效应就是惊群

2.如何产生惊群效应

  • 在早期的Linux版本中,内核对于阻塞在epoll_wait的进程,也是采用全部唤醒的机制,所以存在和accept相似的“惊群”问题。新版本的的解决方案也是只会唤醒等待队列上的第一个进程或线程,所以,新版本Linux 部分的解决了epoll的“惊群”问题。所谓部分的解决,意思就是:对于部分特殊场景,使用epoll机制,已经不存在“惊群”的问题了,但是对于大多数场景,epoll机制仍然存在“惊群”.
  • 多个进程同时监听同一个端口时候,当有请求连接到该端口时候,多个进程会被同时唤醒,但是仅仅有一个进程会accept的请求,其他的进程则会挂起

3.惊群效应影响

  • 假设有10万tcp请求,后端有64个worker进程,这些进程的唤醒和挂起之间的切换开销非常大,会严重影响服务器的处理请求的吞吐量

4.如何解决惊群效应

  • 多进程方式下,在共享内存设置一个变量,在多个进程accept请求之前针对该变量加锁,哪个进程获取到锁,哪个进程就accept请求。

5.解决惊群效应解决思路

  • 在epoll_wait返回,获取锁,如果获得锁就继续accept新的request;否则继续epoll_wait

6.惊群效应例子

  • 运行实例


  • 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    /*************************************************************************
    > File Name: epoll_test.c
    > Author:perrynzhou
    > Mail:perrynzhou@gmail.com
    > Created Time: Tuesday, June 23, 2020 AM08:45:28 HKT
    ************************************************************************/

    #include <stdio.h>
    #include <assert.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/epoll.h>
    #include <netdb.h>
    #include <string.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <errno.h>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>

    #define NET_SERVICE_BUFFER_LEN (1024)

    typedef struct net_service_t
    {
    int sfd;
    int efd;
    int worker_process_num;
    struct epoll_event event;
    struct epoll_event *events;
    } net_service;
    inline static int net_service_accept_request(net_service *ns)
    {
    struct sockaddr client_addr;
    socklen_t len = sizeof(struct sockaddr);
    return accept(ns->sfd, &client_addr, &len);
    }
    inline static void net_service_fetch_client_addr(int newfd, char *buf, size_t sz)
    {
    struct sockaddr_in addr;
    socklen_t addr_size = sizeof(struct sockaddr_in);
    int res = getpeername(newfd, (struct sockaddr *)&addr, &addr_size);
    strncpy(buf, inet_ntoa(addr.sin_addr), sz);
    }
    void net_service_handle_request(net_service *ns, int k)
    {
    int max_event = 1024;
    int efd = ns->efd;
    int sfd = ns->sfd;
    struct epoll_event *events = ns->events;
    while (1)
    {
    int n = epoll_wait(efd, events, max_event, -1);
    usleep(100);
    for (int i = 0; i < n; i++)
    {
    if (events[i].events & EPOLLERR)
    {
    fprintf(stdout, "epoll error\n");
    close(events[i].data.fd);
    continue;
    }
    else if (sfd == events[i].data.fd)
    {
    int client_fd = net_service_accept_request(ns);
    if (client_fd == -1)
    {
    fprintf(stdout, "worker-%d-[%d] process return from epoll_wait,accept failed\n", k, getpid());
    break;
    }
    char b[NET_SERVICE_BUFFER_LEN];
    net_service_fetch_client_addr(client_fd, (char *)&b, NET_SERVICE_BUFFER_LEN);
    fprintf(stdout, "worker-%d-[%d] process return from epoll_wait,accept %s success\n", k, getpid(), (char *)&b);
    close(client_fd);
    }
    }
    }
    }
    static int net_service_create_and_bind(net_service *ns, const char *addr, int port)
    {
    int fd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    inet_pton(AF_INET, addr, &serveraddr.sin_addr);
    serveraddr.sin_port = htons(port);
    bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));

    ns->sfd = fd;
    return 0;
    }
    static int net_service_setsockopt(net_service *ns)
    {

    int flags, s;
    flags = fcntl(ns->sfd, F_GETFL, 0);
    if (flags == -1)
    {
    perror("fcntl");
    return -1;
    }
    flags |= O_NONBLOCK;
    if (fcntl(ns->sfd, F_SETFL, flags) == -1)
    {
    perror("fcntl");
    return -1;
    }
    int reuse = 0;
    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&reuse, sizeof(int));
    return 0;
    }
    int net_service_init(net_service *ns, const char *addr, int port, int num)
    {
    int max_events = 1024;
    if (addr != NULL && net_service_create_and_bind(ns, addr, port) != -1)
    {
    net_service_setsockopt(ns);
    listen(ns->sfd, 1024);
    ns->efd = epoll_create(max_events);
    ns->event.data.fd = ns->sfd;
    ns->event.events = EPOLLIN;
    epoll_ctl(ns->efd, EPOLL_CTL_ADD, ns->sfd, &ns->event);
    ns->events = calloc(max_events, sizeof(struct epoll_event));
    assert(ns->events != NULL);
    ns->worker_process_num = num;
    }
    return -1;
    }
    int net_service_run(net_service *ns)
    {
    pid_t pid = 0;
    for (int i = 0; i < ns->worker_process_num; i++)
    {
    fflush(NULL);
    pid = fork();
    if (pid == 0)
    {
    fprintf(stdout, "start worker-%d-%d\n", i, getpid());
    net_service_handle_request(ns, i);
    }
    }
    while ((pid = waitpid(-1, NULL, 0)))
    {
    if (errno == ECHILD)
    {
    break;
    }
    }
    }
    void net_service_deinit(net_service *ns)
    {
    if (ns != NULL)
    {
    if (ns->efd != -1)
    {
    close(ns->efd);
    }
    if (ns->sfd != -1)
    {
    close(ns->sfd);
    }
    ns->sfd = ns->efd = -1;
    if (ns->events != NULL)
    {
    free(ns->events);
    ns->events = NULL;
    }
    }
    }
    int main(void)
    {
    net_service net;
    net_service_init(&net, "127.0.0.1", 9988, 4);
    fprintf(stdout, "run on %s:%d\n", "127.0.0.1", 9988);
    net_service_run(&net);
    net_service_deinit(&net);
    return 0;
    }