python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python多线程并发测试

python多线程并发测试过程

作者:姑娘别秃头

这篇文章主要介绍了python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、并发与并行?

1、多任务概念:操作系统可以同时运行多个任务。

2、并发:任务数量多于cpu核数,通过操作系统的任务调度算法,多个任务可以一起执行,实际总有一些任务不在一起执行,因为切换任务的速度相当快。宏观上是一起执行,微观上不是一起执行的。

3、并行:指的是任务数量小于cpu核数,即任务真的是一起执行。

⚠️多线程都是并发的状态,缺点是比并行慢

二、同步与异步的概念?

1、同步:协同步调

def work():
    print("1")
def work2():
    print("2")
if __name__ == '__main__':
    work()
    work2()

2、异步:步调各异

三、线程与进程的区别?

1、线程被包含在进程内,一个程序启动时,先启动进程,进程调用线程执行任务。

2、线程依赖进程。

3、线程可以被抢占(中断)

需求1:多线程执行不同任务

1、函数

函数式:调用thread模块中的start_new_thread()函数来产生新线程:

thread.start_new_thread ( function, args[, kwargs] )

参数说明:

示例1:主线程不等待子线程

import time
import threading as td

#为线程定义一个函数

def asr_test1(url):
    print("start")
    print(time.ctime(time.time()))


def asr_test2(url):
    print("end")
    print(time.ctime(time.time()))

if __name__ == '__main__':
#target为函数名,args为元组,函数一定要输入形参
    t1=td.Thread(target=asr_test1,args=("",))
    t2 = td.Thread(target=asr_test1, args=("",))
    t1.start()
    t2.start()
结果:
start
Fri Aug  5 16:03:05 2022
start
Fri Aug  5 16:03:05 2022
示例2、主线程等待子线程执行完毕
import threading as th
def work():
    print("1")
def work2():
    print("2")


#原理:启动时会调用python进程,进程调用主线程。
if __name__ == '__main__':
    '''子线程启动'''
    start_time = time.time()
    t1=th.Thread(target=work)
    t2 = th.Thread(target=work2)
    t1.start()
    t2.start()
    #主线程不会等待
    # t1.join(5)#设置主线程等待子线程的执行时间
    end = time.time() - start_time
    print(end)

thread提供了低级别的、原始的线程以及一个简单的锁。

Thread类提供以下方法:

2、类包装线程对象

需求2:多线程执行相同任务

1.threading并发性

线程并发运行并共享内存。

代码如下(重写run方法,继承Thread类):

import time
from threading import Thread
'''
Thread其他初始化参数:
target:指定任务函数
name:指定线程分组
'''
class MyThread(Thread):
    
    def run(self):
        for i in range(100):
            time.sleep(0.1)
            print("请求第{}次".format(self.name,i))


if __name__ == '__main__':
    for i in range(5):
        t=MyThread(name=f"线程{i+1}")
        t.start()

传递参数一:

def work(name,age):
    for i in range(5):
        # time.sleep(1)
        print("{}work1----{}---{}".format(name,i,age))


if __name__ == '__main__':
    #给任务函数传递参数,方式一:args参数,里面是元组
    # t=Thread(target=work,args=("",))
    # t.start()
    #方式2:kargs
    t2 = Thread(target=work, kwargs={"name":"wupig","age":18})
    t2.start()

传递参数:继承类

2.多线程并发—资源共享,资源竞争问题:

1、无法并行,原因是python解释器有一把锁:GIL,在同一时间只能执行一个线程。多线程是并发执行的,经过python解释器后进行线程调度,线程切换机制:

2、线程执行遇到IO耗时操作(sleep,网络io,文件io),python解释器释放锁,进行线程切换,再次获取全局锁。

3、线程执行时间达到一定的阈值(面试)

解决方法一:加锁

解决方法二:队列

死锁案例:

需求:

1、并发读取本地文件。

2、并发获取url地址。

xxx.txt文件中有很多URL地址,需求多并发获取。

'''需求:并发读取txt中的URL地址'''
list_test=[]
filename="../../url.txt"
file=open(filename,"r")
#列表推导式
list_test=[i.strip("\n") for i in file]
def audio_test():
    while list_test:
        url=list_test.pop()
        print(url)

def atest_thread(THREAD_MAX, Test_Script):
    #创建空列表
    thread_list = []
    for i in range(int(THREAD_MAX)):
        # print("**************",i)
        thread_name = 'thread-' + str(i)
        print("thread_name",thread_name)
        new_thread = threading.Thread(target=Test_Script, args="", name=thread_name)
        thread_list.append(new_thread)
        new_thread.start()
    for thread_obj in thread_list:
        #thread_obj.join:等待至线程中止
        thread_obj.join()

if __name__ == '__main__':
    starttime=time.time()
    count = 10
    # THREAD_MAX=10
    atest_thread(count, audio_test)
    endtime=time.time()-starttime
    print(endtime)

占坑

这样只有一个线程执行。需要创建列表,将线程放入列表中,然后循环变量列表进行json。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文