400行python教你写个高性能http服务器+web框架,性能秒胜tornado uwsgi webpy django
时间: 2014-05-09来源:开源中国
前景提要
HDC调试需求开发(15万预算),能者速来!>>>
echo hello 性能压测abtest
tornado 4kqps
nginx+tornado 9kqps
nginx+uwsgi 8kqps (注意:没说比nginx快,只是这几个web框架不行)

本server 3.2w qps
没用任何python加速
不相信的可以自己压测下哦


什么都不说400行代码加使用例子,欢迎吐槽,欢迎加好友一起进步
#!/usr/bin/python
# -*- coding:utf-8 -*-
# server.py
# Copyright (c) 2012, Baidu.com Inc.
#
# Author : hemingzhe <512284622@qq.com>; xiaorixin <xiaorx@live.com>
# Date : Dec 20, 2012
#
# Small pre-forking, edge-triggered epoll HTTP server (Python 2.6+, Linux).
# The epoll loop in run_main() reads requests; complete requests are handed to
# a thread pool, whose workers either call a handler function loaded from a
# local "<action>.py" module or stream a static file with sendfile().
import socket
import logging
import select
import errno
import os
import sys
import traceback
import Queue
import threading
import time
import thread
import cgi
from cgi import parse_qs
import json
import imp
from sendfile import sendfile
from os.path import join, getsize
import md5
import gzip
from StringIO import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler
import re

logger = logging.getLogger("network-server")

action_dic = {}    # action (module) name -> loaded module object
action_time = {}   # action (module) name -> source mtime seen at (re)load time
static_file_dir = "static"
static_dir = "/%s/" % static_file_dir
cache_static_dir = "cache_%s" % static_file_dir
if not os.path.exists(cache_static_dir):
    os.makedirs(cache_static_dir)
# Extensions (upper case) of static files that are served from a gzip cache.
filedic = {"HTM": None, "HTML": None, "CSS": None, "JS": None, "TXT": None}

# epoll interest masks used for "waiting for a request" / "waiting to write".
_EPOLL_READ = (select.EPOLLET | select.EPOLLIN |
               select.EPOLLERR | select.EPOLLHUP)
_EPOLL_WRITE = (select.EPOLLET | select.EPOLLOUT |
                select.EPOLLERR | select.EPOLLHUP)

# Only plain identifiers may be used as action (module) names in a URL.
_ACTION_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def getTraceStackMsg():
    """Return the formatted traceback of the exception being handled."""
    tb = sys.exc_info()[2]
    return "".join(traceback.format_tb(tb))


def md5sum(fobj):
    """Return the hex MD5 digest of a file object, read in 64 KiB chunks."""
    m = md5.new()
    while True:
        d = fobj.read(65536)
        if not d:
            break
        m.update(d)
    return m.hexdigest()


def _plain_headers(body):
    """Build the header lines for a server-generated (error) response."""
    return ("Content-Type: text/html;charset=utf-8\r\n"
            "Content-Length: %d\r\n" % len(body))


def _gzip_bytes(payload):
    """Return `payload` compressed as a gzip stream."""
    buf = StringIO()
    gz = gzip.GzipFile(mode="wb", fileobj=buf)
    try:
        gz.write(payload)
    finally:
        gz.close()
    return buf.getvalue()


class QuickHTTPRequest(object):
    """Minimal parser for one raw HTTP request.

    Attributes set by the constructor:
        command, path, http_version -- the three fields of the request line
        index   -- last non-empty path segment (e.g. "example.tt")
        action  -- part of `index` before the extension (module name)
        method  -- extension of `index` without the dot (function name)
        headers -- dict of header name -> value (names kept as received)
        rfile   -- StringIO wrapping the request body
        getdic  -- parsed query string (GET requests only)
        form    -- cgi.FieldStorage for multipart POSTs, else {}
        postdic -- parsed urlencoded body for other POSTs

    Raises ValueError if the request line does not have exactly three fields.
    """

    def __init__(self, data):
        headend = data.find("\r\n\r\n")
        rfile = ""
        if headend > 0:
            rfile = data[headend + 4:]
            headlist = data[0:headend].split("\r\n")
        else:
            headlist = data.split("\r\n")
        self.rfile = StringIO(rfile)

        first_line = headlist.pop(0)
        self.command, self.path, self.http_version = first_line.split()

        # Defaults so that a request for "/" (no path segment at all) leaves
        # well-defined empty values instead of missing attributes.
        self.index = ""
        self.action = ""
        self.method = ""
        segments = self.path.split('?')[0].split('/')
        while segments:
            self.index = segments.pop()
            if self.index == "":
                continue
            self.action, self.method = os.path.splitext(self.index)
            self.method = self.method.replace('.', '')
            break

        self.headers = {}
        for item in headlist:
            if item.strip() == "":
                continue
            segindex = item.find(":")
            if segindex < 0:
                continue
            key = item[0:segindex].strip()
            value = item[segindex + 1:].strip()
            self.headers[key] = value

        c_low = self.command.lower()
        content_type = self._header_value("Content-Type", "")
        self.getdic = {}
        self.form = {}
        self.postdic = {}
        if c_low == "get" and "?" in self.path:
            self.getdic = parse_qs(self.path.split("?").pop())
        elif c_low == "post" and content_type.find("boundary") > 0:
            self.form = cgi.FieldStorage(
                fp=self.rfile, headers=None,
                environ={'REQUEST_METHOD': self.command,
                         'CONTENT_TYPE': content_type})
            if self.form is None:
                self.form = {}
        elif c_low == "post":
            self.postdic = parse_qs(rfile)

    def _header_value(self, name, default=None):
        """Look up a header ignoring the case of its name."""
        wanted = name.lower()
        for key in self.headers:
            if key.lower() == wanted:
                return self.headers[key]
        return default


def _static_relpath(url_path):
    """Map a request path to a relative file path inside the static directory.

    Returns None when the normalised path would leave `static_file_dir`
    (for example through ".." segments), so that callers never open files
    outside the static tree.
    """
    url_path = url_path.split("?", 1)[0]
    rel = url_path[url_path.find(static_dir) + 1:].lstrip("/")
    rel = os.path.normpath(rel)
    if not rel.startswith(static_file_dir + os.sep):
        return None
    return rel


def _refresh_gzip_cache(filename, cache_filename):
    """Make sure an up-to-date gzip copy of `filename` exists.

    Returns True when `cache_filename` can be served, False when it could not
    be produced (the caller then serves the uncompressed original).
    """
    try:
        if (os.path.exists(cache_filename) and
                os.path.getmtime(cache_filename) >= os.path.getmtime(filename)):
            return True
        cache_dir = os.path.dirname(cache_filename)
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)
        with open(filename, "rb") as src:
            payload = src.read()
        # Write to a private temp file and rename, so that other threads or
        # processes never read a half-written cache file.
        tmp_name = "%s.%d.%d.tmp" % (cache_filename, os.getpid(),
                                     thread.get_ident())
        f_out = gzip.open(tmp_name, 'wb')
        try:
            f_out.write(payload)
        finally:
            f_out.close()
        os.rename(tmp_name, cache_filename)
        return True
    except Exception as e:
        logger.error("gzip cache for %s failed: %s" % (filename, e))
        return False


def sendfilejob(request, data, epoll_fd, fd):
    """Serve one static file request (runs in its own thread).

    request  -- parsed QuickHTTPRequest
    data     -- per-connection state dict shared with the epoll loop
    epoll_fd -- the epoll object of this process
    fd       -- file descriptor of the client connection

    The file's mtime is used as the Last-Modified value; a matching
    If-Modified-Since header yields a 304.  Otherwise the file (or its cached
    gzip copy for text-like extensions) is streamed with sendfile() on the
    temporarily blocking socket, and the connection is closed afterwards by
    the epoll loop (empty "writedata").
    """
    started = False   # True once response bytes may have reached the client
    try:
        base_filename = _static_relpath(request.path)
        if base_filename is None:
            raise Exception("file not found")
        cache_filename = "./cache_" + base_filename
        filename = "./" + base_filename
        if not os.path.isfile(filename):
            raise Exception("file not found")
        ext = os.path.splitext(filename)[1].replace('.', '')
        iszip = False
        etag = request.headers.get("If-Modified-Since", None)
        filemd5 = str(os.path.getmtime(filename))
        if ext.upper() in filedic:
            if _refresh_gzip_cache(filename, cache_filename):
                filename = cache_filename
                iszip = True
        sock = data["connections"]
        if etag == filemd5:
            data["writedata"] = ("HTTP/1.1 304 Not Modified\r\n"
                                 "Last-Modified: %s\r\n\r\n" % filemd5)
        else:
            filesize = os.path.getsize(filename)
            if iszip:
                headstr = ("HTTP/1.0 200 OK\r\nContent-Length: %s\r\n"
                           "Last-Modified: %s\r\nContent-Encoding: gzip\r\n\r\n"
                           % (filesize, filemd5))
            else:
                headstr = ("HTTP/1.0 200 OK\r\nContent-Length: %s\r\n"
                           "Last-Modified: %s\r\n\r\n" % (filesize, filemd5))
            with open(filename, "rb") as f:
                # The flag keeps the idle sweep in run_main from closing the
                # connection while the transfer is in progress.
                data["sendfile"] = True
                sock.setblocking(1)
                started = True
                sock.sendall(headstr)
                offset = 0
                while True:
                    sent = sendfile(sock.fileno(), f.fileno(), offset, 65536)
                    if sent == 0:
                        break  # EOF
                    offset += sent
            data["writedata"] = ""
            data["sendfile"] = False
    except Exception as e:
        data["sendfile"] = False
        if started:
            # Part of a response is already on the wire; the only safe thing
            # left is to let the epoll loop close the connection.
            data["writedata"] = ""
        else:
            body = "file not found"
            data["writedata"] = ("HTTP/1.1 404 Not Found\r\n%s\r\n%s"
                                 % (_plain_headers(body), body))
    try:
        data["readdata"] = ""
        epoll_fd.modify(fd, _EPOLL_WRITE)
    except Exception as e:
        logger.error(str(e) + getTraceStackMsg())


class Worker(object):
    """Turns one complete raw request into a response for the epoll loop."""

    def __init__(self):
        pass

    def _load_handler(self, request):
        """Return the handler function addressed by `request`.

        The module "./<action>.py" is imported on first use and reloaded when
        its mtime is newer than at the last load.  Raises an exception when
        the module or function cannot be resolved.
        """
        name = request.action
        # SECURITY: the module and function names come straight from the URL.
        # Restrict them to plain identifiers / public names; every public
        # callable of every .py file in the working directory is still
        # reachable over HTTP.
        if not _ACTION_NAME_RE.match(name) or request.method.startswith("_"):
            raise ImportError("invalid action: %r" % request.index)
        # Raises OSError when there is no such local module file.
        mtime = os.path.getmtime("./%s.py" % name)
        action = action_dic.get(name, None)
        if action is None:
            action = __import__(name)
            action_time[name] = mtime
            action_dic[name] = action
        elif mtime > action_time[name]:
            action = reload(sys.modules[name])
            action_time[name] = mtime
            action_dic[name] = action
        return getattr(action, request.method)

    def process(self, data, epoll_fd, fd):
        """Handle the request stored in data["readdata"].

        Static file requests are delegated to sendfilejob() in a new thread
        and None is returned immediately.  Otherwise the response is stored
        in data["writedata"] and the connection is switched to EPOLLOUT.
        Handlers are called as handler(request, headers) and may modify
        `headers`: "Connection: close" closes the connection after the
        response, "Content-Encoding: gzip" makes the server compress the body.
        """
        status = "200 OK"
        res = ""
        add_head = ""
        method = None
        try:
            request = QuickHTTPRequest(data["readdata"])
        except Exception:
            request = None
            status, res = "400 Bad Request", "http format error"

        if request is not None:
            try:
                if request.path == "/favicon.ico":
                    request.path = "/" + static_file_dir + request.path
                if static_dir in request.path or "favicon.ico" in request.path:
                    thread.start_new_thread(sendfilejob,
                                            (request, data, epoll_fd, fd))
                    return None
                method = self._load_handler(request)
            except Exception:
                method = None
                status, res = "404 Not Found", "page no found"

        if method is not None:
            try:
                headers = {}
                headers["Content-Type"] = "text/html;charset=utf-8"
                res = method(request, headers)
                if res is None:
                    res = ""
                if isinstance(res, unicode):
                    # Sockets need bytes; the default charset is utf-8.
                    res = res.encode("utf-8")
                # Skip bodies that already start with the gzip magic number
                # so handlers that compress on their own keep working.
                if (headers.get("Content-Encoding", "") == "gzip" and
                        res[:2] != "\x1f\x8b"):
                    res = _gzip_bytes(res)
                data["keepalive"] = headers.get("Connection", "") != "close"
                headers["Content-Length"] = len(res)
                add_head = "".join(["%s: %s\r\n" % (key, headers[key])
                                    for key in headers])
            except Exception as e:
                logger.error("handler %s failed: %s%s"
                             % (request.index, e, getTraceStackMsg()))
                status, res = "500 Internal Server Error", "internal server error"
                add_head = ""

        if not add_head:
            # Server-generated error body: always send a Content-Length so
            # that keep-alive clients know where the response ends.
            add_head = _plain_headers(res)
        try:
            data["writedata"] = "HTTP/1.1 %s\r\n%s\r\n%s" % (status, add_head, res)
            data["readdata"] = ""
            epoll_fd.modify(fd, _EPOLL_WRITE)
        except Exception as e:
            logger.error(str(e) + getTraceStackMsg())


def InitLog():
    """Log everything to network-server.log and errors to the console."""
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)


class MyThread(threading.Thread):
    """Daemon worker thread that executes jobs taken from a shared queue."""

    ind = 0

    def __init__(self, threadCondition, shareObject, **kwargs):
        threading.Thread.__init__(self, kwargs=kwargs)
        self.threadCondition = threadCondition
        self.shareObject = shareObject
        self.setDaemon(True)
        self.worker = Worker()

    def processer(self, args, kwargs):
        """Run one job; `args` is (connection state, epoll object, fd)."""
        try:
            param, epoll_fd, fd = args[0], args[1], args[2]
            self.worker.process(param, epoll_fd, fd)
        except Exception:
            logger.error("job error:" + getTraceStackMsg())

    def run(self):
        while True:
            try:
                args, kwargs = self.shareObject.get()
                self.processer(args, kwargs)
            except Queue.Empty:
                continue
            except Exception:
                # Keep the worker alive whatever a single job does.
                logger.error("thread error:" + getTraceStackMsg())


class ThreadPool(object):
    """Fixed-size pool of MyThread workers fed through one Queue."""

    def __init__(self, num_of_threads=10):
        self.threadCondition = threading.Condition()
        self.shareObject = Queue.Queue()
        self.threads = []
        self.__createThreadPool(num_of_threads)

    def __createThreadPool(self, num_of_threads):
        for _ in range(num_of_threads):
            worker_thread = MyThread(self.threadCondition, self.shareObject)
            self.threads.append(worker_thread)

    def start(self):
        """Start all worker threads."""
        for worker_thread in self.threads:
            worker_thread.start()

    def add_job(self, *args, **kwargs):
        """Queue a job; the arguments are passed on to MyThread.processer."""
        self.shareObject.put((args, kwargs))


def _request_complete(datas):
    """Tell whether `datas` holds one complete HTTP request.

    A request is complete when the header terminator has been received and
    either a Content-Length body of the announced size (0 included) is
    present, or there is no usable Content-Length and nothing follows the
    headers.
    """
    headend = datas.find("\r\n\r\n")
    if headend <= 0:
        return False
    headlen = headend + 4
    contentlen = -1
    # Header names are case-insensitive; only look inside the header section
    # so that body text can never be mistaken for the header.
    for line in datas[:headend].split("\r\n")[1:]:
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "content-length":
            value = value.strip()
            if value.isdigit():
                contentlen = int(value)
            break
    if contentlen < 0:
        return len(datas) == headlen
    return len(datas) >= headlen + contentlen


def run_main(listen_fd):
    """Run the epoll event loop of one server process; does not return.

    listen_fd -- the non-blocking listening socket shared by all processes.

    Per-connection state lives in `params[fd]`, a dict with the keys
    "connections" (socket), "addr", "time" (last activity), "readdata",
    "writedata", "writelen", and optionally "keepalive" and "sendfile".
    """
    idle_timeout = 10   # seconds without activity before a connection is closed
    try:
        epoll_fd = select.epoll()
        epoll_fd.register(listen_fd.fileno(),
                          select.EPOLLIN | select.EPOLLET |
                          select.EPOLLERR | select.EPOLLHUP)
    except (select.error, IOError, OSError) as msg:
        # Nothing can work without epoll: report the real cause and stop.
        logger.error(msg)
        raise

    tp = ThreadPool(16)
    tp.start()

    params = {}

    def clearfd(fd):
        """Forget and close a connection; safe to call more than once."""
        param = params.pop(fd, None)
        try:
            epoll_fd.unregister(fd)
        except (select.error, IOError, OSError, KeyError, ValueError):
            # Already unregistered (or already closed); nothing left to do.
            pass
        if param is not None:
            param["connections"].close()

    def accept_pending(cur_time):
        """Accept every queued connection (required with edge triggering)."""
        while True:
            try:
                conn, addr = listen_fd.accept()
                conn.setblocking(0)
                epoll_fd.register(conn.fileno(),
                                  select.EPOLLIN | select.EPOLLET |
                                  select.EPOLLERR | select.EPOLLHUP)
                conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                params[conn.fileno()] = {"addr": addr, "writelen": 0,
                                         "connections": conn,
                                         "time": cur_time}
            except socket.error:
                break

    def handle_read(fd, param, cur_time):
        """Drain the socket and queue the request once it is complete."""
        param["time"] = cur_time
        datas = param.get("readdata", "")
        cur_sock = param["connections"]
        while True:
            try:
                data = cur_sock.recv(102400)
                if not data:
                    clearfd(fd)   # peer closed the connection
                    break
                datas += data
            except socket.error as msg:
                if msg.errno == errno.EAGAIN:
                    param["readdata"] = datas
                    if _request_complete(datas):
                        tp.add_job(param, epoll_fd, fd)
                else:
                    clearfd(fd)
                    logger.error(msg)
                break

    def handle_write(fd, param, cur_time):
        """Send as much of the pending response as the socket accepts."""
        param["time"] = cur_time
        sendLen = param.get("writelen", 0)
        writedata = param.get("writedata", "")
        total_write_len = len(writedata)
        cur_sock = param["connections"]
        if writedata == "":
            # Nothing to send: used by sendfilejob to request a close.
            clearfd(fd)
            return
        while True:
            try:
                sendLen += cur_sock.send(writedata[sendLen:])
                if sendLen == total_write_len:
                    if param.get("keepalive", True):
                        param["readdata"] = ""
                        param["writedata"] = ""
                        param["writelen"] = 0
                        epoll_fd.modify(fd, _EPOLL_READ)
                    else:
                        clearfd(fd)
                    break
            except socket.error as msg:
                if msg.errno == errno.EAGAIN:
                    param["writelen"] = sendLen
                else:
                    # e.g. EPIPE / ECONNRESET: retrying would spin forever.
                    clearfd(fd)
                    logger.error(msg)
                break

    def sweep_idle(cur_time):
        """Close idle connections; return the new `last_min_time`."""
        oldest = cur_time
        for key_fd, value in list(params.items()):
            fd_time = value.get("time", 0)
            if cur_time - fd_time > idle_timeout:
                # Never close a connection that a sendfile thread is using.
                if value.get("sendfile", False) != True:
                    clearfd(key_fd)
            elif fd_time < oldest:
                oldest = fd_time
        return oldest

    listen_fileno = listen_fd.fileno()
    last_min_time = -1
    while True:
        # The timeout makes the idle sweep run even when there is no traffic.
        epoll_list = epoll_fd.poll(idle_timeout)
        for fd, events in epoll_list:
            cur_time = time.time()
            if fd == listen_fileno:
                accept_pending(cur_time)
                continue
            param = params.get(fd)
            if param is None:
                # Stale event for a connection closed earlier in this batch.
                continue
            if select.EPOLLIN & events:
                handle_read(fd, param, cur_time)
            elif select.EPOLLHUP & events or select.EPOLLERR & events:
                clearfd(fd)
                logger.error("sock: %s error" % fd)
            elif select.EPOLLOUT & events:
                handle_write(fd, param, cur_time)

        # check time out (once per batch, after all events were handled)
        cur_time = time.time()
        if cur_time - last_min_time > idle_timeout:
            last_min_time = sweep_idle(cur_time)


if __name__ == "__main__":
    InitLog()
    if len(sys.argv) < 2:
        sys.stderr.write("usage: python %s <port>\n" % sys.argv[0])
        sys.exit(2)
    port = int(sys.argv[1])
    try:
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as msg:
        logger.error("create socket failed")
        sys.exit(1)
    try:
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as msg:
        # Not fatal: the server works without SO_REUSEADDR.
        logger.error("setsocketopt SO_REUSEADDR failed")
    try:
        listen_fd.bind(('', port))
    except socket.error as msg:
        logger.error("bind failed")
        sys.exit(1)
    try:
        listen_fd.listen(1024)
        listen_fd.setblocking(0)
    except socket.error as msg:
        logger.error(msg)
        sys.exit(1)

    # Fork the children; each one serves forever on the shared listening
    # socket, and so does the parent after the last fork.
    child_num = 8
    for _ in range(child_num):
        newpid = os.fork()
        if newpid == 0:
            run_main(listen_fd)
            sys.exit(0)
    run_main(listen_fd)






用户文档:


1、启动:
指定监听端口即可启动
python server.py 8992


2、快速编写cgi,支持运行时修改,无需重启server


在 server.py 同一目录下
随便建一个python 文件
例如:
example.py
定义一个tt函数:
则请求该函数的url为 http://ip:port/example.tt
修改后保存,即可访问,无需重启


example.py
def tt(request, response_head):
    """Minimal handler: return "ccb" followed by the request path.

    request       -- parsed request object; only `request.path` is read here
    response_head -- dict of response headers (left untouched)
    """
    # Other request attributes that can be inspected:
    # print request.form
    # print request.getdic
    # print request.postdic
    return "ccb" + request.path


函数必须带两个参数
request:表示请求的数据 默认带以下属性
headers: 头部 (字典)
form: multipart/form-data 表单 (cgi.FieldStorage 对象,用法类似字典)
getdic: url参数 (字典)
postdic: httpbody参数 (字典)
rfile: 原始 http body 内容 (StringIO 文件对象,可用 rfile.read() 读取)
action: python文件名 (这里为example)
method: 函数方法 (这里为tt)
command: 请求方法 (GET 或 POST)
path: url (字符串)
http_version: http版本号 (http 1.1)
response_head: 表示response内容的头部
例如如果要返回用gzip压缩
则增加头部
response_head["Content-Encoding"] = "gzip"


3、下载文件
默认静态文件放在static文件夹下
例如把a.jpg放到static文件夹下
访问的url为 http://ip:port/static/a.jpg
支持客户端缓存功能 (基于 If-Modified-Since / Last-Modified,文件未修改时返回 304)
支持range 支持断点续传
(server 使用sendfile进行文件发送,不占内存且快速)


4、支持网页模板编写
创建一个模板 template.html
<HTML>
<HEAD><TITLE>$title</TITLE></HEAD>
<BODY>
$contents
</BODY>
</HTML>


则对应的函数:
def template(request, response_head):
    """Render template.html with a title and contents and return the page.

    request       -- parsed request object (unused)
    response_head -- dict of response headers (left untouched)
    """
    # `Template` is Cheetah's template class
    # (from Cheetah.Template import Template).
    t = Template(file="template.html")
    t.title = "my title"
    t.contents = "my contents"
    return str(t)
模板实现使用了 Python 中速度最快的开源模板引擎 Cheetah,
性能约为webpy django thinkphp等模板的10倍以上:
http://my.oschina.net/whp/blog/112296


example.py


import os
import imp
import sys
import time
import gzip
from StringIO import StringIO
from Cheetah.Template import Template


def tt(request, response_head):
    """Minimal handler: return "ccb" followed by the request path.

    request       -- parsed request object; only `request.path` is read here
    response_head -- dict of response headers (left untouched)
    """
    # Other request attributes that can be inspected:
    # print request.form
    # print request.getdic
    # print request.postdic
    return "ccb" + request.path


def getdata(request, response_head):
    """Return the contents of a.txt and mark the response as gzip encoded.

    request       -- parsed request object (unused)
    response_head -- dict of response headers; "Content-Encoding" is set to
                     "gzip"

    Raises IOError if a.txt cannot be opened.
    """
    # `with` closes the file even when read() fails.
    with open("a.txt") as f:
        content = f.read()
    # NOTE(review): the body is returned exactly as read; this assumes the
    # server compresses it when the header is set -- confirm.
    response_head["Content-Encoding"] = "gzip"
    return content


def template(request, response_head):
    """Render template.html and mark the response as gzip encoded.

    request       -- parsed request object (unused)
    response_head -- dict of response headers; "Content-Encoding" is set to
                     "gzip"
    """
    t = Template(file="template.html")
    t.title = "my title"
    t.contents = "my contents"
    # NOTE(review): the rendered page is returned uncompressed; this assumes
    # the server compresses it when the header is set -- confirm.
    response_head["Content-Encoding"] = "gzip"
    return str(t)




安装:如果需要使用 web 模板功能,需要先安装 Cheetah
附件中有安装脚本

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

热门排行