分类 redis 下的文章

相关资料

networking.c
请求处理完成后,通过调用 addReply 函数族来完成响应。

void addReply(redisClient *c, robj *obj);
void addReplySds(redisClient *c, sds s);
void addReplyString(redisClient *c, char *s, size_t len);
void addReplyString(redisClient *c, char *s, size_t len) {
    if (prepareClientToWrite(c) != REDIS_OK) return;
    if (_addReplyToBuffer(c,s,len) != REDIS_OK)
        _addReplyStringToList(c,s,len);
}
以 addReplyString 为例,在 addReply 函数中:
首先调用 prepareClientToWrite 函数,完成准备工作。
然后向客户端缓存写入响应内容,写入响应内容时,总是先尝试写入到固定 buf,如果写入失败,再写入到动态分配的链表中。
客户端缓存由两部分组成:固定大小的 buf 和动态分配的 reply(链表)。

#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
char buf[REDIS_REPLY_CHUNK_BYTES];

c->reply = listCreate();

1. 写入响应内容之前,完成准备工作

/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns REDIS_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client,
 * a master, a slave not yet online, or because the setup of the write handler
 * failed, the function returns REDIS_ERR.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns REDIS_ERR no
 * data should be appended to the output buffers. */
int prepareClientToWrite(redisClient *c) {
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}
在向客户端缓存写入响应内容之前,先向事件处理程序注册写事件,回调函数是:sendReplyToClient()。
此时,客户端缓存为空:c->bufpos == 0 && listLength(c->reply) == 0

2. 写入响应内容

写入响应内容,通过调用以下函数来完成:

int _addReplyToBuffer(redisClient *c, char *s, size_t len);
void _addReplyObjectToList(redisClient *c, robj *o);
void _addReplySdsToList(redisClient *c, sds s);
void _addReplyStringToList(redisClient *c, char *s, size_t len);

注:写入响应内容时,总是先尝试写入到固定 buf,如果写入失败,再写入到动态分配的链表中。
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
    robj *tail;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) == 0) {
        robj *o = createStringObject(s,len);

        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,s,len);
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else {
            robj *o = createStringObject(s,len);

            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

3. 发送响应内容

发送响应内容,通过 sendReplyToClient 函数来实现。
while(c->bufpos > 0 || listLength(c->reply)) {
    if (c->bufpos > 0) {
        nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
        if (nwritten <= 0) break;
        c->sentlen += nwritten;
        totwritten += nwritten;

        /* If the buffer was sent, set bufpos to zero to continue with
         * the remainder of the reply. */
        if (c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
    } else {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);
        objmem = zmalloc_size_sds(o->ptr);

        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
        if (nwritten <= 0) break;
        c->sentlen += nwritten;
        totwritten += nwritten;

        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
            c->reply_bytes -= objmem;
        }
    }
    /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
     * bytes, in a single threaded server it's a good idea to serve
     * other clients as well, even if a very large request comes from
     * super fast link that is always able to accept data (in real world
     * scenario think about 'KEYS *' against the loopback interface).
     *
     * However if we are over the maxmemory limit we ignore that and
     * just deliver as much data as it is possible to deliver. */
    server.stat_net_output_bytes += totwritten;
    if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
        (server.maxmemory == 0 ||
         zmalloc_used_memory() < server.maxmemory)) break;
}

相关资料

redis.c
networking.c

1. 建立连接

/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
当客户端请求连接时,对应的事件处理函数是:acceptTcpHandler、acceptUnixHandler,其中:
acceptTcpHandler 接受 tcp 连接请求;
acceptUnixHandler 接受 unix 域连接请求。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0);
    }
}
anetTcpAccept 接受 tcp 连接,同时返回连接套接字cfd。
acceptCommonHandler 对连接套接字cfd及对应的客户端结构进行初始化。
c = createClient(fd))

初始化主要在 createClient 函数中进行,其中,通过注册事件处理函数 readQueryFromClient 来读取请求。

aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)

2. 请求处理

请求处理可以分为3个步骤:
1. 读取请求
2. 解析请求
3. 执行请求
readQueryFromClient 函数是请求处理的起点,该函数将请求读取到 c->querybuf 中,然后调用 processInputBuffer 函数。
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & REDIS_BLOCKED) return;

        /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands). */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }

        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        } else {
            redisPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}
if (c->reqtype == REDIS_REQ_INLINE) {
    if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
    if (processMultibulkBuffer(c) != REDIS_OK) break;
}

根据请求类型,分别调用 processInlineBuffer 和 processMultibulkBuffer 解析请求。
最后调用 processCommand 函数执行请求。

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
根据argv[0],查找CommandTable,找到对应的执行函数。

call(c,REDIS_CALL_FULL);
调用函数执行命令。