实验内容
实现一个http代理,共分为三部分:
- http代理:需要处理client发出的请求报文,处理后再转发到server,再从server接收数据,再传给client。
- 并发:实现基于多线程的并发,主线程作为写者生成响应的任务,其他线程作为读者来处理这些任务。
- cache:实现一个简单的LRU cache,可以将server发来的信息缓存,当其他client再次请求相同的数据时,直接将缓存发给他,不需要再次向server请求。
http代理
一个http请求包含一个请求行(request line)和若干个请求报头(request header),某些请求还包括请求主体(request body),这里仅实现了基础的GET请求。
第一行,即请求行的格式如下,每一行之间用\r\n隔开,请求报头与主体之间由空行+\r\n隔开。
method uri version
这里主要需要对uri进行分析,得到server的地址和端口再进行连接即可。
在实现的过程中遇到了一点问题:我对浏览器的GET请求进行抓包,发现里面的uri是相对路径,并没有完整的绝对路径,如果是这样就需要在请求报头中找到HOST:字段才能得到server地址。但是在用火狐使用我的代理时,发现火狐发给proxy的uri中又是绝对路径了,很迷惑。
后来查阅RFC2616,终于找到了答案:发给proxy的请求中,uri必须要是绝对路径,而proxy转发的请求uri须为相对路径。所以作为代理,只需要分析请求行的uri即可,因为发给代理的uri必须要是绝对路径。
并发
并发为基于多线程的并发,用一个简单的缓冲区实现。buf队列装有若干个等待服务的文件描述符,主线程将新文件描述符插入进缓冲区,其他线程从缓冲区中去除文件描述符进行响应服务:
/* sbuf.h */
#include "csapp.h"
#ifndef __SBUF_H__
#define __SBUF_H__
typedef struct {
/* 循环队列,存储client的文件描述符 */
int* buf;
int n;
int front;
int rear;
sem_t mutex;
sem_t slots;
sem_t items;
} sbuf_t;
/* 初始化 */
void sbuf_init(sbuf_t* sp, int n);
/* 清理 */
void sbuf_deinit(sbuf_t* sp);
/* 将一个文件描述符插入进缓冲区 */
void sbuf_insert(sbuf_t* sp, int fd);
/* 删除队首的文件描述符并返回给线程,交给线程服务 */
int sbuf_remove(sbuf_t* sp);
#endif
/* sbuf.c */
#include "sbuf.h"
void sbuf_init(sbuf_t* sp, int n) {
sp->buf = Calloc(n, sizeof(int));
sp->n = n;
sp->front = 0;
sp->rear = 0;
Sem_init(&(sp->mutex), 0, 1);
Sem_init(&(sp->slots), 0, n);
Sem_init(&(sp->items), 0, 0);
}
void sbuf_deinit(sbuf_t* sp) {
Free(sp->buf);
}
void sbuf_insert(sbuf_t* sp, int fd) {
P(&(sp->slots));
P(&(sp->mutex));
sp->buf[sp->rear] = fd;
sp->rear = (sp->rear + 1) % sp->n;
V(&(sp->mutex));
V(&(sp->items));
}
int sbuf_remove(sbuf_t* sp) {
int fd;
P(&(sp->items));
P(&(sp->mutex));
fd = sp->buf[sp->front];
sp->front = (sp->front + 1) % sp->n;
V(&(sp->mutex));
V(&(sp->slots));
return fd;
}
cache
这里cache根据uri存储server返回的数据,读者-写者读优先,替换策略为LRU。一个proxy cache还应该注意数据的有效性,这里就直接写了一个长期有效的(偷懒)。
/* cache.h */
#ifndef __CACHE_H__
#define __CACHE_H__
#include "csapp.h"
/* cache行 */
#define CACHE_SIZE 10
/* server返回的data最大大小 */
#define MAX_OBJECT_SIZE 102400
typedef struct {
int valid;
int lru_time;
int reader_cnt;
char uri[MAXLINE];
char data[MAX_OBJECT_SIZE];
sem_t mutex;
sem_t w;
} cacheline_t;
/* malloc初始化 */
cacheline_t* cache_init();
/* free空间 */
void cache_deinit(cacheline_t* cache);
/* 根据uri读data,成功返回1,不成功返回0 */
int cache_read(cacheline_t* cache, char* uri, char* data);
/* 将uri-data写入cache,自动根据lru策略替换 */
void cache_write(cacheline_t* cache, char* uri, char* data);
#endif
/* cache.c */
#include "cache.h"
/* 返回LRU策略下的替换行 */
int replace_line(cacheline_t* cache);
cacheline_t* cache_init() {
cacheline_t* cache = Calloc(CACHE_SIZE, sizeof(cacheline_t));
int i;
for (i = 0; i < CACHE_SIZE; ++i) {
Sem_init(&(cache[i].mutex), 0, 1);
Sem_init(&(cache[i].w), 0, 1);
}
return cache;
}
void cache_deinit(cacheline_t* cache) {
Free(cache);
}
/* 写优先 */
int cache_read(cacheline_t* cache, char* uri, char* data) {
int i, r = 0;
for (i = 0; i < CACHE_SIZE; ++i) {
P(&(cache[i].mutex));
/* 第一个进入的写者 */
if (++cache[i].reader_cnt == 1) {
P(&(cache[i].w));
}
V(&(cache[i].mutex));
/* 对比uri */
if (!r && cache[i].valid && !strcmp(cache[i].uri, uri)) {
sprintf(data, "%s", cache[i].data);
cache[i].lru_time = 0;
r = 1;
} else if (cache[i].valid) {
/* 增加其他所有项的lru_time */
++cache[i].lru_time;
}
P(&(cache[i].mutex));
/* 最后一个退出的写者 */
if (--cache[i].reader_cnt == 0) {
V(&(cache[i].w));
}
V(&(cache[i].mutex));
}
return r;
}
void cache_write(cacheline_t* cache, char* uri, char* data) {
int i = replace_line(cache);
P(&(cache[i].w));
cache[i].valid = 1;
cache[i].lru_time = 0;
sprintf(cache[i].uri, "%s", uri);
sprintf(cache[i].data, "%s", data);
V(&(cache[i].w));
}
int replace_line(cacheline_t* cache) {
/* lru_time = -1表示找到了空行,不需要再寻找、替换了 */
int i, r, lru_time = 0;
for (i = 0; i < CACHE_SIZE && lru_time != -1; ++i) {
P(&(cache[i].mutex));
if (++cache[i].reader_cnt == 1) {
P(&(cache[i].w));
}
V(&(cache[i].mutex));
if (!cache[i].valid) {
r = i;
lru_time = -1;
} else if (cache[i].lru_time >= lru_time) {
r = i;
lru_time = cache[i].lru_time;
}
P(&(cache[i].mutex));
if (--cache[i].reader_cnt == 0) {
V(&(cache[i].w));
}
V(&(cache[i].mutex));
}
return r;
}
proxy
proxy的思路为,持续侦听一个端口,将所有client的文件描述符放入池中。
生成若干个线程,持续从池中取出文件描述符进行处理。
处理过程为解析method、uri、version,若cache中有uri对应的数据,则直接将数据传递给client;若cache中没有,则解析出server地址,作为中间人传递数据,结束前将数据存入cache中。
/* proxy.c */
#include <stdio.h>
#include "csapp.h"
#include "sbuf.h"
#include "cache.h"
/* 线程数 */
#define NTHREADS 8
/* sbuf中的文件描述符个数 */
#define SBUF_SIZE 10
void create_thread();
void serve(int client_fd);
int parse_uri(char* uri, char* server_hostname, char* server_port);
void send_request(int server_fd, char* server_hostname, char* server_port, char* method, char* uri, char* version);
void* thread(void* vargp);
/* http请求报头中的User-Agent部分 */
static const char *user_agent_hdr = "User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:10.0.3) Gecko/20120305 Firefox/10.0.3\r\n";
sbuf_t sbuf;
cacheline_t* cache;
int main(int argc, char** argv) {
int listen_fd, client_fd;
struct sockaddr_storage client_addr;
socklen_t client_addr_len = sizeof(client_addr);
char client_hostname[MAXLINE], client_port[MAXLINE];
char* proxy_port;
if (argc != 2) {
printf("usage: %s <port>\n", argv[0]);
exit(0);
}
proxy_port = argv[1];
listen_fd = Open_listenfd(proxy_port);
/* 初始化sbuf */
sbuf_init(&sbuf, SBUF_SIZE);
/* 初始化cache */
cache = cache_init();
/* 创建线程 */
create_thread();
while(1) {
if ((client_fd = Accept(listen_fd, (SA*)(&client_addr), &client_addr_len)) < 0) {
printf("client_fd error.\n");
return 0;
}
Getnameinfo((SA*)(&client_addr), client_addr_len, client_hostname, MAXLINE, client_port, MAXLINE, 0);
printf("Accepted connection from (%s, %s)\n", client_hostname, client_port);
/* 插入sbuf中,继续侦听 */
sbuf_insert(&sbuf, client_fd);
}
cache_deinit(cache);
sbuf_deinit(&sbuf);
Close(listen_fd);
return 0;
}
/* 创建线程 */
void create_thread() {
pthread_t tid;
int i;
for (i = 0; i < NTHREADS; ++i) {
Pthread_create(&tid, NULL, thread, NULL);
}
}
/* 线程例程,持续从sbuf中去除client_fd处理 */
void* thread(void* vargp) {
Pthread_detach(pthread_self());
int client_fd;
while(1) {
client_fd = sbuf_remove(&sbuf);
serve(client_fd);
Close(client_fd);
}
}
/* 对client_fd进行处理 */
void serve(int client_fd) {
rio_t rio;
char buf[MAXLINE], method[MAXLINE], uri[MAXLINE], version[MAXLINE];
memset(buf, 0, MAXLINE);
Rio_readinitb(&rio, client_fd);
if ( !Rio_readlineb(&rio, buf, MAXLINE) ) {
printf("Failed to read http request line.\n");
return;
}
/* 首行格式为 method uri version */
sscanf(buf, "%s %s %s", method, uri, version);
/* 目前只部署了GET请求 */
if (strcmp(method, "GET")) {
printf("Method %s not implemented.\n", method);
return;
}
/* 发送给client的数据 */
char* data = (char*)Malloc(MAX_OBJECT_SIZE);
/* 先在cache中寻找 */
if (cache_read(cache, uri, data)) {
Rio_writen(client_fd, data, strlen(data));
return;
}
/* 修改version到HTTP/1.0(实验要求) */
sprintf(version, "%s", "HTTP/1.0");
char server_hostname[MAXLINE], server_port[MAXLINE];
memset(server_hostname, 0, MAXLINE);
memset(server_port, 0, MAXLINE);
/* 指出uri中相对路径的起始位置 */
int offset;
/* 解析出server host,返回相对路径位置,便于构建新的uri */
if ((offset = parse_uri(uri, server_hostname, server_port)) < 0) {
printf("parse uri error.\n");
return;
}
/* 默认端口为80 */
if (strlen(server_port) == 0) {
sprintf(server_port, "80");
}
/* 与服务器进行连接 */
int server_fd;
if ((server_fd = Open_clientfd(server_hostname, server_port)) < 0) {
printf("server_fd error.\n");
return;
}
/* 向服务器发送请求 */
send_request(server_fd, server_hostname, server_port, method, uri + offset, version);
rio_t server_rio;
int buf_len = 0;
int cnt = 0;
memset(buf, 0, MAXLINE);
Rio_readinitb(&server_rio, server_fd);
/* 接收服务器返回的数据 */
while ((buf_len = Rio_readnb(&server_rio, buf, MAXLINE)) != 0) {
Rio_writen(client_fd, buf, buf_len);
memcpy(data + cnt, buf, buf_len);
cnt += buf_len;
}
/* 写入cache */
cache_write(cache, uri, data);
Free(data);
}
/* 解析出uri中的server地址,同时返回相对路径位置 */
int parse_uri(char* uri, char* server_hostname, char* server_port) {
/* http:// */
char* ptr1 = strstr(uri, "//");
if (ptr1 == NULL) {
ptr1 = uri;
} else {
ptr1 += 2;
}
/* 相对路径开始位置 */
char* ptr2 = strstr(ptr1, "/");
if (ptr2 == NULL) {
return -1;
}
/* hostname:port,uri中可能没有指定端口 */
char* ptr3 = strstr(ptr1, ":");
if (ptr3 == NULL) {
ptr3 = ptr2;
}
/* 计算长度,得到hostname和port(port可能没有) */
int hostname_len = ptr3 - ptr1;
int port_len = (ptr3 == ptr2 ? 0 : ptr2 - (ptr3 + 1));
strncpy(server_hostname, ptr1, hostname_len);
strncpy(server_port, ptr3 + 1, port_len);
/* 返回相对路径位置 */
return ptr2 - uri;
}
/* 向server发送请求 */
void send_request(int server_fd, char* server_hostname, char* server_port, char* method, char* uri, char* version) {
char buf[MAXLINE];
sprintf(buf, "%s %s %s\r\n", method, uri, version);
Rio_writen(server_fd, buf, strlen(buf));
sprintf(buf, "Host: %s:%s\r\n", server_hostname, server_port);
Rio_writen(server_fd, buf, strlen(buf));
sprintf(buf, "%s\r\n", user_agent_hdr);
Rio_writen(server_fd, buf, strlen(buf));
sprintf(buf, "Connection: close");
Rio_writen(server_fd, buf, strlen(buf));
sprintf(buf, "Proxy-Connection: close");
Rio_writen(server_fd, buf, strlen(buf));
}
这里实现的代理还很简单,还可以添加其他HTTP请求的处理(POST,HEAD……),添加对一些信号的处理,完善错误处理,验证cache内数据的有效时间等等。