分享

memcache 源代码分析 - 数据结构篇(上)

rsgg03 发表于 2015-4-14 21:37:12 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 12485
本帖最后由 rsgg03 于 2015-4-14 21:39 编辑
问题导读


1.memcache 主进程与工作进程之间如何通信?

2.connection 代表的是什么?










LIBEVENT_THREAD
  LIBEVENT_THREAD 代表的是 memcache 里的 worker thread,数据结构如下:

typedef struct {  
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;
memcache 主进程与工作进程之间如何通信
  LIBEVENT_THREAD的数据成员主要是用于 main thread 与 worker thread 之间通信的。主进程每次创建一个工作线程的时候,都会创建一个 pipe,数据结构里对应的是notify_send_fd和notify_receive_fd,代码如下:

void memcached_thread_init(...) {  
    ...
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) { // <-- 创建 pipe
            ...
        }

        threads.notify_receive_fd = fds[0];
        threads.notify_send_fd = fds[1];

        setup_thread(&threads);
       ...
    }
    ...
}
&#8195;&#8195;每当有新的连接请求的时候,主进程就会往notify_send_fd里写数据来通知工作线程,子线程接到消息之后会创建新的 connection,代码如下:

void dispatch_conn_new(...) {  
    CQ_ITEM *item = cqi_new(); // <-- CQ_ITEM 用来存放与连接相关的各种参数
    char buf[1];
    ...

    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid; // <-- 选择子线程

    last_thread = tid;

    // 存储各种参数
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item); // <-- 将请求参数压入工作线程的 connection 队列当中

    ...
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) { // <-- 通过 pipe 通知 worker thread
        perror("Writing to thread notify pipe");
    }
}
// 子线程接收请求通知
static void thread_libevent_process(int fd, short which, void *arg) {  
    ...

    if (read(fd, buf, 1) != 1)
        ...

    switch (buf[0]) {
    case 'c':
    item = cq_pop(me->new_conn_queue); // <-- 从队列里面读取最新的请求

    if (NULL != item) {
        // 创建新的连接
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
       ...
    }
        break;
   ...
   }
}
connection
&#8195;&#8195;connection 代表的是服务接收的每个连接。如上面注释里提到的,是在工作线程接收到请求的时候创建的。connection 内部结构比较庞大,声明了与连接相关的各种变量。比如其中的rbuf和rbytes,它们是用来存储从 socket 连接中读取的内容的,还有 rcurr 则是用来标记 rbuf 已经解析到了哪里。

&#8195;&#8195;memcached 启动的时候会创建一个叫做conns的全局变量,用来缓存所有连接。当调用conn_new函数创建新的 connection 的时候,会先从conns当中查询是否已经创建了连接,如果有则返回现有的 connection 。如果没有,才创建新的,代码大致如下:

conn *conn_new(...) {  
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    if (NULL == c) {
        // 创建新的 connection
        ...
        c->sfd = sfd;
        conns[sfd] = c;
    }

    // 使用缓存的 connection
    ...
    return c;
}
slabclass_t & item
&#8195;&#8195;slabclass_t和item是 memcache 里与缓存功能和内存管理相关联的关键数据结构。

欢迎大家如about云官方群371358502,更新咨询,更新资源,随时关注

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条