python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Python多线程并发测试

Python实现多线程并发请求测试的脚本

作者:一夜奈何梁山

这篇文章主要为大家分享了一个Python实现多线程并发请求测试的脚本,文中的示例代码简洁易懂,具有一定的借鉴价值,需要的小伙伴可以了解一下

一: 需求

今天接到一个需求, 要对线上环境进行并发请求测试。 请求方式可以是两种一种是发送HTTP请求, 一种是发送MESH请求。

测试达到的效果

1: 通过测试检测网关, 引擎的内存, CPU消耗, 负载等。

2: 通过批量测试, 检测引擎规则是否有异常。

3: 通过测试, 发现单请求最短耗时和最长耗时。

二:测试脚本

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time   : 2022/12/5 10:57 AM
import json
import time
import requests
import click
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from xylib.lib.http import mesh_http_path

mesh_appid = "XXXXXXX"
mesh_request_path = "XXXXXXXX"
http_request_url = "http://127.0.0.1:8888/xxxxx/xxxx"


def read_csv(file_name, test_num):
    """读取CSV文件"""
    res_datas = []
    df = pd.read_csv(file_name)
    index, column = df.shape
    for idx in range(0, index):
        params = df.loc[idx].to_dict()
        res_datas.append(params)
    return res_datas[:test_num]


def send_request(input_msg, request_type):
    request_data = {
        "xxxxx": 2,
        "xxxxxx": "oooooooo",
        "xxxxxxx": {
            "xxxxxxxx": str(input_msg.get("aaaa", "0000")),
            "xxxxxx": int(input_msg.get("bbbbb", 50)),
            "xxxxx": int(input_msg.get("ccccc", 48)),
            "xxxxxxxxxx": str(input_msg.get("dddddd", "")),
        }
    }
    if request_type == "http":
        res = requests.post(http_request_url, data=json.dumps(request_data))
    else:
        res = mesh_http_path(
            mesh_appid,
            mesh_appid,
            mesh_request_path,
            'POST',
            data=json.dumps(request_data)
        )
    result = dict()
    try:
        result = json.loads(res)
    except Exception as e:
        print("error is {}".format(e.message))
    input_msg["xdxaxaxa"] = result.get("xaxx", {}).get("xaxsaxs", {}).get("xaxaxsx", "")
    input_msg["xaxaxs"] = result.get("xaxsxs").get("xaxsaxs", {}).get("xsaxsaxsax", "")
    return input_msg


def threading_test(input_datas, pool_num, req_type):
    """多线程并发测试"""
    out_put_datas = []
    futures = []
    start_time = time.time()
    try:
        with ThreadPoolExecutor(max_workers=pool_num) as executor:
            for input_data in input_datas:
                futures.append(executor.submit(send_request, (input_data, req_type)))
        for future in futures:
            out_put_datas.append(future.result())
    except Exception as e:
        print("error is {}".format(e.message))
    finally:
        end_time = time.time()
        print("cost time is %s" % str(end_time - start_time))
    return out_put_datas


def write_to_csv(out_put_datas, out_file_name):
    """写入到csv文件中"""
    rdf = pd.DataFrame(out_put_datas)
    rdf.to_csv(out_file_name)


@click.command()
@click.option('--req_type', default="http", help='You need input http or mesh')
@click.option('--pool_num', default=80, help='You need input a num')
@click.option('--test_num', default=1000000, help='You need input a num')
@click.option('--file_name', default="hy.csv", help='You need input a file name')
@click.option('--out_file_name', default="result.csv", help='You need input a file name')
def run(req_type, pool_num, test_num, file_name, out_file_name):
    """主运行函数"""
    # 读取测试需要用的CSV文件内容, test_num限制测试数据数量
    res_datas = read_csv(file_name=file_name, test_num=test_num)
    # 进行并发请求测试
    out_put_datas = threading_test(input_datas=res_datas, pool_num=pool_num, req_type=req_type)
    # 测试结果写入到CSV文件中
    write_to_csv(out_put_datas=out_put_datas, out_file_name=out_file_name)


if __name__ == '__main__':
    run()

到此这篇关于Python实现多线程并发请求测试的脚本的文章就介绍到这了,更多相关Python多线程并发测试内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文