
How does a glusterfs client cp a file?

A brief introduction to glusterfs fuse

The xlator is the core concept in glusterfs: each xlator implements a set of functions that handle the corresponding file operations. The three binaries glusterfs/glusterfsd/glusterd share the same entry point (glusterfs/src/glusterfsd.c), but each module loads a different set of xlators to implement its own functionality. The glusterfs client itself is built on top of fuse with its own API. The core function call chain of the glusterfs client (glusterfs/src/glusterfsd.c) is as follows.

FUSE request forwarding flow

  • 1.main

    • glusterfs entry function
  • 2.create_fuse_mount

    • Sets up the mount/fuse xlator and calls xlator_set_type to initialize the mount/fuse module (which in turn calls the init method of the mount/fuse xlator). This is the module we will focus on later.
  • 3.glusterfs_volumes_init

    • Based on the node IP supplied at mount time, initializes the information the glusterfs client needs, such as the server-side brick metadata and which xlators the client has to load.
  • 4.glusterfs_mgmt_init

    • Talks to the glusterd on the node whose IP was supplied at mount time to fetch the server-side brick information and the xlators the client needs.
  • 5.glusterfs_process_volfp

    • Using the metadata returned by the server, calls xlator_init and completes the initialization of the whole client. The first xlator glusterfs visits is mount/fuse (whose configuration does not have to be fetched from glusterd), and the last one is the protocol/client xlator. The xlators loaded by the client are the shared objects listed below.

/* client.so implements the protocol/client xlator */
/usr/local/lib/glusterfs/2020.05.12/xlator/protocol/client.so
/* replicate.so implements cluster/replicate, i.e. cluster/afr, the replicated-volume logic */
/usr/local/lib/glusterfs/2020.05.12/xlator/cluster/replicate.so
/* distribute.so implements cluster/distribute, i.e. cluster/dht, the distributed (hash) volume logic */
/usr/local/lib/glusterfs/2020.05.12/xlator/cluster/distribute.so
/usr/local/lib/glusterfs/2020.05.12/xlator/features/utime.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/write-behind.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/read-ahead.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/readdir-ahead.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/io-cache.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/open-behind.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/quick-read.so
/usr/local/lib/glusterfs/2020.05.12/xlator/performance/md-cache.so
/usr/local/lib/glusterfs/2020.05.12/xlator/debug/io-stats.so
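Each of the shared objects above is a plugin that the client loads at run time; roughly speaking, xlator_set_type ends up dlopen()ing the matching .so and resolving its entry points. The sketch below only illustrates that generic dlopen/dlsym pattern with a hypothetical plugin path and a hypothetical "init" symbol; it is not the actual GlusterFS loader.

/* Generic dlopen/dlsym plugin-loading sketch (not the GlusterFS loader).
 * "./toy-xlator.so" and the "init" symbol are hypothetical; build with -ldl. */
#include <dlfcn.h>
#include <stdio.h>

typedef int (*plugin_init_t)(void);

int main(void)
{
    /* Load the shared object into the process, the way the loader behind
     * xlator_set_type does for the paths listed above. */
    void *handle = dlopen("./toy-xlator.so", RTLD_NOW | RTLD_GLOBAL);
    if (!handle) {
        fprintf(stderr, "dlopen failed: %s\n", dlerror());
        return 1;
    }

    /* Resolve an entry point exported by the plugin. */
    plugin_init_t init = (plugin_init_t)dlsym(handle, "init");
    if (!init) {
        fprintf(stderr, "dlsym failed: %s\n", dlerror());
        dlclose(handle);
        return 1;
    }

    printf("plugin init returned %d\n", init());
    dlclose(handle);
    return 0;
}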
  • glusterfs_graph_activate

    • ctx->master was already set to the mount/fuse xlator during initialization; when the xlator graph is activated, if master is not NULL, xlator_notify is called with the mount/fuse xlator pointer as its first argument.

      glusterfs_graph_activate(glusterfs_graph_t *graph, glusterfs_ctx_t *ctx)
      {
          /* fuse-bridge.c:notify is invoked from here */
          if (ctx->master) {
              ret = xlator_notify(ctx->master, GF_EVENT_GRAPH_NEW, graph);
              ((xlator_t *)ctx->master)->next = graph->top;
          }
      }
  • xlator_notify

    • xlator_notify simply calls the notify function of the mount/fuse xlator, which in turn starts several threads that read requests from /dev/fuse.
    xlator_notify(xlator_t *xl, int event, void *data, ...)
    {
        /* invoke the notify function of the mount/fuse xlator */
        xl->notify(xl, event, data);
    }
  • notify

    • This is the notify function of mount/fuse.

      /* the notify function in mount/fuse */
      notify(xlator_t *this, int32_t event, void *data, ...)
      {
          /* spawn reader threads, each running fuse_thread_proc */
          for (i = 0; i < private->reader_thread_count; i++) {
              ret = gf_thread_create(&private->fuse_thread[i], NULL,
                                     fuse_thread_proc, this, "fuseproc");
          }
      }
  • fuse_thread_proc

    • Reads requests from /dev/fuse and forwards them to the corresponding fuse_xxx handler.
    /* each reader thread loops reading requests and hands them to fuse_dispatch */
    fuse_thread_proc(void *data)
    {
        xlator_t *this = NULL;
        fuse_private_t *priv = NULL;
        this = data;
        priv = this->private;

        THIS = this;
        /* init() in xlators/mount/fuse already set priv->fd to the /dev/fuse file descriptor */
        res = sys_readv(priv->fd, iov_in, 2);
        gf_async(&fasync->async, this, fuse_dispatch);
    }
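For context on what those readv() calls actually pull off the device: every request on /dev/fuse starts with a struct fuse_in_header (declared in <linux/fuse.h>) carrying the total length and the opcode that the dispatcher later switches on. The standalone sketch below is not GlusterFS code; it assumes the /dev/fuse file descriptor was already obtained by a mount helper and is passed in as a number, and it only prints each header instead of dispatching.

/* Standalone /dev/fuse reader-loop sketch (not GlusterFS code). */
#include <linux/fuse.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/uio.h>

#define READ_BUF_SIZE (1024 * 1024)   /* comfortably above FUSE_MIN_READ_BUFFER */

void reader_loop(int fuse_fd)
{
    struct fuse_in_header hdr;
    static char body[READ_BUF_SIZE];

    for (;;) {
        /* Like fuse_thread_proc(), pull the fixed header and the
         * opcode-specific body off /dev/fuse in one readv() call. */
        struct iovec iov[2] = {
            { .iov_base = &hdr, .iov_len = sizeof(hdr) },
            { .iov_base = body, .iov_len = sizeof(body) },
        };

        ssize_t n = readv(fuse_fd, iov, 2);
        if (n < 0)
            break;                      /* unmounted or fatal error */
        if ((size_t)n < sizeof(hdr))
            continue;                   /* short read: nothing to dispatch */

        /* hdr.opcode is the index GlusterFS uses into fuse_std_ops[]. */
        printf("unique=%llu opcode=%u nodeid=%llu len=%u\n",
               (unsigned long long)hdr.unique, hdr.opcode,
               (unsigned long long)hdr.nodeid, hdr.len);
    }
}

int main(int argc, char *argv[])
{
    if (argc < 2) {
        fprintf(stderr, "usage: %s <fuse-fd>\n", argv[0]);
        return 1;
    }
    /* The fd number would normally be handed over by a mount helper. */
    reader_loop(atoi(argv[1]));
    return 0;
}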
  • fuse_dispatch

    • Forwards the data read from /dev/fuse to the corresponding fuse_xxx handler; for example, the fuse mount initialization request is forwarded to fuse_init.

      /* forward the request that was just read to the matching fuse_ops handler */
      fuse_dispatch(xlator_t *xl, gf_async_t *async)
      {
          /* priv->fuse_ops is the fuse_std_ops function-pointer table */
          priv->fuse_ops[finh->opcode](xl, finh, fasync->msg, iobuf);
      }
      • priv->fuse_ops holds the full set of fuse operations; it is defined as follows

        static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = {
        [FUSE_LOOKUP] = fuse_lookup,
        [FUSE_FORGET] = fuse_forget,
        [FUSE_GETATTR] = fuse_getattr,
        [FUSE_SETATTR] = fuse_setattr,
        [FUSE_READLINK] = fuse_readlink,
        [FUSE_SYMLINK] = fuse_symlink,
        [FUSE_MKNOD] = fuse_mknod,
        [FUSE_MKDIR] = fuse_mkdir,
        [FUSE_UNLINK] = fuse_unlink,
        [FUSE_RMDIR] = fuse_rmdir,
        [FUSE_RENAME] = fuse_rename,
        [FUSE_LINK] = fuse_link,
        [FUSE_OPEN] = fuse_open,
        [FUSE_READ] = fuse_readv,
        [FUSE_WRITE] = fuse_write,
        [FUSE_STATFS] = fuse_statfs,
        [FUSE_RELEASE] = fuse_release,
        [FUSE_FSYNC] = fuse_fsync,
        [FUSE_SETXATTR] = fuse_setxattr,
        [FUSE_GETXATTR] = fuse_getxattr,
        [FUSE_LISTXATTR] = fuse_listxattr,
        [FUSE_REMOVEXATTR] = fuse_removexattr,
        [FUSE_FLUSH] = fuse_flush,
        [FUSE_INIT] = fuse_init,
        [FUSE_OPENDIR] = fuse_opendir,
        [FUSE_READDIR] = fuse_readdir,
        [FUSE_RELEASEDIR] = fuse_releasedir,
        [FUSE_FSYNCDIR] = fuse_fsyncdir,
        [FUSE_GETLK] = fuse_getlk,
        [FUSE_SETLK] = fuse_setlk,
        [FUSE_SETLKW] = fuse_setlk,
        [FUSE_ACCESS] = fuse_access,
        [FUSE_CREATE] = fuse_create,
        [FUSE_INTERRUPT] = fuse_interrupt,
        /* [FUSE_BMAP] */
        [FUSE_DESTROY] = fuse_destroy,
        /* [FUSE_IOCTL] */
        /* [FUSE_POLL] */
        /* [FUSE_NOTIFY_REPLY] */

        #if FUSE_KERNEL_MINOR_VERSION >= 16
        [FUSE_BATCH_FORGET] = fuse_batch_forget,
        #endif

        #if FUSE_KERNEL_MINOR_VERSION >= 19
        #ifdef FALLOC_FL_KEEP_SIZE
        [FUSE_FALLOCATE] = fuse_fallocate,
        #endif /* FALLOC_FL_KEEP_SIZE */
        #endif

        #if FUSE_KERNEL_MINOR_VERSION >= 21
        [FUSE_READDIRPLUS] = fuse_readdirp,
        #endif

        #if FUSE_KERNEL_MINOR_VERSION >= 24 && HAVE_SEEK_HOLE
        [FUSE_LSEEK] = fuse_lseek,
        #endif

        #if FUSE_KERNEL_MINOR_VERSION >= 28
        [FUSE_COPY_FILE_RANGE] = fuse_copy_file_range,
        #endif
        };
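The table above is a classic opcode-indexed dispatch table: designated initializers fill in one handler per opcode, unlisted entries stay NULL, and fuse_dispatch calls through the pointer. A minimal, self-contained sketch of that pattern, with invented opcodes and handler names, looks like this:

/* Generic sketch of the opcode -> handler dispatch pattern used by
 * fuse_std_ops[] / fuse_dispatch(); opcodes and handlers are invented. */
#include <stdio.h>

enum { OP_LOOKUP = 1, OP_READ = 2, OP_WRITE = 3, OP_MAX = 4 };

typedef void (*op_handler_t)(unsigned opcode, const void *msg);

static void handle_lookup(unsigned opcode, const void *msg) { printf("lookup (op %u)\n", opcode); }
static void handle_read(unsigned opcode, const void *msg)   { printf("read (op %u)\n", opcode); }
static void handle_write(unsigned opcode, const void *msg)  { printf("write (op %u)\n", opcode); }

/* Designated initializers, exactly like fuse_std_ops[]: unlisted
 * opcodes stay NULL and are rejected at dispatch time. */
static op_handler_t std_ops[OP_MAX] = {
    [OP_LOOKUP] = handle_lookup,
    [OP_READ]   = handle_read,
    [OP_WRITE]  = handle_write,
};

static void dispatch(unsigned opcode, const void *msg)
{
    if (opcode >= OP_MAX || !std_ops[opcode]) {
        fprintf(stderr, "unsupported opcode %u\n", opcode);
        return;
    }
    std_ops[opcode](opcode, msg);   /* indirect call through the table */
}

int main(void)
{
    dispatch(OP_WRITE, NULL);
    dispatch(OP_READ, NULL);
    dispatch(99, NULL);             /* falls into the NULL-entry branch */
    return 0;
}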
  • FUSE_FOP

    • FUSE_FOP forwards the current operation to the fops function of the next xlator: here the next xlator is cluster/dht, dht's next xlator is cluster/afr, and the last one is protocol/client, whose client4_0_writev pushes the data over the network to the corresponding glusterfsd process.
      FUSE_FOP(state, fuse_writev_cbk, GF_FOP_WRITE, writev, state->fd,
      &state->vector, 1, state->off, state->io_flags, iobref,
      state->xdata);
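Conceptually, each xlator only knows its child xlators and forwards a fop by calling into the chosen child's fops table; STACK_WIND and FUSE_FOP add frame and callback bookkeeping on top of that idea. The toy model below uses invented struct and function names to show just the forwarding chain fuse -> dht -> afr -> protocol/client; it is not the real xlator_t or STACK_WIND machinery.

/* Toy model of an xlator chain forwarding a writev fop downwards.
 * Struct and field names are invented; the real xlator_t and
 * STACK_WIND() in GlusterFS carry much more state (frames, cookies). */
#include <stdio.h>
#include <stddef.h>

struct toy_xlator;

struct toy_fops {
    int (*writev)(struct toy_xlator *this, const char *buf, size_t len);
};

struct toy_xlator {
    const char *name;
    struct toy_fops *fops;
    struct toy_xlator *child;      /* next xlator in the graph */
};

/* Every intermediate xlator does its own work, then winds to its child. */
static int passthrough_writev(struct toy_xlator *this, const char *buf, size_t len)
{
    printf("%s: writev %zu bytes, winding to %s\n",
           this->name, len, this->child->name);
    return this->child->fops->writev(this->child, buf, len);
}

/* The last xlator (protocol/client in the real graph) would serialize
 * the request and send it over RPC to a glusterfsd brick process. */
static int leaf_writev(struct toy_xlator *this, const char *buf, size_t len)
{
    (void)buf;
    printf("%s: would submit %zu bytes over the network\n", this->name, len);
    return (int)len;
}

static struct toy_fops passthrough_fops = { .writev = passthrough_writev };
static struct toy_fops leaf_fops        = { .writev = leaf_writev };

int main(void)
{
    struct toy_xlator client = { "protocol/client", &leaf_fops, NULL };
    struct toy_xlator afr    = { "cluster/afr", &passthrough_fops, &client };
    struct toy_xlator dht    = { "cluster/dht", &passthrough_fops, &afr };
    struct toy_xlator fuse   = { "mount/fuse",  &passthrough_fops, &dht };

    return fuse.fops->writev(&fuse, "hello", 5) == 5 ? 0 : 1;
}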
  • dht_writev
    • dht_writev writes the data according to the distributed (hash) volume layout

    /* dht_writev in the cluster/dht xlator */
    int dht_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
                   int count, off_t off, uint32_t flags, struct iobref *iobref,
                   dict_t *xdata)
    {
        /* wind the call to the next xlator's writev */
        STACK_WIND_COOKIE(frame, dht_writev_cbk, subvol, subvol,
                          subvol->fops->writev, fd, local->rebalance.vector,
                          local->rebalance.count, local->rebalance.offset,
                          local->rebalance.flags, local->rebalance.iobref,
                          local->xattr_req);
    }
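Roughly speaking, dht hashes the file name into a 32-bit value and matches it against the layout ranges assigned to each subvolume of the parent directory; that is how subvol is chosen before the STACK_WIND_COOKIE above. The sketch below collapses all of that to a djb2 hash modulo the subvolume count, so it only illustrates the idea, not the real dht hash or layout search.

/* Oversimplified sketch of how a hash decides which subvolume gets a file.
 * Real dht uses its own hash plus per-directory layout ranges; here it is
 * just djb2 modulo the subvolume count. */
#include <stdio.h>
#include <stdint.h>

static uint32_t djb2_hash(const char *s)
{
    uint32_t h = 5381;
    while (*s)
        h = h * 33 + (unsigned char)*s++;
    return h;
}

int main(void)
{
    const char *subvols[] = { "vol-client-0", "vol-client-1", "vol-client-2" };
    const char *files[]   = { "a.txt", "b.txt", "notes.md", "video.mp4" };
    const size_t nsubvols = sizeof(subvols) / sizeof(subvols[0]);

    for (size_t i = 0; i < sizeof(files) / sizeof(files[0]); i++) {
        uint32_t h = djb2_hash(files[i]);
        /* The subvolume chosen here is where dht would wind the writev. */
        printf("%-10s -> hash %10u -> %s\n",
               files[i], h, subvols[h % nsubvols]);
    }
    return 0;
}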
  • afr_writev

    • The write method of the replicated volume
      int afr_writev(call_frame_t *frame, xlator_t *this, ...)
      {
          afr_do_writev(frame, this);
      }
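afr_do_writev then winds the same write to the children (bricks) of the replica set and tallies the results. The fan-out sketch below uses invented names and a simple majority check as a stand-in for afr's real quorum, changelog and self-heal handling.

/* Toy fan-out of one write to every replica, in the spirit of afr:
 * the same buffer is sent to each child and the successes are counted. */
#include <stdio.h>
#include <string.h>

#define REPLICA_COUNT 3

/* Stand-in for winding the write to one replica subvolume. */
static int replica_writev(int replica_id, const char *buf, size_t len)
{
    (void)buf;
    printf("replica %d: wrote %zu bytes\n", replica_id, len);
    return 0;   /* 0 = success */
}

static int replicated_writev(const char *buf, size_t len)
{
    int ok = 0;

    for (int i = 0; i < REPLICA_COUNT; i++)
        if (replica_writev(i, buf, len) == 0)
            ok++;

    /* Succeed only if a majority of replicas acknowledged the write. */
    return ok > REPLICA_COUNT / 2 ? 0 : -1;
}

int main(void)
{
    const char msg[] = "hello";
    return replicated_writev(msg, strlen(msg));
}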
  • client4_0_writev (protocol/client xlator)

    • client4_0_writev in protocol/client pushes the data to the glusterfsd process on the corresponding node
      /* the writev method of the protocol/client xlator */
      int32_t client4_0_writev(call_frame_t *frame, xlator_t *this, void *data)
      {
          ret = client_submit_request(this, &req, frame, conf->fops, GFS3_OP_WRITE,
                                      client4_0_writev_cbk, &cp, (xdrproc_t)xdr_gfx_write_req);
      }
      //(gdb) p conf->fops
      $99 = (rpc_clnt_prog_t *) 0x7fffe9b79c20 <clnt4_0_fop_prog>

mount/fuse cp file flow

  • Basic design idea

    • The mount/fuse xlator is implemented in the xlator/src/mount module. The fuse_-prefixed functions in that module are never called explicitly; they are registered in the fuse_std_ops array and invoked indirectly through those function pointers at run time (the same pattern is sketched after the fuse_std_ops table above). Every file-system operation (e.g. ls, rm) is forwarded to the corresponding fuse_ handler, and after it has been processed the request moves on to the next xlator after mount/fuse and down through that xlator's own set of functions.
  • gdb stack trace

// simplified backtrace
(gdb) bt
#0 afr_writev at afr-inode-write.c:491
#1 dht_writev at dht-inode-write.c:223
#2 shard_common_inode_write_wind at shard.c:5397
#3 shard_common_inode_write_do at shard.c:5527
#4 wb_writev at write-behind.c:1897
#5 ra_writev at read-ahead.c:650
#6 rda_writev at readdir-ahead.c:788
#7 ioc_writev at io-cache.c:1303
#8 default_writev_resume at defaults.c:1983
#9 call_resume_wind at call-stub.c:2085
#10 call_resume at call-stub.c:2555
#11 open_and_resume at open-behind.c:485
#12 ob_writev at open-behind.c:683
#13 qr_writev at quick-read.c:849
#14 mdc_writev at md-cache.c:2082
#15 io_stats_writev at io-stats.c:2882
#16 default_writev at defaults.c:2735
#17 meta_writev at meta.c:131
#18 fuse_write_resume at fuse-bridge.c:2959
#19 fuse_fop_resume at fuse-bridge.c:1030
#20 fuse_resolve_done at fuse-resolve.c:629
#21 fuse_resolve_all at fuse-resolve.c:653
#22 fuse_resolve at fuse-resolve.c:620
#23 fuse_resolve_all at fuse-resolve.c:650
#24 fuse_resolve_continue at fuse-resolve.c:668
#25 fuse_resolve_fd at fuse-resolve.c:543
#26 fuse_resolve at fuse-resolve.c:611
#27 fuse_resolve_all at fuse-resolve.c:644
#28 fuse_resolve_and_resume at fuse-resolve.c:680
#29 fuse_write at fuse-bridge.c:3011
#30 fuse_dispatch at fuse-bridge.c:5838