CSAPP ProxyLab

  • Post author:
  • Post category:其他


实验内容

实现一个http代理,共分为三部分:

  • http代理:需要处理client发出的请求报文,处理后再转发到server,再从server接收数据,再传给client。
  • 并发:实现基于多线程的并发,主线程作为写者生成响应的任务,其他线程作为读者来处理这些任务。
  • cache:实现一个简单的LRU cache,可以将server发来的信息缓存,当其他client再次请求相同的数据时,直接将缓存发给他,不需要再次向server请求。

http代理

一个http请求包含一个请求行(request line)和若干个请求报头(request header),某些请求还包括请求主体(request body),这里仅实现了基础的GET请求。

第一行,即请求行的格式如下,每一行之间用\r\n隔开,请求报头与主体之间由空行+\r\n隔开。

method uri version

这里主要需要对uri进行分析,得到server的地址和端口再进行连接即可。

在实现的过程中遇到了一点问题:我对浏览器的GET请求进行抓包,发现里面的uri是相对路径,并没有完整的绝对路径,如果是这样就需要在请求报头中找到HOST:字段才能得到server地址。但是在用火狐使用我的代理时,发现火狐发给proxy的uri中又是绝对路径了,很迷惑。

后来查阅RFC2616,终于找到了答案:发给proxy的请求中,uri必须要是绝对路径,而proxy转发的请求uri须为相对路径。所以作为代理,只需要分析请求行的uri即可,因为发给代理的uri必须要是绝对路径。

并发

并发为基于多线程的并发,用一个简单的缓冲区实现。buf队列装有若干个等待服务的文件描述符,主线程将新文件描述符插入进缓冲区,其他线程从缓冲区中去除文件描述符进行响应服务:

/* sbuf.h */
#include "csapp.h"

#ifndef __SBUF_H__
#define __SBUF_H__

typedef struct {
  /* 循环队列,存储client的文件描述符 */
  int* buf;
  int n;
  int front;
  int rear;
  sem_t mutex;
  sem_t slots;
  sem_t items;
} sbuf_t;

/* 初始化 */
void sbuf_init(sbuf_t* sp, int n);
/* 清理 */
void sbuf_deinit(sbuf_t* sp);
/* 将一个文件描述符插入进缓冲区 */
void sbuf_insert(sbuf_t* sp, int fd);
/* 删除队首的文件描述符并返回给线程,交给线程服务 */
int sbuf_remove(sbuf_t* sp);

#endif
/* sbuf.c */
#include "sbuf.h"

void sbuf_init(sbuf_t* sp, int n) {
  sp->buf = Calloc(n, sizeof(int));
  sp->n = n;
  sp->front = 0;
  sp->rear = 0;
  Sem_init(&(sp->mutex), 0, 1);
  Sem_init(&(sp->slots), 0, n);
  Sem_init(&(sp->items), 0, 0);
}

void sbuf_deinit(sbuf_t* sp) {
  Free(sp->buf);
}

void sbuf_insert(sbuf_t* sp, int fd) {
  P(&(sp->slots));
  P(&(sp->mutex));
  sp->buf[sp->rear] = fd;
  sp->rear = (sp->rear + 1) % sp->n;
  V(&(sp->mutex));
  V(&(sp->items));
}

int sbuf_remove(sbuf_t* sp) {
  int fd;
  P(&(sp->items));
  P(&(sp->mutex));
  fd = sp->buf[sp->front];
  sp->front = (sp->front + 1) % sp->n;
  V(&(sp->mutex));
  V(&(sp->slots));
  return fd;
}

cache

这里cache根据uri存储server返回的数据,读者-写者读优先,替换策略为LRU。一个proxy cache还应该注意数据的有效性,这里就直接写了一个长期有效的(偷懒)。

/* cache.h */
#ifndef __CACHE_H__
#define __CACHE_H__

#include "csapp.h"

/* cache行 */
#define CACHE_SIZE 10
/* server返回的data最大大小 */
#define MAX_OBJECT_SIZE 102400

typedef struct {
  int valid;
  int lru_time;
  int reader_cnt;
  char uri[MAXLINE];
  char data[MAX_OBJECT_SIZE];
  sem_t mutex;
  sem_t w;
} cacheline_t;

/* malloc初始化 */
cacheline_t* cache_init();
/* free空间 */
void cache_deinit(cacheline_t* cache);
/* 根据uri读data,成功返回1,不成功返回0 */
int cache_read(cacheline_t* cache, char* uri, char* data);
/* 将uri-data写入cache,自动根据lru策略替换 */
void cache_write(cacheline_t* cache, char* uri, char* data);

#endif
/* cache.c */
#include "cache.h"

/* 返回LRU策略下的替换行 */
int replace_line(cacheline_t* cache);

cacheline_t* cache_init() {
  cacheline_t* cache = Calloc(CACHE_SIZE, sizeof(cacheline_t));
  int i;
  for (i = 0; i < CACHE_SIZE; ++i) {
    Sem_init(&(cache[i].mutex), 0, 1);
    Sem_init(&(cache[i].w), 0, 1);
  }
  return cache;
}

void cache_deinit(cacheline_t* cache) {
  Free(cache);
}

/* 写优先 */
int cache_read(cacheline_t* cache, char* uri, char* data) {
  int i, r = 0;
  for (i = 0; i < CACHE_SIZE; ++i) {
    P(&(cache[i].mutex));
    /* 第一个进入的写者 */
    if (++cache[i].reader_cnt == 1) {
      P(&(cache[i].w));
    }
    V(&(cache[i].mutex));

    /* 对比uri */
    if (!r && cache[i].valid && !strcmp(cache[i].uri, uri)) {
      sprintf(data, "%s", cache[i].data);
      cache[i].lru_time = 0;
      r = 1;
    } else if (cache[i].valid) {
      /* 增加其他所有项的lru_time */
      ++cache[i].lru_time;
    }
    
    P(&(cache[i].mutex));
    /* 最后一个退出的写者 */
    if (--cache[i].reader_cnt == 0) {
      V(&(cache[i].w));
    }
    V(&(cache[i].mutex));
  }
  return r;
}

void cache_write(cacheline_t* cache, char* uri, char* data) {
  int i = replace_line(cache);
  P(&(cache[i].w));
  cache[i].valid = 1;
  cache[i].lru_time = 0;
  sprintf(cache[i].uri, "%s", uri);
  sprintf(cache[i].data, "%s", data);
  V(&(cache[i].w));
}

int replace_line(cacheline_t* cache) {
  /* lru_time = -1表示找到了空行,不需要再寻找、替换了 */
  int i, r, lru_time = 0;
  for (i = 0; i < CACHE_SIZE && lru_time != -1; ++i) {
    P(&(cache[i].mutex));
    if (++cache[i].reader_cnt == 1) {
      P(&(cache[i].w));
    }
    V(&(cache[i].mutex));

    if (!cache[i].valid) {
      r = i;
      lru_time = -1;
    } else if (cache[i].lru_time >= lru_time) {
      r = i;
      lru_time = cache[i].lru_time;
    }

    P(&(cache[i].mutex));
    if (--cache[i].reader_cnt == 0) {
      V(&(cache[i].w));
    }
    V(&(cache[i].mutex));
  }

  return r;
}

proxy

proxy的思路为,持续侦听一个端口,将所有client的文件描述符放入池中。

生成若干个线程,持续从池中取出文件描述符进行处理。

处理过程为解析method、uri、version,若cache中有uri对应的数据,则直接将数据传递给client;若cache中没有,则解析出server地址,作为中间人传递数据,结束前将数据存入cache中。

/* proxy.c */
#include <stdio.h>
#include "csapp.h"
#include "sbuf.h"
#include "cache.h"

/* 线程数 */
#define NTHREADS 8
/* sbuf中的文件描述符个数 */
#define SBUF_SIZE 10

void create_thread();
void serve(int client_fd);
int parse_uri(char* uri, char* server_hostname, char* server_port);
void send_request(int server_fd, char* server_hostname, char* server_port, char* method, char* uri, char* version);
void* thread(void* vargp);

/* http请求报头中的User-Agent部分 */
static const char *user_agent_hdr = "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:10.0.3) Gecko/20120305 Firefox/10.0.3\r\n";
sbuf_t sbuf;
cacheline_t* cache;

int main(int argc, char** argv) {
  int listen_fd, client_fd;
  struct sockaddr_storage client_addr;
  socklen_t client_addr_len = sizeof(client_addr);
  char client_hostname[MAXLINE], client_port[MAXLINE];
  char* proxy_port;

  if (argc != 2) {
    printf("usage: %s <port>\n", argv[0]);
    exit(0);
  }
  proxy_port = argv[1];
  listen_fd = Open_listenfd(proxy_port);

  /* 初始化sbuf */
  sbuf_init(&sbuf, SBUF_SIZE);
  /* 初始化cache */
  cache = cache_init();
  /* 创建线程 */
  create_thread();

  while(1) {
    if ((client_fd = Accept(listen_fd, (SA*)(&client_addr), &client_addr_len)) < 0) {
      printf("client_fd error.\n");
      return 0;
    }
    Getnameinfo((SA*)(&client_addr), client_addr_len, client_hostname, MAXLINE, client_port, MAXLINE, 0);
    printf("Accepted connection from (%s, %s)\n", client_hostname, client_port);
    /* 插入sbuf中,继续侦听 */
    sbuf_insert(&sbuf, client_fd);
  }

  cache_deinit(cache);
  sbuf_deinit(&sbuf);
  Close(listen_fd);
  return 0;
}

/* 创建线程 */
void create_thread() {
  pthread_t tid;
  int i;
  for (i = 0; i < NTHREADS; ++i) {
    Pthread_create(&tid, NULL, thread, NULL);
  }
}

/* 线程例程,持续从sbuf中去除client_fd处理 */
void* thread(void* vargp) {
  Pthread_detach(pthread_self());
  int client_fd;
  while(1) {
    client_fd = sbuf_remove(&sbuf);
    serve(client_fd);
    Close(client_fd);
  }
}

/* 对client_fd进行处理 */
void serve(int client_fd) {

  rio_t rio;
  char buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
  memset(buf, 0, MAXLINE);

  Rio_readinitb(&rio, client_fd);
  if ( !Rio_readlineb(&rio, buf, MAXLINE) ) {
    printf("Failed to read http request line.\n");
    return;
  }
  /* 首行格式为 method uri version */
  sscanf(buf, "%s %s %s", method, uri, version);

  /* 目前只部署了GET请求 */
  if (strcmp(method, "GET")) {
    printf("Method %s not implemented.\n", method);
    return;
  }

  /* 发送给client的数据 */
  char* data = (char*)Malloc(MAX_OBJECT_SIZE);
  /* 先在cache中寻找 */
  if (cache_read(cache, uri, data)) {
    Rio_writen(client_fd, data, strlen(data));
    return;
  }

  /* 修改version到HTTP/1.0(实验要求) */
  sprintf(version, "%s", "HTTP/1.0");

  char server_hostname[MAXLINE], server_port[MAXLINE];
  memset(server_hostname, 0, MAXLINE);
  memset(server_port, 0, MAXLINE);

  /* 指出uri中相对路径的起始位置 */
  int offset;
  /* 解析出server host,返回相对路径位置,便于构建新的uri */
  if ((offset = parse_uri(uri, server_hostname, server_port)) < 0) {
    printf("parse uri error.\n");
    return;
  }

  /* 默认端口为80 */
  if (strlen(server_port) == 0) {
    sprintf(server_port, "80");
  }

  /* 与服务器进行连接 */
  int server_fd;
  if ((server_fd = Open_clientfd(server_hostname, server_port)) < 0) {
    printf("server_fd error.\n");
    return;
  }
  /* 向服务器发送请求 */
  send_request(server_fd, server_hostname, server_port, method, uri + offset, version);

  rio_t server_rio;
  int buf_len = 0;
  int cnt = 0;
  memset(buf, 0, MAXLINE);
  Rio_readinitb(&server_rio, server_fd);

  /* 接收服务器返回的数据 */
  while ((buf_len = Rio_readnb(&server_rio, buf, MAXLINE)) != 0) {
    Rio_writen(client_fd, buf, buf_len);
    memcpy(data + cnt, buf, buf_len);
    cnt += buf_len;
  }
  /* 写入cache */
  cache_write(cache, uri, data);
  Free(data);
}

/* 解析出uri中的server地址,同时返回相对路径位置 */
int parse_uri(char* uri, char* server_hostname, char* server_port) {

  /* http:// */
  char* ptr1 = strstr(uri, "//");
  if (ptr1 == NULL) {
    ptr1 = uri;
  } else {
    ptr1 += 2;
  }

  /* 相对路径开始位置 */
  char* ptr2 = strstr(ptr1, "/");
  if (ptr2 == NULL) {
    return -1;
  }

  /* hostname:port,uri中可能没有指定端口 */
  char* ptr3 = strstr(ptr1, ":");
  if (ptr3 == NULL) {
    ptr3 = ptr2;
  }

  /* 计算长度,得到hostname和port(port可能没有) */
  int hostname_len = ptr3 - ptr1;
  int port_len = (ptr3 == ptr2 ? 0 : ptr2 - (ptr3 + 1));
  strncpy(server_hostname, ptr1, hostname_len);
  strncpy(server_port, ptr3 + 1, port_len);
  
  /* 返回相对路径位置 */
  return ptr2 - uri;
}

/* 向server发送请求 */
void send_request(int server_fd, char* server_hostname, char* server_port, char* method, char* uri, char* version) {
  char buf[MAXLINE];
  sprintf(buf, "%s %s %s\r\n", method, uri, version);
  Rio_writen(server_fd, buf, strlen(buf));
  sprintf(buf, "Host: %s:%s\r\n", server_hostname, server_port);
  Rio_writen(server_fd, buf, strlen(buf));
  sprintf(buf, "%s\r\n", user_agent_hdr);
  Rio_writen(server_fd, buf, strlen(buf));
  sprintf(buf, "Connection: close");
  Rio_writen(server_fd, buf, strlen(buf));
  sprintf(buf, "Proxy-Connection: close");
  Rio_writen(server_fd, buf, strlen(buf));
}

这里实现的代理还很简单,还可以添加其他HTTP请求的处理(POST,HEAD……),添加对一些信号的处理,完善错误处理,验证cache内数据的有效时间等等。



版权声明:本文为qq_40955029原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。