威尼斯人线上娱乐

线程中互斥锁,拾贰线程爬虫

19 4月 , 2019  

线程(下)

线程的定义

线程是操作系统能够实行演算调度的小不点儿单位。它被含有在经过中。是经过中的实际运作单位。一条线程指的是进程中八个十足顺序的调节流。2个进程中得以并发五个线程,每条线程并行实行差别的职责
多个线程的举行会因而线程的调度去抢占CPU的能源

线程与经过

1、同步锁 (Lock)

  当全局能源(counter)被私吞的场合,难点时有发生的由来尽管未有决定四个线程对同壹财富的访问,对数据形成损坏,使得线程运营的结果不可预料。那种场馆称为“线程不安全”。在支付进程中我们务供给制止那种情状,那怎么防止?那就用到了互斥锁了。

例如:

 1 import threading,time
 2 def sub():
 3     global num         #对全局变量进行操作
 4 
 5     temp=num
 6     time.sleep(0.001)    #模拟线程执行中出现I/o延迟等
 7     num=temp-1           #所有线程对全局变量进行减一
 8 
 9     time.sleep(1)
10 
11 num=100
12 l=[]
13 
14 for i in range(100):
15     t=threading.Thread(target=sub,args=())
16     t.start()
17     l.append(t)
18 
19 for obj in l:
20     obj.join()
21 
22 print(num)          
23 
24 #执行结果不可预期:
25 >>:90
26 >>:93
27 >>:92
28 

7.同步锁

本条例子很优良,实话说,这些事例作者是直接照搬前辈的,并不是原创,可是确实也很有意思,请看:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import threading,time

number = 100
def subnum():
    global number
    number -= 1

threads = []
for i in range(100):
    t = threading.Thread(target=subnum,args=[])
    t.start()
    threads.append(t)

for i in threads:
    i.join()

print(number)

 

那段代码的情趣是,用玖十八个线程去减一,以此让变量number为100的变为0

 

结果:

 

威尼斯人线上娱乐 1

 

那正是说本身稍微的改下代码看看: 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import threading,time

number = 100
def subnum():
    global number
    temp = number
    time.sleep(0.2)
    number = temp -1

threads = []
for i in range(100):
    t = threading.Thread(target=subnum,args=[])
    t.start()
    threads.append(t)

for i in threads:
    i.join()

print(number)

  

并没有异常的大的转移对吗,只是加了一个暂时变量,并且中途抛锚了0.2s而已。

而那几个结果就分化样了:

威尼斯人线上娱乐 2

 

那边自个儿先说下,time.sleep(0.二)是小编故意加的,正是要反映这几个效应,假诺你的微处理器不加sleep就早已面世这几个场所了那么您就不用加了,这咋回事呢?那就是线程共用多少的潜在惊恐性,因为线程都以抢着CPU财富在运作,只要发现存空儿就各自抢着跑,所以在那停顿的0.2s时间中,就会有新的线程抢到机会开始运营,那么玖拾玖个线程就有7000克个线程在抢机会运营,抢到的小运都以在temp还未曾减一的值,也正是100,所以超越八分之四的线程都抢到了100,然后减1,少一些线程没抢到,抢到已经减了三遍的9玖,那就是怎么会是9九的缘由。而这一个抢占的时光和结果并不是常有的因由,究其一贯依然因为计算机的配备难题了,配置越好的话,那种越不易于发生,因为三个线程抢到CPU财富后平昔在运维,其余的线程在短短的时间里得不到机会。

 

而为啥number -= 1,不正视任何变量的写法就没事吗?因为numebr -=
壹事实上是多个步骤,减一一视同仁复赋值给number,这几个动作太快,所以根本没给其余的线程机会。

 

图解: 

威尼斯人线上娱乐 3

 

那正是说这么些难题大家怎么消除吗,在事后的支出中绝对会高出那种气象对啊,这一个可以消除吗?总局方的授课,有人会想到用join,而眼下已经提过了join会使八线程产生串行,失去了102线程的意图。这几个到底怎么化解呢,用同步锁

同步锁:当运营起来加锁,幸免别的线程索取,当运营甘休释放锁,让其余线程继续

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva
import threading,time

r = threading.Lock() #创建同步锁对象

number = 100
def subnum():
    global number
    r.acquire() #加锁
    temp = number
    time.sleep(0.2)
    number = temp - 1
    r.release() #释放


threads = []
for i in range(100):
    t = threading.Thread(target=subnum,args=[])
    t.start()
    threads.append(t)

for i in threads:
    i.join()

print(number)

  

运营结果:

威尼斯人线上娱乐 4

 

可是你意识没,这么些运行太慢了,每一种线程都运作了三次sleep,竟然又改成和串行运营大约了对啊?可是照旧和串行稍微有点分裂,只是在有一同锁这里是串行,在此外地点大概二十多线程的效用

 

那正是说有对象要问了,既然都以锁,已经有了3个GIL,那么还要联合锁来干嘛呢?一句话,GIL是器重于保障线程安全,同步锁是用户级的可控机制,开辟中防备那种不分明的私人住房隐患

 

进度的定义

程序执行的实例称为进度
种种进程提供推行顺序所需的财富。进程具备虚拟地址空间,可实行代码,系统对象的张开句柄,安全上下文,唯一进度标志符,环境变量,优先级档次,最小和最大职业集。每一种进程都接纳单线程运营,平时号称主线程,但能够从其任何线程创建别的线程

进度和线程的可比
经过和线程之间的比较是没风趣的,因为经过是叁个顺序的实践实例,而经过是由线程举办推行的,但线程和经过究竟还是二种体制

  • 进程能够创立子进度,而各类子进度又足以开七个线程
  • 线程之间能够共享数据,而线程之间不能够共享数据,线程之间能够开始展览通讯,而经过之间进行通信就会比较劳碌
  • 开采进度要比开采线程的支出大过多

怎么样是线程(thread)?

线程中互斥锁,拾贰线程爬虫。线程是操作系统能够举办演算调度的微小单位。它被含有在经过之中,是进程中的实际运转单位。一条线程指的是经过中3个纯净顺序的调节流,贰个进程中能够并发多少个线程,每条线程并行推行区别的天职

A thread is an execution context, which is all the information a CPU
needs to execute a stream of instructions.

Suppose you’re reading a book, and you want to take a break right now,
but you want to be able to come back and resume reading from the exact
point where you stopped. One way to achieve that is by jotting down the
page number, line number, and word number. So your execution context for
reading a book is these 3 numbers.

If you have a roommate, and she’s using the same technique, she can take
the book while you’re not using it, and resume reading from where she
stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it’s
doing multiple computations at the same time. It does that by spending a
bit of time on each computation. It can do that because it has an
execution context for each computation. Just like you can share a book
with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread)
consists of the values of the CPU’s registers.

Last: threads are different from processes. A thread is a context of
execution, while a process is a bunch of resources associated with a
computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory
pages (all the threads in a process have the same view of the memory),
file descriptors (e.g., open sockets), and security credentials (e.g.,
the ID of the user who started the process).

互斥锁概念

  Python编程中,引进了对象互斥锁的概念,来保障共享数据操作的完整性。各个对象都对应于3个可称为”
互斥锁”
的暗号,那个标识用来有限协理在任一时半刻刻,只可以有1个线程访问该对象。在Python中大家应用threading模块提供的Lock类。

  我们对地点的次第举办整顿改进,为此大家须要足够三个排斥锁变量lock =
threading.Lock(),然后在武斗能源的时候前边大家会先抢占那把锁lock.acquire(),对能源采纳产生以往大家在释放那把锁mutex.release()。

代码如下:

import threading,time
def sub():
    global num

    lock.acquire()
    temp=num
    time.sleep(0.01)
    num=temp-1
    lock.release()

    time.sleep(1)

num=100
l=[]
lock=threading.Lock()
for i in range(100):
    t=threading.Thread(target=sub,args=())
    t.start()
    l.append(t)

for obj in l:
    obj.join()

print(num)

 8.死锁现象/可采用锁

前方既然已经用了伙同锁,那么相信在之后的成本中,绝对会用到利用八个同步锁的时候,所以那边模拟一下选用多少个共同锁,看看会有怎么样情状发生

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva
import threading,time

a = threading.Lock() #创建同步锁对象a
b = threading.Lock() #创建同步锁对象b

def demo1():
    a.acquire() #加锁
    print('threading model test A....')
    b.acquire()
    time.sleep(0.2)
    print('threading model test B....')
    b.release()
    a.release() #释放

def demo2():
    b.acquire() #加锁
    print('threading model test B....')
    a.acquire()
    time.sleep(0.2)
    print('threading model test A....')
    a.release()
    b.release() #释放

threads = []
for i in range(5):
    t1 = threading.Thread(target=demo1,args=[])
    t2 = threading.Thread(target=demo2,args=[])
    t1.start()
    t2.start()
    threads.append(t1)
    threads.append(t2)

for i in threads:
    i.join()

 

  

运行结果:

威尼斯人线上娱乐 5

 

此地就一贯阻塞住了,因为demo1函数用的锁是外围a锁,内层b锁,demo二函数刚好相反,外层b锁,内层a锁,所以当四线程运营时,三个函数同时在互抢锁,哪个人也不让什么人,那就产生了不通,这些阻塞现象又叫死锁现象。

 

那么为了幸免发生那种事,大家得以利用threading模块下的帕杰罗LOCK来创立重用锁依此来制止那种现象

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva
import threading,time

r = threading.RLock() #创建重用锁对象

def demo1():
    r.acquire() #加锁
    print('threading model test A....')
    r.acquire()
    time.sleep(0.2)
    print('threading model test B....')
    r.release()
    r.release() #释放

def demo2():
    r.acquire() #加锁
    print('threading model test B....')
    r.acquire()
    time.sleep(0.2)
    print('threading model test A....')
    r.release()
    r.release() #释放

threads = []
for i in range(5):
    t1 = threading.Thread(target=demo1,args=[])
    t2 = threading.Thread(target=demo2,args=[])
    t1.start()
    t2.start()
    threads.append(t1)
    threads.append(t2)

for i in threads:
    i.join()

  

运转结果:

威尼斯人线上娱乐 6

 

其一福睿斯lock其实正是Lock+总括器,总结器里的初叶值为0,每嵌套一层锁,总结器值加一,每释放壹层锁,计算器值减1,和联合锁同样,唯有当班值日为0时才算过逝,让别的线程接着抢着运营。而那个帕杰罗lock也有1个法定一点的名字,递归锁

 

 那么预计有意中人会问了,为啥会有死锁现象吧?大概您应有问,是怎样生产环境导致有死锁现象的,如故那句,为了保证数量同步性,防止二十八线程操作同一数据时产生抵触。这几个说辞很笼统对吧,作者说细点。比如前边的购物车系统,尽管我们在操作数据时又再次取了三次数据来保障数据的实在,假若多个用户同时登六购物车系统在操作的话,或然差异的操作但会波及到同五个多少的时候,就会促成数据或然不一样台了,那么就能够在中间代码里加一遍联袂锁,然后再在实际操作处再加1回联合锁,那样就现身多层同步锁,那么也就会合世死锁现象了,而那时这么些死锁现象是我们开采中恰恰要求的。

作者想,说了那些例子你应当能够了然为何lock里还要有lock,很轻便导致死锁现象大家依然要用它了,一言以蔽之假设供给死锁现象就用1块锁,不供给就换到递归锁。

 

Python中开创线程

Python中开创线程有三种情势

哪些是进度(process)?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A
process has a virtual address space, executable code, open handles to
system objects, a security context, a unique process identifier,
environment variables, a priority class, minimum and maximum working set
sizes, and at least one thread of execution. Each process is started
with a single thread, often called the primary thread, but can create
additional threads from any of its threads.

贰、死锁与递归锁

  所谓死锁:
是指四个或四个以上的进度或线程在实践进程中,因争夺财富而变成的一种相互等待的景观,若无外力功用,它们都将不能够推进下去。此时称系统处于死锁状态或系统发出了死锁,这么些长久在相互等待的进度称为死锁进度。 

会爆发死锁的例子:

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.foo()


    def foo(self):
        LockA.acquire()
        print('I am %s GET LOCKA---------%s'%(self.name,time.ctime()))
        LockB.acquire()
        print('I am %s GET LOCKB---------%s' % (self.name, time.ctime()))

        LockB.release()

        LockA.release()

LockA=threading.Lock()
LockB=threading.Lock()

for i in range(10):
    t=MyThread()
    t.start()

 九.实信号量/绑定式随机信号量

实信号量也是七个线程锁

1)Semaphore

非非确定性信号量认为更有具备八线程的含义。先不急着说,看看例子就懂:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva
import threading,time

s = threading.Semaphore(3) #创建值为3的信号量对象

def demo():
    s.acquire() #加锁
    print('threading model test A....')
    time.sleep(2)
    s.release() #释放

threads = []
for i in range(10):
    t = threading.Thread(target=demo,args=[])
    t.start()
    threads.append(t)

for i in threads:
    i.join()

  

运作结果:

威尼斯人线上娱乐 7

 

一旦您亲自测试那段代码,你会发现,那么些结果是三个壹组出的,出了二回叁个一组的,最终出了2个1组,一个一组都是互为的,中间停顿二秒。

此间能够给很形象的例证,要是有个别地方的停车位只可以同时停三辆车,当停车位有空时别的的车才足以停进来。那里的三个停车位就一定于确定性信号量。

 

2)BoundedSemaphore

既是有实信号量为我们做到那个1组一组的操作结果,但敢不敢保险那些线程就不会忽然的越出那么些设定好的车位呢?比如设定好的叁个功率信号量壹组,大家都清楚线程是争强着运维,万壹就有除了设定的二个线程外的一多少个线程抢到了运维权,何人也不让什么人,正是要一齐运营吧?好比,那里唯有三个车位,已经停满了,但有人正是要去挤1挤,出现第伍辆或许第陆辆车的情状,那几个和现实生活中的例子差不多太对劲了对吧?

那么大家咋办?当然这几个题目早就有人想好了,所以有了能量信号量的晋升版——绑定式复信号量(BoundedSemaphore)。既然是进步版,那么同复信号量同样该有的都有些,用法也1律,就是有个效益,在设定好的多少个线程一组运转时,即使有任何线程也抢到运转权,那么就会报错

比如thread_lock =
threading.BoundedSemaphore(5),那么二十四线程同时运营的线程数就必须在伍以内(包涵五),不然就报错。换句话,它具有了实时监督检查的职能,好比停车位上的掩护,即便发现车位满了,就禁止放行车辆,直到有空位了再允许车辆进入停车。

因为那些很简单,就多了个督察效果,其余和semaphore同样的用法,笔者就不演示了,本身探究吧

 

threading 模块

经过与线程的界别?

  1. Threads share the address space of the process that created it;
    processes have their own address space.
  2. Threads have direct access to the data segment of its process;
    processes have their own copy of the data segment of the parent
    process.
  3. Threads can directly communicate with other threads of its process;
    processes must use interprocess communication to communicate with
    sibling processes.
  4. New threads are easily created; new processes require duplication of
    the parent process.
  5. Threads can exercise considerable control over threads of the same
    process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may
    affect the behavior of the other threads of the process; changes to
    the parent process does not affect child processes.

行使递归锁消除:

  在Python中为了接济在同1线程中屡屡伸手同1能源,python提供了可重入锁中华VLock。那个奥迪Q5Lock内部维护着二个Lock和二个counter变量,counter记录了acquire的次数,从而使得能源能够被反复require。直到3个线程全体的acquire都被release,别的的线程才具收获财富。上面包车型大巴例子假若运用CR-VLock代替Lock,则不会发生死锁:

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.foo()
        self.bar()

    def foo(self):
        RLock.acquire()
        print('I am %s GET LOCKA---------%s'%(self.name,time.ctime()))
        RLock.acquire()
        print('I am %s GET LOCKB---------%s' % (self.name, time.ctime()))

        RLock.release()
        RLock.release()

    def bar(self):

        RLock.acquire()
        print('I am %s GET LOCKB---------%s' % (self.name, time.ctime()))
        time.sleep(1)
        RLock.acquire()
        print('I am %s GET LOCKA---------%s' % (self.name, time.ctime()))

        RLock.release()
        RLock.release()

RLock=threading.RLock()

for i in range(10):
    t=MyThread()
    t.start()

  

十.规格变量同步锁

不多说,它也是多个线程锁,本质上是在本田UR-Vlock基础之上再增添上边包车型大巴四个措施 

condition = threading.Condition([Lock/RLock]),暗中同意里面包车型大巴参数是兰德瑞虎lock

 

wait():条件不满足时调用,释放线程并跻身等待绿灯

notify():条件创设后调用,文告等待池激活多个线程

notifyall():条件创建后调用,布告等待池激活全部线程

 

一向上例子

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva
import threading,time
from random import randint

class producer(threading.Thread):
    '''
    生产者
    '''
    def run(self):
        global Li
        while True:
            value = randint(0,100) #创建一百以内随机数
            print('生产者',self.name,'Append:'+str(value),Li)
            if con.acquire(): #加锁
                Li.append(value) #把产品加入产品列表里
                con.notify()  #通知等待池里的消费者线程激活并运行
                con.release() #释放
            time.sleep(3)     #每3秒做一次产品

class consumer(threading.Thread):
    '''
    消费者
    '''
    def run(self):
        global Li
        while True:
            con.acquire() #获取条件变量锁,必须和生产者同一个锁对象,生产者通知后在此处开始运行
            if len(Li) == 0: #如果产品列表内没数据,表示消费者先抢到线程运行权
                con.wait()   #阻塞状态,等待生产者线程通知
            print('消费者',self.name,'Delete:'+str(Li [0]),Li)
            Li.remove(Li[0]) #删除被消费者用掉的产品
            con.release()    #释放
            time.sleep(0.5)  #每0.5秒用掉一个产品

con = threading.Condition() #创建条件变量锁对象
threads = [] #线程列表
Li = [] #产品列表

for i in range(5):
    threads.append(producer())

threads.append(consumer())

for i in threads:
    i.start()

for i in threads:
    i.join()

  

运行结果:

威尼斯人线上娱乐 8

 

图形只截取了一部分,因为它一向在有线循环着的。这么些生产者和顾客的模子很优异,必须通晓,每一种步骤分别什么看头作者都注释了,不再赘言了。

 

直白调用threading模块 创立线程

Python中成立线程能够应用threading模块

  • threading.Thread(target=func,args = params,) 创设线程
    target钦定实践的函数 target钦定参数元组方式

'''
python thread
'''
import threading

import time

beggin = time.time()


def foo(n):
    print('foo%s' % n)
    time.sleep(1)


def bar(n):
    print('bar %s' % n)


end = time.time()
cast_time = end - beggin
print(float(cast_time))
# 创建线程
t1 = threading.Thread(target=foo, args=('thread1',))
t2 = threading.Thread(target=bar, args=('thread2',))
t1.start()
t2.start()

Python GIL(Global Interpreter Lock) 

CPython implementation detail: In CPython, due to the Global Interpreter
Lock, only one thread can execute Python code at once (even though
certain performance-oriented libraries might overcome this limitation).
If you want your application to make better use of the computational
resources of multi-core machines, you are advised to use
multiprocessing. However, threading is still an appropriate model if you
want to run multiple I/O-bound tasks simultaneously.

3、Semaphore(信号量)

Semaphore管理二个放置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不可能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。

实例:(同时只有六个线程能够取得semaphore,即能够界定最加纳Ake拉接数为五):

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore')
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()

 

11.event事件

 类似于condition,但它并不是1个线程锁,并且没有锁的成效

event = threading.伊芙nt(),条件环境目的,初阶值为False

 

event.isSet():重临event的图景值

event.wait():如果event.isSet()的值为False将阻塞

event.set():设置event的情事值为True,全体阻塞池的线程激活并进入就绪状态,等待操作系统调度

event.clear():苏醒event的情景值False

 

不多说,看二个事例:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import threading,time

class boss(threading.Thread):
    def run(self):
        print('boss:今晚加班!')
        event.isSet() or event.set() #设置为True
        time.sleep(5)   #切换到员工线程
        print('boss:可以下班了')
        event.isSet() or event.set() #又设置为True


class worker(threading.Thread):
    def run(self):
        event.wait() #等待老板发话,只有值为True再往下走
        print('worker:唉~~~,又加班')
        time.sleep(1) #开始加班
        event.clear() #设置标志为false
        event.wait()  #等老板发话
        print('worker:oh yeah,终于可以回家了')


event = threading.Event()
threads = []
for i in range(5):
    threads.append(worker())
threads.append(boss())

for i in threads:
    i.start()

for i in threads:
    i.join()

  

 

运作结果:

威尼斯人线上娱乐 9

 

实质上那些和condition的通讯原理是同一的,只是condition用的是notify,event用的set和isset

透过接二连三threading模块调用线程

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):#定义每个线程要运行的函数

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
  • 开创类承接threading.Thread
  • 重写类的run方法

threading模块

4、Event对象

  线程的1个首要性格是各类线程都以独自运营且情况不行预测。假若程序中的别的线程要求通过判定某些线程的事态来分明自个儿下一步的操作,那时线程同步难题就
会变得相当困难。

  为了缓解那个难题,大家供给选取threading库中的伊夫nt对象。伊芙nt对象涵盖一个可由线程设置的连续信号标识,它同意线程等待有些事件的发出。

  在
初步景况下,伊芙nt对象中的功率信号标识棉被服装置为假。如果无线程等待3个伊夫nt对象,
而那个伊芙nt对象的表明为假,那么那几个线程将会被直接不通直至该标记为真。2个线程要是将2个伊夫nt对象的时域信号标识设置为真,它将唤起全体等待那些伊夫nt对象的线程。假若3个线程等待三个业已被设置为真正伊芙nt对象,那么它将忽略这几个事件,
继续实行

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

 

用evnt对象模拟红绿灯:

import queue,threading,time
import random

event = threading.Event()
def light():
    while True:
        event.set()
        for i in range(10):
            print('light green')
            time.sleep(1)
        event.clear()
        for i in range(10,13):
            print('light yellow')
            time.sleep(1)
        for i in range(13,21):
            print('light red')
            time.sleep(1)

def car(i):
    while True:
        time.sleep(random.randint(1,5))
        if event.isSet():
            print('car %s is runing'%i)
        else:
            print('car %s is waiting'%i)

if __name__ == '__main__':
    l1=threading.Thread(target=light)
    l1.start()

    for i in range(5):
        i = threading.Thread(target=car,args=(i,))
        i.start()

 

Python 拾2线程中的GIL

Python的GIL并不是Python的特点,它是在得以落成Python解析器也正是依据C语言的解析器
CPython时所引进的三个概念。Python能够用分裂的编译器来编写翻译成可进行代码。例如C语言中的GCC等。也正是说只有在CPython中才会油不过生GIL的状态
GIL又称作全局解释器锁(Global Interpreter Lock)
当代的CPU已经是多核CPU,为了更实惠的使用多核处理器的品质,就应运而生了四线程的编制程序格局。而在缓解十二线程之间数据完整性和意况同步的最简便易行的章程正是加锁。GIL正是给Python解释器加了壹把大锁。我们了解Python是由解释器实施的,由于GIL的留存
只好有三个线程被解释器施行,那样就使得Python在多线程施行上的功用变低。由于历史遗留难点,发现多量库代码开辟者现已重度信赖GIL而老大不便去除了。也正是说在多核CPU上,并行施行的Python三十二线程,甚至不比串行执行的Python程序,这就是GIL存在的主题材料

壹 线程的贰种调用情势

直白调用

实例1:

威尼斯人线上娱乐 10

威尼斯人线上娱乐 11

import threading
import time

def sayhi(num): #定义每个线程要运行的函数

    print("running on number:%s" %num)

    time.sleep(3)

if __name__ == '__main__':

    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例

    t1.start() #启动线程
    t2.start() #启动另一个线程

    print(t1.getName()) #获取线程名
    print(t2.getName())

威尼斯人线上娱乐 12

承接式调用:

威尼斯人线上娱乐 13

威尼斯人线上娱乐 14

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):#定义每个线程要运行的函数

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

威尼斯人线上娱乐 15

4、队列(queue)

”’

创制2个“队列”对象

import queueq
q = queue.Queue(maxsize = 10)
    #queue.Queue类便是1个系列的壹块达成。队列长度可为Infiniti或然轻易。可由此Queue的构造函数的可选参数
    #maxsize来设定队列长度。假诺maxsize小于壹就意味着队列长度Infiniti。

q.put()    将二个值2二放入队列中

    #调用队列对象的put()方法在队尾插入多个品种。put()有几个参数,第1个item为须求的,为插入项目的值;第二个block为可选参数,暗中同意为一。假诺队列当前为空且block为1,put()方法就使调用线程暂停,直到空出三个数额单元。就算block为0,put方法将迷惑Full异常。

q.get()    将三个值从队列中收取    

    #调用队列对象的get()方法从队头删除并再次回到二个档次。可选参数为block,默认为True。假使队列为空且block为True,get()就使调用线程暂停,直至有项目可用。假诺队列为空且block为False,队列将引发Empty非常。

”’

queue的常用方法

'''

此包中的常用方法(q = Queue.Queue()):

q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)非阻塞 
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

'''

*12.队列(queue)

实质上,队列是二个数据结构。

 

一)创制二个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类正是两个队列的壹道达成。队列长度可为Infiniti只怕个别。可经过Queue的构造函数的可选参数maxsize来设定队列长度。假使maxsize小于1就意味着队列长度Infiniti。

二)将二个值放入队列中
q.put(obj)
调用队列对象的put()方法在队尾插入贰个品类。put()有五个参数,第4个item为须要的,为插入项目标值;第二个block为可选参数,默感到
一。假设队列当前为空且block为一,put()方法就使调用线程暂停,直到空出1个数据单元。假若block为0,put方法将掀起Full至极。

三)将一个值从队列中抽取
q.get()
调用队列对象的get()方法从队头删除并重返3个品类。可选参数为block,默感觉True。若是队列为空且block为True,get()就使调用线程暂停,直至有档次可用。借使队列为空且block为False,队列将引发Empty非凡。

 

例:

威尼斯人线上娱乐 16

 

 

四)Python Queue模块有三种队列及构造函数:

  • Python Queue模块的FIFO队列先进先出    class queue.Queue(maxsize)
  • LIFO类似于堆,即先进后出        class
    queue.LifoQueue(maxsize)
  • 还有一种是事先级队列等第越低越先出来  class
    queue.PriorityQueue(maxsize)

 

当maxsize值比put的数码少时就会阻塞住,当数码被get后留有空间手艺跟着put进去,接近于线程的非时限信号量

威尼斯人线上娱乐 17

 

 

伍)queue中的常用方法(q = Queue.Queue()):
q.qsize():重回队列的大大小小
q.empty():借使队列为空,再次来到True,反之False
q.full():假若队列满了,再次回到True,反之False,q.full与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait():相当q.get(False)
q.put_nowait(item):相当q.put(item, False)
q.task_done():在成就一项事业之后,q.task_done()
函数向职分已经成功的行列发送一个实信号
q.join():实际上意味着等到队列为空,再试行别的操作

 

6)队列有怎么着便宜,与列表差距

队列本人就有1把锁,内部已经维持壹把锁,借使你用列表的话,当条件是在二十多线程下,那么列表数据就必将会有冲突,而队列不会,因为此,队列有个诨名——多线程利器

例:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import threading,time
import queue
from random import randint

class productor(threading.Thread):
    def run(self):
        while True:
            r = randint(0,100)
            q.put(r)
            print('生产出来 %s 号产品'%r)
            time.sleep(1)

class consumer(threading.Thread):
    def run(self):
        while True:
            result =q.get()
            print('用掉 %s 号产品'%result)
            time.sleep(1)

q = queue.Queue(10)
threads = []
for i in range(3):
    threads.append(productor())

threads.append(consumer())

for i in threads:
    i.start()

  

运作结果:

威尼斯人线上娱乐 18

 

那边素有毫无加锁就成功了前方的劳动者消费者模型,因为queue里面自带了一把锁。

 

好的,关于线程的知识点,批注完。

 

Python GIL的面世情形

在Python中要是任务是IO密集型的,能够行使八线程。而且Python的八线程卓殊擅长处理那种主题材料
而即使Python中假使职务是计量密集型的,就供给处理一下GIL

二 Join & Daemon

威尼斯人线上娱乐 19

威尼斯人线上娱乐 20

import threading
from time import ctime,sleep
import time

def music(func):
    for i in range(2):
        print ("Begin listening to %s. %s" %(func,ctime()))
        sleep(4)
        print("end listening %s"%ctime())

def move(func):
    for i in range(2):
        print ("Begin watching at the %s! %s" %(func,ctime()))
        sleep(5)
        print('end watching %s'%ctime())

threads = []
t1 = threading.Thread(target=music,args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move,args=('阿甘正传',))
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        # t.setDaemon(True)
        t.start()
        # t.join()
    # t1.join()
    t2.join()########考虑这三种join位置下的结果?
    print ("all over %s" %ctime())

威尼斯人线上娱乐 21

setDaemon(True):

      将线程注明为护理线程,必须在start() 方法调用在此以前设置,
若是不设置为守护线程程序会被Infiniti挂起。这些主意基本和join是倒转的。当大家在程序运转中,实施叁个主线程,假诺主线程再次创下办三个子线程,主线程和子线程
就分兵两路,分别运营,那么当主线程完结想退出时,会查验子线程是还是不是成功。假诺子线程未形成,则主线程会等待子线程落成后再脱离。然则有时我们需求的是
只要主线程落成了,不管敬仲线程是还是不是做到,都要和主线程一同退出,这时就足以
用setDaemon方法啦 

join():

       在子线程落成运转在此以前,这几个子线程的父线程将直接被打断。

其余措施

威尼斯人线上娱乐 22

威尼斯人线上娱乐 23

thread 模块提供的其他方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
# 除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
# run(): 用以表示线程活动的方法。
# start():启动线程活动。
# join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

威尼斯人线上娱乐 24

join与task_done方法

'''
join() 阻塞进程,直到所有任务完成,需要配合另一个方法task_done。

    def join(self):
     with self.all_tasks_done:
      while self.unfinished_tasks:
       self.all_tasks_done.wait()

task_done() 表示某个任务完成。每一条get语句后需要一条task_done。


import queue
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")
'''

十二线程式爬虫

有些朋友学完线程还不知情线程到底能利用于怎么样生活其实,好的,不多说,来,大家爬下堆糖网()的校花照片。

 

import requests
import urllib.parse
import threading,time,os

#设置照片存放路径
os.mkdir('duitangpic')
base_path = os.path.join(os.path.dirname(__file__),'duitangpic')

#设置最大信号量线程锁
thread_lock=threading.BoundedSemaphore(value=10)

#通过url获取数据
def get_page(url):
    header={'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}
    page=requests.get(url,headers=header)
    page=page.content #content是byte
    #转为字符串
    page=page.decode('utf-8')
    return page

#label  即是搜索关键词
def page_from_duitang(label):
    pages=[]
    url='https://www.duitang.com/napi/blog/list/by_search/?kw={}&start={}&limit=1000'
    label=urllib.parse.quote(label)#将中文转成url(ASCII)编码
    for index in range(0,3600,100):
        u=url.format(label,index)
        #print(u)
        page=get_page(u)
        pages.append(page)
    return pages

def findall_in_page(page,startpart,endpart):
    all_strings=[]
    end=0
    while page.find(startpart,end) !=-1:
        start=page.find(startpart,end)+len(startpart)
        end=page.find(endpart,start)
        string=page[start:end]
        all_strings.append(string)

    return all_strings

def pic_urls_from_pages(pages):
    pic_urls=[]
    for page in pages:
        urls=findall_in_page(page,'path":"','"')
        #print('urls',urls)
        pic_urls.extend(urls)
    return pic_urls

def download_pics(url,n):
    header={'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'}
    r=requests.get(url,headers=header)
    path=base_path+'/'+str(n)+'.jpg'
    with open(path,'wb') as f:
        f.write(r.content)
    #下载完,解锁
    thread_lock.release()

def main(label):
    pages=page_from_duitang(label)
    pic_urls=pic_urls_from_pages(pages)
    n=0
    for url in pic_urls:
        n+=1
        print('正在下载第{}张图片'.format(n))
        #上锁
        thread_lock.acquire()
        t=threading.Thread(target=download_pics,args=(url,n))
        t.start()
main('校花')

  

运行结果:

威尼斯人线上娱乐 25

 

在与本py文件1律的目录下,有个duitangpic的文书夹,打开看看:

威尼斯人线上娱乐 26

 

 全是常娥,而且不出意外又好几千张呢,小编那只有1000多张是因为自个儿手动停止了py程序运营,终究本身那是出现说法,不须求真正等程序运维完。笔者大约猜度,不出意外应该能爬到三千张左右的照片

 

怎么样,老铁,得劲不?刺不激情?感受到十二线程的用处了不?而且那要么python下的伪三十二线程(IO密集型,但并不到底真正意义上的多线程),你用别样的言语来爬越来越精神。

 

join 和daemon

join

  • 在子线程落成运转在此以前,这一个子线程的父线程将直接被打断。在三个顺序中大家实行二个主线程,这一个主线程再次创下办1个子线程,主线程和子线程就竞相实施,当子线程在主线程中调用join方法时,主线程会等待子线程实行完后再截至

'''in main thread'''
t.join() 主线程会等待线程t执行完成后再继续执行

daemon

  • setDaemon(true)
    将线程评释为护理线程,必须在start() 方法调用此前安装,
    假使不设置为守护线程程序会被Infiniti挂起。那么些主意基本和join是倒转的。当我们在程序运维中,实施三个主线程,假诺主线程再创办三个子线程,主线程和子线程
    就分兵两路,分别运维,那么当主线程实现想退出时,会核实子线程是还是不是达成。倘若子线程未形成,则主线程会等待子线程实现后再脱离。不过有时大家要求的是
    只要主线程实现了,不管仲线程是或不是产生,都要和主线程一齐退出,那时就足以
    用setDaemon方法啦
  • currentThread() 获取当前推行的线程

三 同步锁(Lock)

威尼斯人线上娱乐 27

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1

    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 #对此公共变量进行-1操作


num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

威尼斯人线上娱乐 28

 

 

威尼斯人线上娱乐 29

 

注意:

①:  why num-=壹没难题吧?那是因为动作太快(达成那个动作在切换的时光内)

二: if
sleep(一),现象会更显然,9拾伍个线程每3个毫无疑问都并未有实行完就开始展览了切换,大家说过sleep就等效于IO阻塞,一s以内不会再切换回来,所以最后的结果料定是9玖.

 

多个线程都在同时操作同三个共享能源,所以导致了能源破坏,如何做吧?

有同学会想用join呗,但join会把全路径程给停住,变成了串行,失去了多线程的意思,而小编辈只需求把总结(涉及到操作公共数据)的时候串行试行。

我们得以透过共同锁来化解这种主题材料

威尼斯人线上娱乐 30

import time
import threading

def addNum():
    global num #在每个线程中都获取这个全局变量
    # num-=1
    lock.acquire()
    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 #对此公共变量进行-1操作
    lock.release()

num = 100  #设定一个共享变量
thread_list = []
lock=threading.Lock()

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('final num:', num )

威尼斯人线上娱乐 31

难题解决,但

借问:同步锁与GIL的涉嫌?

Python的线程在GIL的操纵之下,线程之间,对任何python解释器,对python提供的C
API的拜访都以排斥的,那足以视作是Python内核级的排外机制。不过这种互斥是我们无法调节的,大家还必要其余1种可控的排斥机制———用户级互斥。内核级通过互斥珍视了基本的共享财富,同样,用户级互斥保护了用户程序中的共享能源。

GIL
的功能是:对于一个解释器,只好有四个thread在实施bytecode。所以每时每刻只有一条bytecode在被实践3个thread。GIL保险了bytecode
这层面上是thread safe的。
然则借使您有个操作比如 x +=
一,那一个操作须求多少个bytecodes操作,在实行那些操作的多条bytecodes时期的时候恐怕中途就换thread了,这样就出现了data
races的气象了。

 

那本身的二头锁也是保险平等时刻唯有3个线程被实行,是否向来不GIL也足以?是的;那要GIL有啥样鸟用?你没治;

queue的二种情势:

1、queue.Queue()  先进先出方式

二、queue.LifoQueue()    先进后出,类似栈

三、queue.PriorityQueue()  
优先级方式,优先级越高越先出,数字月尾代表优先级越高

import queue

#######################先进后出
q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#####################优先级
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=q.get()
  print(data)

线程中的锁

先看二个线程共享数据的难点

'''
线程安全问题
'''
# 定义一个共享变量
import threading

import time

num = 100


def sub():
    # 操作类变量
    global num
    tmp = num
    time.sleep(0.1)
    num = tmp - 1


if __name__ == '__main__':
    thread_list = []
    for i in range(100):
        t1 = threading.Thread(target=sub)
        t1.start()
        thread_list.append(t1)
    for i in range(100):
        t2 = thread_list[i]
        t2.join()

print('final num' + str(num))
>>> 
final num99

4 线程死锁和递归锁

     
在线程间共享多少个能源的时候,固然三个线程分别攻陷一部分财富并且同时等待对方的财富,就会招致死锁,因为系统推断那有些财富都正在使用,全体那多少个线程在无外力成效下将一贯等待下去。上边是三个死锁的例证:

威尼斯人线上娱乐 32

威尼斯人线上娱乐 33

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()
    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()#等待线程结束,后面再讲。

威尼斯人线上娱乐 34

化解办法:使用递归锁,将

1
2
lockA=threading.Lock()
lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()

为了帮助在同一线程中一再呼吁同1能源,python提供了“可重入锁”:threading.PRADOLock。奥迪Q5Lock内部维护着一个Lock和1个counter变量,counter记录了acquire的次数,从而使得能源得以被多次acquire。直到1个线程全体的acquire都被release,别的的线程才具得到能源。

应用

威尼斯人线上娱乐 35

威尼斯人线上娱乐 36

import time

import threading

class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.RLock()

    def withdraw(self, amount):

        with self.lock:
            self.balance -= amount

    def deposit(self, amount):
        with self.lock:
            self.balance += amount


    def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的场景

        with self.lock:
            interest=0.05
            count=amount+amount*interest

            self.withdraw(count)


def transfer(_from, to, amount):

    #锁不可以加在这里 因为其他的其它线程执行的其它方法在不加锁的情况下数据同样是不安全的
     _from.withdraw(amount)

     to.deposit(amount)



alex = Account('alex',1000)
yuan = Account('yuan',1000)

t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
t1.start()

t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
t2.start()

t1.join()
t2.join()

print('>>>',alex.balance)
print('>>>',yuan.balance)

威尼斯人线上娱乐 37

队列的采用:生产者消费者模型

  在线程世界里,生产者正是生产数据的线程,消费者正是开销数量的线程。在多线程开垦在那之中,假如劳动者处理速度非常的慢,而顾客处理速度异常的慢,那么生产者就务须等待买主处理完,技术两次三番生产数量。同样的道理,如若消费者的拍卖技巧超过生产者,那么消费者就必须待产者。为了缓解那么些难题于是引进了劳动者和顾客方式。

  生产者消费者格局是经过七个器皿来化解劳动者和消费者的强耦合难点。生产者和消费者相互之间不直接通信,而因而阻塞队列来进展杂志发表,所以生产者生产完数据之后不要等待买主处理,直接扔给卡住队列,消费者不找生产者要多少,而是直接从绿灯队列里取,堵塞队列就也等于3个缓冲区,平衡了劳动者和顾客的拍卖才能。

  那就好像,在餐厅,大厨做好菜,不供给一直和客户交换,而是交由前台,而客户去饭菜也不须求不找大厨,直接去前台领取就能够,这也是八个结耦的进程。

import queue,threading,time
import random

q = queue.Queue(50)

def Producer():
    while True:
        if q.qsize() < 20:
            n = random.randint(1, 100)
            q.put(n)
            print(" has made baozi %s" % n)
            time.sleep(1)

def Consumer(id):
    while True:
         s = q.get()
         print("Consumer"+id+"has eat %s" % s)
         time.sleep(2)

for i in range(5):
    t1=threading.Thread(target=Producer,args=())
    t1.start()

for i in range(2):
    t=threading.Thread(target=Consumer,args=(str(i),))
    t.start()

  

  

 

分析

地点的先后中,大家想要的是打开九十几个线程,种种线程将共享数据减去一,不过大家发现
输出的结果是99,那种情景是因为二1010二线程在cpu中实践时是抢占式的,程序在初步施行时,开启了一百个线程去实践,当程序推行到time.sleep(0.1)时,由于爆发了线程的堵塞,所以cpu进行了切换,此时,程序的共享变量num是拾0,中间变量tmp也是100
在线程阻塞过后,将共享变量num的值减壹,值变为99此时别的的线程获得cpu的进行机会,而当前线程中的共享变量num的值依然十0所以实行减1操作后,又将中间值赋值给共享变量num所以num的值向来为9九

  • 线程的执市场价格况
![](https://upload-images.jianshu.io/upload_images/6052465-461749d8c9eb7ea5.png)

多线程抢占.png

伍 条件变量同步(Condition)

     
有一类线程须要满意条件之后才具够继续试行,Python提供了threading.Condition
对象用于规范变量线程的支撑,它除了能提供HavalLock()或Lock()的措施外,还提供了
wait()、notify()、notifyAll()方法。

      lock_con=threading.Condition([Lock/Rlock]):
锁是可选选项,不传人锁,对象活动创造3个帕杰罗Lock()。

wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
notify():条件创造后调用,通知等待池激活一个线程;
notifyAll():条件创造后调用,通知等待池激活所有线程。

实例

威尼斯人线上娱乐 38

威尼斯人线上娱乐 39

import threading,time
from random import randint
class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val=randint(0,100)
            print('生产者',self.name,":Append"+str(val),L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()
                lock_con.release()
            time.sleep(3)
class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
                lock_con.acquire()
                if len(L)==0:
                    lock_con.wait()
                print('消费者',self.name,":Delete"+str(L[0]),L)
                del L[0]
                lock_con.release()
                time.sleep(0.25)

if __name__=="__main__":

    L=[]
    lock_con=threading.Condition()
    threads=[]
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

威尼斯人线上娱乐 40

Python 同步锁

操作锁的章程在threading 模块中 Lock()

  • threading.Lock() 会获得1把锁
  • Python 中使用acquire() 获得锁

r = threading.Lock()
# 加锁
r.acquire()
  • Python中使用release()释放锁

r.release()

加锁后代码

'''
线程安全问题
'''
# 定义一个共享变量
import threading
import time
num = 100
r = threading.Lock()
def sub():
    # 操作类变量
    global num
    r.acquire()
    tmp = num
    time.sleep(0.1)
    num = tmp - 1
    r.release()
if __name__ == '__main__':
    thread_list = []
    for i in range(100):
        t1 = threading.Thread(target=sub)
        t1.start()
        thread_list.append(t1)
    for i in range(100):
        t2 = thread_list[i]
        t2.join()
print('final num' + str(num))

6 同步条件(伊夫nt)

     
条件同步和规格变量同步差不离意思,只是少了锁功效,因为条件同步设计于不访问共享能源的规范环境。event=threading.伊夫nt():条件环境目标,开首值
为False;

威尼斯人线上娱乐 41

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

威尼斯人线上娱乐 42

实例1:

威尼斯人线上娱乐 43

威尼斯人线上娱乐 44

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        event.isSet() or event.set()
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        event.isSet() or event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker:哎……命苦啊!")
        time.sleep(0.25)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

威尼斯人线上娱乐 45

实例2:

威尼斯人线上娱乐 46

威尼斯人线上娱乐 47

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

威尼斯人线上娱乐 48

线程中的死锁和递归锁

在线程间共享多个能源的时候,假设多少个线程分别攻克一部分资源并且同时等待对方释放对方的财富,就会促成死锁,因为系统判定这一部分能源正在使用,所以那三个线程在无外力成效下将向来守候下去
看个栗子:

'''
线程死锁
'''

import threading, time


class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name, "gotlockB", time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name, "gotlockA", time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.doA()
        self.doB()


if __name__ == "__main__":

    lockA = threading.Lock()
    lockB = threading.Lock()

    threads = []
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # 等待线程结束,后面再讲。

在上述程序中,七个线程互周旋有对方的锁并且等待对方释放,那就造成了死锁

七 信号量(Semaphore)

     
时限信号量用来决定线程并发数的,BoundedSemaphore或Semaphore管理2个停放的计数
器,每当调用acquire()时-一,调用release()时+一。

      计数器不可能小于0,当计数器为
0时,acquire()将封堵线程至2头锁定状态,直到其余线程调用release()。(类似于停车位的概念)

     
BoundedSemaphore与Semaphore的唯1分裂在于前者将要调用release()时检查计数
器的值是还是不是超越了计数器的始发值,假如超越了将抛出3个老大。

实例:

威尼斯人线上娱乐 49

威尼斯人线上娱乐 50

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

威尼斯人线上娱乐 51

消除死锁的艺术

  • threading.中华VLock() 可重入锁
    为了协理在同1线程中数次呼吁同一能源,python提供了“可重入锁”:threading.陆风X8Lock。PRADOLock内部维护着一个Lock和三个counter变量,counter记录了acquire的次数,从而使得能源得以被数十四遍acquire。直到二个线程全数的acquire都被release,其余的线程本事赢得能源。可重入锁的个中维持了三个计数器和锁对象。

 八 10二线程利器(queue)

     queue is especially useful in threaded programming when information
must be exchanged safely between multiple threads.

信号量

功率信号量用来支配线程并发数的,Bounded塞马phore或Semaphore管理贰个平放的计数器,每当调用acquire()时-一,调用release()时+1
计数器无法小于0当计数器为0时,acquire()将卡住线程至2头锁定状态,直到别的线程调用release()。
BoundedSemaphore与Semaphore的绝无仅有差异在于前者将要调用release()时检查计数器的值是还是不是超越了计数器的开首值。若是超过了将抛出三个12分

queue列队类的方法

威尼斯人线上娱乐 52

始建二个“队列”对象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue类就是贰个连串的一路落成。队列长度可为Infiniti也许简单。可经过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就意味着队列长度无限。

将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入叁个类型。put()有多个参数,第1个item为需要的,为插入项目标值;第三个block为可选参数,暗中同意为
一。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出三个数码单元。假设block为0,put方法将抓住Full分外。

将1个值从队列中抽取
q.get()
调用队列对象的get()方法从队头删除并回到2个品种。可选参数为block,默以为True。假设队列为空且block为True,get()就使调用线程暂停,直至有档次可用。假如队列为空且block为False,队列将引发Empty十分。

Python Queue模块有二种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize)
二、LIFO类似于堆,即先进后出。             class
queue.LifoQueue(maxsize)
三、还有一种是先期级队列品级越低越先出来。   class
queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 再次来到队列的大小
q.empty() 假若队列为空,重回True,反之False
q.full() 要是队列满了,重临True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在做到1项工作之后,q.task_done()
函数向任务现已达成的队列发送三个非非确定性信号
q.join() 实际上意味着等到队列为空,再执行别的操作

威尼斯人线上娱乐 53

制造时域信号量

  • threading.BoundedSemaphore(num) 钦赐期域信号量为num

import threading

import time


class Mythread(threading.Thread):
    def run(self):
        # 判断是否加锁
        if semaphore.acquire():
            print(self.name)
            time.sleep(1)
            # 释放锁
            semaphore.release()


if __name__ == '__main__':
    # 创建带有信号量的锁
    semaphore = threading.BoundedSemaphore(5)
    # 存放线程的序列
    thrs = []
    for i in range(100):
        thrs.append(Mythread())
    for t in thrs:
        t.start()

实例

实例1:

威尼斯人线上娱乐 54

威尼斯人线上娱乐 55

import threading,queue
from time import sleep
from random import randint
class Production(threading.Thread):
    def run(self):
        while True:
            r=randint(0,100)
            q.put(r)
            print("生产出来%s号包子"%r)
            sleep(1)
class Proces(threading.Thread):
    def run(self):
        while True:
            re=q.get()
            print("吃掉%s号包子"%re)
if __name__=="__main__":
    q=queue.Queue(10)
    threads=[Production(),Production(),Production(),Proces()]
    for t in threads:
        t.start()

威尼斯人线上娱乐 56

实例2:

威尼斯人线上娱乐 57

威尼斯人线上娱乐 58

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

威尼斯人线上娱乐 59

实例3:

威尼斯人线上娱乐 60

威尼斯人线上娱乐 61

#实现一个线程不断生成一个随机数到一个队列中(考虑使用Queue这个模块)
# 实现一个线程从上面的队列里面不断的取出奇数
# 实现另外一个线程从上面的队列里面不断取出偶数

import random,threading,time
from queue import Queue
#Producer thread
class Producer(threading.Thread):
  def __init__(self, t_name, queue):
    threading.Thread.__init__(self,name=t_name)
    self.data=queue
  def run(self):
    for i in range(10):  #随机产生10个数字 ,可以修改为任意大小
      randomnum=random.randint(1,99)
      print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum))
      self.data.put(randomnum) #将数据依次存入队列
      time.sleep(1)
    print ("%s: %s finished!" %(time.ctime(), self.getName()))

#Consumer thread
class Consumer_even(threading.Thread):
  def __init__(self,t_name,queue):
    threading.Thread.__init__(self,name=t_name)
    self.data=queue
  def run(self):
    while 1:
      try:
        val_even = self.data.get(1,5) #get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒
        if val_even%2==0:
          print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even))
          time.sleep(2)
        else:
          self.data.put(val_even)
          time.sleep(2)
      except:   #等待输入,超过5秒 就报异常
        print ("%s: %s finished!" %(time.ctime(),self.getName()))
        break
class Consumer_odd(threading.Thread):
  def __init__(self,t_name,queue):
    threading.Thread.__init__(self, name=t_name)
    self.data=queue
  def run(self):
    while 1:
      try:
        val_odd = self.data.get(1,5)
        if val_odd%2!=0:
          print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd))
          time.sleep(2)
        else:
          self.data.put(val_odd)
          time.sleep(2)
      except:
        print ("%s: %s finished!" % (time.ctime(), self.getName()))
        break
#Main thread
def main():
  queue = Queue()
  producer = Producer('Pro.', queue)
  consumer_even = Consumer_even('Con_even.', queue)
  consumer_odd = Consumer_odd('Con_odd.',queue)
  producer.start()
  consumer_even.start()
  consumer_odd.start()
  producer.join()
  consumer_even.join()
  consumer_odd.join()
  print ('All threads terminate!')

if __name__ == '__main__':
  main()

威尼斯人线上娱乐 62

小心:列表是线程不安全的

威尼斯人线上娱乐 63

威尼斯人线上娱乐 64

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except:
            print('----',a)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()

威尼斯人线上娱乐 65

原则变量同步

有①类线程须求满足条件之后技艺够继续推行,Python提供了threading.Condition
对象用于规范变量线程的支撑,它除了能提供奥迪Q5Lock()或Lock()的办法外,还提供了
wait()、notify()、notifyAll()方法。
条件变量也是线程中的壹把锁,可是规格变量能够完毕线程间的通讯,类似于Java中的唤醒和等候

9 Python中的上下文物管理理器(contextlib模块)

上下文管理器的任务是:代码块推行前准备,代码块实行后处置

创设标准变量锁

  • lock_con = threading.Condition(Lock/科雷傲lock)
    锁是可选选项,不扩散锁对象活动成立三个CR-VLock()
  • wait() 条件不满意时调用,线程会放出锁并进入等待绿灯
  • notify() 条件成立后调用,通告等待池激活1个线程
  • notifyAll() 条件创造后调用,布告等待池激活全体线程
    看个栗子

'''
线程条件变量
'''
import threading
from random import randint

import time


class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val = randint(0, 100)
            print('生产者', self.name, ':Append' + str(val), L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()
                lock_con.release()
            time.sleep(3)


class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
            lock_con.acquire()
            if len(L) == 0:
                lock_con.wait()
            print('消费者',self.name,"Delete"+str(L[0]),L)
            del  L[0]
            lock_con.release()
            time.sleep(0.25)


if __name__ == '__main__':
    L = []
    # 创建条件变量锁
    lock_con = threading.Condition()
    # 线程存放列表
    threads = []
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

一、怎样使用上下文物管理理器:

何以展开3个文书,并写入”hello world”

1
2
3
4
5
filename="my.txt"
mode="w"
f=open(filename,mode)
f.write("hello world")
f.close()

当发生尤其时(如磁盘写满),就平昔不机会实践第五行。当然,大家能够选用try-finally语句块举行打包:

1
2
3
4
5
writer=open(filename,mode)
try:
    writer.write("hello world")
finally:
    writer.close()

当大家举办复杂的操作时,try-finally语句就会变得丑陋,选用with语木正写:

1
2
with open(filename,mode) as writer:
    writer.write("hello world")

as指代了从open()函数重返的内容,并把它赋给了新值。with落成了try-finally的职分。

一同条件event

条件同步和规格变量同步大概意思,只是少了锁功效,因为条件同步设计于不访问共享财富的尺度环境。event=threading.伊夫nt():条件环境目标,起始值
为False;

  • event.isSet():再次回到event的事态值;

  • event.wait():假诺 event.isSet()==False将卡住线程;

  • event.set():
    设置event的动静值为True,全体阻塞池的线程激活进入就绪状态,
    等待操作系统调度;

  • event.clear():苏醒event的境况值为False。
    举个栗子:

'''
同步条件event
'''
import threading

import time


class Boss(threading.Thread):
    def run(self):
        print('BOSS: 今晚加班')
        # 改变事件
        event.isSet() or event.set()
        time.sleep(5)
        print('BOSS:加班结束')
        event.isSet() or event.set()


class Worker(threading.Thread):
    def run(self):
        event.wait()
        print('WORKER:OH NO')
        time.sleep(0.25)
        # 改变同步事件标志
        event.clear()
        event.wait()
        print('WORKER:OH YEAD!')

if __name__ == '__main__':
    # 获取同步事件
    event = threading.Event()
    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

二、自定义上下文物管理理器  

with语句的效能类似于try-finally,提供1种上下文机制。要运用with语句的类,其里面必须提供五个放置函数__enter__和__exit__。前者在重点代码实施前执行,后者在主导代码试行后实行。as前边的变量,是在__enter__函数中回到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class echo():
    def output(self):
        print "hello world"
    def __enter__(self):
        print "enter"
        return self  #可以返回任何希望返回的东西
    def __exit__(self,exception_type,value,trackback):
        print "exit"
        if exception_type==ValueError:
            return True
        else:
            return Flase
  
>>>with echo as e:
    e.output()
     
输出:
enter
hello world
exit

完备的__exit__函数如下:

1
def __exit__(self,exc_type,exc_value,exc_tb)

其中,exc_type:相当类型;exc_value:异常值;exc_tb:分外追踪新闻

当__exit__回到True时,非凡不传播

线程利器队列 queue

队列是1种数据结构,队列分为先进先出(FIFO) 和 先进后出(FILO)
Python Queue模块有三种队列及构造函数:
壹、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
贰、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是预先级队列等第越低越先出来。 class
queue.PriorityQueue(maxsize)
队列能够保障数据安全,是因为队列的里边维护着1把锁。各样去队列中取数据的都会保障数据的安全。而列表即便有着同样的效果,但是列表不是多少安全的

3、contextlib模块  

contextlib模块的法力是提供更易用的上下文物管理理器,它是由此Generator达成的。contextlib中的contextmanager作为装饰器来提供一种针对函数等级的上下文物管理理机制,常用框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from contextlib import contextmanager
@contextmanager
def make_context():
    print 'enter'
    try:
        yield "ok"
    except RuntimeError,err:
        print 'error',err
    finally:
        print 'exit'
         
>>>with make_context() as value:
    print value
     
输出为:
    enter
    ok
    exit

其中,yield写入try-finally中是为了保证丰富安全(能处理非凡)as后的变量的值是由yield再次来到。yield后边的语句可看作代码块施行前操作,yield之后的操作能够作为在__威尼斯人线上娱乐 ,exit__函数中的操作。

以线程锁为例:

威尼斯人线上娱乐 66

@contextlib.contextmanager
def loudLock():
    print 'Locking'
    lock.acquire()
    yield
    print 'Releasing'
    lock.release()

with loudLock():
    print 'Lock is locked: %s' % lock.locked()
    print 'Doing something that needs locking'

#Output:
#Locking
#Lock is locked: True
#Doing something that needs locking
#Releasing

威尼斯人线上娱乐 67

创办1个队列

Queue.Queue类就是一个队列的共同达成。队列长度可为Infiniti大概个别。可透过Queue的构造函数的可选参数maxsize来设定队列长度。要是maxsize小于1就表示队列长度Infiniti。

四、contextlib.nested:裁减嵌套

对于:

1
2
3
with open(filename,mode) as reader:
    with open(filename1,mode1) as writer:
        writer.write(reader.read())

可以经过contextlib.nested举办简化:

1
2
with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
    writer.write(reader.read())

在python 贰.七及事后,被一种新的语法取代:

1
2
with open(filename,mode) as reader,open(filename1,mode1) as writer:
    writer.write(reader.read())

向队列中插入数据

  • q.put(item,block)
    调用队列对象的put()方法在队尾插入3个体系。put()有四个参数,第叁个item为须要的,为插入项目的值;第1个block为可选参数,默感到一。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出3个数据单元。若是block为0,put方法将抓住Full十分。

5、contextlib.closing() 

file类直接匡助上下文物管理理器API,但有点代表张开句柄的对象并不扶助,如urllib.urlopen()重临的目的。还有个别遗留类,使用close()方法而不协助上下文物管理理器API。为了保证关闭句柄,必要动用closing()为它成立2个上下文物管理理器(调用类的close方法)。

威尼斯人线上娱乐 68

威尼斯人线上娱乐 69

import contextlib
class myclass():
    def __init__(self):
        print '__init__'
    def close(self):
        print 'close()'

with contextlib.closing(myclass()):
    print 'ok'

输出:
__init__
ok
close()

威尼斯人线上娱乐 70

从队列中抽取数据

  • q.get()
    调用队列对象的get()方法从队头删除并回到1个类型。可选参数为block,默以为True。假若队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如若队列为空且block为False,队列将引发Empty相当。

10 自定义线程池

大约版本:

威尼斯人线上娱乐 71

威尼斯人线上娱乐 72

import queue
import threading
import time

class ThreadPool(object):

    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)


'''
pool = ThreadPool(10)

def func(arg, p):
    print(arg)
    time.sleep(1)
    p.add_thread()


for i in range(30):
    Pool = pool.get_thread()
    t = Pool(target=func, args=(i, pool))
    t.start()
'''

威尼斯人线上娱乐 73

复杂版本:

威尼斯人线上娱乐 74

威尼斯人线上娱乐 75

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)#主线程
        self.q.put(w)#主线程

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()#if q为空,则阻塞住,一直等到有任务进来并把它取出来
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()#key:该线程在这里继续等待新的任务,任务来了,继续执行
                                        #暂时将该线程对象放到free_list中。
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, free_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        free_list.append(worker_thread)#新的任务来的时候判断
                                 # if len(self.free_list) == 0 and len(self.generate_list) < self.max_num
                                 # 任务得创建新的线程来处理;如果len(self.free_list) != 0:由阻塞着的存在free_list中的线程处理(event = self.q.get())
        try:
            yield
        finally:
            free_list.remove(worker_thread)

# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    time.sleep(1)
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(2)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))

# pool.close()
# pool.terminate()

威尼斯人线上娱乐 76

 延伸:

威尼斯人线上娱乐 77

威尼斯人线上娱乐 78

import contextlib
import socket
@contextlib.contextmanager
def context_socket(host,port):
    sk=socket.socket()
    sk.bind((host,port))
    sk.listen(5)
    try:
        yield sk
    finally:sk.close()

with context_socket('127.0.0.1',8888) as socket:
    print(socket)

威尼斯人线上娱乐 79

 

API

  • q.qsize() 再次回到队列的大大小小
  • q.empty() 借使队列为空,重返True,反之False
  • q.full() 假使队列满了,重临True,反之False
  • q.full 与 maxsize 大小对应
  • q.get([block[, timeout]]) 获取队列,timeout等待时间
  • q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
  • q.put_nowait(item) 相当q.put(item, False)
  • q.task_done() 在做到壹项工作将来,q.task_done()
    函数向职务现已到位的行列发送一个实信号
  • q.join() 实际上意味着等到队列为空,再实施其他操作


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图