利用Python的多线程对接口进程压测

  • Post author:
  • Post category:python



一,接口文档

请求url:http://xx.xx.xx.xx:8080/register/

请求类型:post

请求参数:

参数描述

参数名称

约束

参数类型

示例

用户名

userid

必填

String

“lily”

密码

password

必填

string

“lily”

邮箱

email

必填

String

lily@qq.com

参数规则说明:

username:

1、字母、数字组成

2、长度2~20位

3、字母不区分大小写

password:

1、长度8~20位

2、必须含有字母和数字

email:

标准的email规则

接口返回code说明:

’00’:成功

’01’:用户已存在

’02’:参数不合法

’03’:参数错误(1、用户信息错误 2、参数错误,数据库中不存在相应数据)

‘999’:未知错误,看后台日志


二,单进程请求接口

import requests

import json

data=json.dumps({‘username’: ‘lily’,’password’: ‘wcx123wac’, ’email’: ‘lily@qq.com’})

r=requests.post(‘http://xx.xx.xx.xx:8080/register/’,data = data)

print r.status_code#返回http相应状态码

print r.json()#返回想要结果

运行结果:

200{u’username’: u’lily’, u’code’: u’01’}#用户已存在


三,多进程请求

#coding=utf-8
import requests
import json
import random
from multiprocessing import Pool,Manager,Lock
import multiprocessing
def register(sucess_count,failure_count,lock):
    data={'username': 'lily', 'password': 'wcx123wac', 'email': 'abc@qq.com'}
    data['username']='test'+str(random.randint(100,999))
    data=json.dumps(data)
    r=requests.post('http://39.106.41.11:8080/register/', data = data)
    code=r.status_code
    print r.json()
    if r.json()['code']=='00':
        with lock:
            sucess_count.value+=1
    else:
        lock.acquire()
        failure_count.value += 1
        lock.release()
if __name__=='__main__':
    manager=Manager()
    lock=manager.Lock()
    sucess_count=manager.Value('i',0)
    failure_count=manager.Value('i',0)
    p=Pool()
    for i in range(multiprocessing.cpu_count()):
        p.apply_async(register,args=(sucess_count,failure_count,lock))
    p.close()
    p.join()
    print '注册成功次数为:',sucess_count.value
    print '注册失败次数为:', failure_count.value

执行结果:
{u'username': u'test464', u'code': u'999'}
{u'code': u'00', u'userid': 11801}
{u'code': u'00', u'userid': 11802}
{u'code': u'00', u'userid': 11803}
注册成功次数为: 3
注册失败次数为: 1



版权声明:本文为qq_32551117原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。