威尼斯人线上娱乐

【威尼斯人线上娱乐】洗礼灵魂,进度间通讯

20 4月 , 2019  

 

多进程

经过之间是相互独立的,python是运维进度的时候,是开发银行的是原生进程。进程是向来不GIL锁的,而且不设有锁的概念,进程之间的数据式不可能共享的,而线程是可以的。

当10二线程创制完毕之后,start并从未了及时运转,依旧须要和任何线程抢CPU的资格,只是
日子异常的短。
进度之间的通讯分为二种,queue和pipe

写一个game 循环
game
loop是各类游戏的宗旨.它不停的取得用户输入,更新游戏状态,渲染游戏结果到显示屏上.网页游戏分为客户端和服务端两部分.两边的loop通过互连网连接起来.常常状态下,客户端获取用户输入,发送到服务端,服务端管理计算数据,更新游戏用户状态,发送结果个客户端.比方游戏用户或许游戏物体的地点.万分主要的是,不要把客户端和服务端的效能混淆了,假如未有充足的理由的话.
假如在客户端做游戏总括,那么不相同的客户端格外轻松就差异台了.

进程

壹、进度的概念

用muliprocessing这么些包中的Process来定义多进度,跟定义多线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

威尼斯人线上娱乐 ,A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

一.含义:计算机中的程序关于某数码集结上的叁回运营活动,是系统进行能源分配和调解的着力单位。说白了正是1个顺序的施行实例。

实践四个顺序就是多少个经过,比方你张开浏览器看到笔者的博客,浏览器自身是一个软件程序,你此时开发的浏览器便是三个进程。

 

2、进度中投入线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

地方函数通过multiprocessing的queue来实现进度间通讯。

在下八个例证中,大家写一个客户端,这么些客户端通过WebSocket连接服务器,同时运行二个总结的loop,接受输入发送给服务器,回显音信.Client
source code is located
here.

二.进程的风味

  • ### 1个历程里能够有七个子进程

  • ### 新的长河的制造是完全拷贝整个主进度

  • ### 进度里能够包罗线程

  • ### 进度之间(包含主进度和子进度)不设有多中国少年共产党享,相互通讯(浏览器和python之间的数码不可能互通的),要通讯则要借助队列,管道之类的

 

3、父亲和儿子进程

【威尼斯人线上娱乐】洗礼灵魂,进度间通讯。各类子进度都是由二个父进度运营的,各样程序也是有1个父进程

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

大家应用aiohttp来创制二个game
server.那个库能够创造asyncio的client和server.那么些库的益处是还要帮助http请求和websocket.所以服务器就不要求把结果处理成html了.
来看一下server如何运维:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

这些代码就不翻译了,

三.经过和线程之间的界别

  • ### 线程共享地址空间,而经过之间有互动独立的空间

  • ### 线程之间数据互通,相互操作,而经过不得以

  • ### 新的线程比新的经过创立轻易,比开进度的开垦小大多

  • ### 主线程能够影响子线程,而主进度不能够影响子进程

 

 

经过间数据交互与共享

知情分歧进度之间内部存款和储蓄器是不共享的,要想落成三个进程间的通讯要求选取multiprocessing库中的queue(队列)模块,这几个multiprocessing库中的queue模块跟单纯的queue库是不平等的。进度导入前者(这里的queue是越发为经过之间的通信设计的)不失误,导入后者(那里的queue主如若线程间数据交互)出错。

上边代码通过Pipe来落到实处三个经过间的通讯。

三.二 有请求才开头loop

地点的例证,server是不停的loop.今后改成有请求才loop.
同时,server上或然存在多个room.2个player创立了多个session(一场较量照旧1个别本?),其余的player可以参加.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

四.在python中,进度与线程的用法就只是名字区别,使用的法子也是没多大区别

一、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

直接操作task对象.未有人的时候,能够cancel掉task.
注意!:
This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

举例想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

5.简练实例

一)创制2个轻便的多进程:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运维结果:

威尼斯人线上娱乐 1

 

不管怎么说,反正报错了,一样的代码,在python自带的IDLE里索求:

威尼斯人线上娱乐 2

未曾其余东西就终止了。好的,那里要说下了,依据本人个人的接头,当您用pycharm或然IDLE时,pycharm恐怕IDLE在您的管理器里自身也是叁个经过,并且暗许是主进程。所以在pycharm会报错,而在IDLE里运转正是空荡荡,个人理解,对不对目前不谈,早先时期学到子进度时再说。

 

消除办法便是,别的的不改变,加贰个if __name == ‘__main__’判断就行:

威尼斯人线上娱乐 3

 

如此那般就一下子就解决了了,好的,你现在能够体会到那句话了,进度与线程的用法就只是名字不相同,使用的方式也是没多大不一样。不多说,自行体会。而运转结果来看的光阴是同步的,那么那进度才是确实意义上的互动运营。

 

二)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运行结果:

威尼斯人线上娱乐 4

 

 

接下来setDaemon之类的方法和线程也是完全壹致的。

 

三)每贰个进度都有根进度,换句话,每贰个经过都有父进程

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运作结果:

 

威尼斯人线上娱乐 5

 

而查看自个儿本机的进程:

威尼斯人线上娱乐 6

 

能够知晓,620四就是pycharm,便是那儿的根进度,而主进度正是自个儿这几个py文件(由__main__能够),接着再往下的子进程等等等的。

 

二、进度访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

地点代码通过Manger达成子进度间的通讯。

三.四 等待多个事件

Example 3.4 source
code
在大多气象下,须求在服务器处理客户端的handler中,
等待多少个事件.除了等候客户端的音讯,大概还需求拭目以待不相同的消息发生.比方,
游戏1局的大运到了,须求1个timer的数字信号.可能,要求任何进度的音信,或许其余server的音讯.(使用遍布式新闻系统).
下边那一个事例使用了Condition.那里不保留全体的socket,而是在历次循环结束通过Condition.notify_all来公告.这一个动用pub/sub情势达成.
为了在二个handler中,等待多个事件,首先大家接纳ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
以此会阻塞住直到有三个职责达成,这一年会回到四个列表,完结的和还是在运行的.借使task
is done,大家再设置为None,那样下1个循环里会再一回创立.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(这些根本是asyncio.Condition的用法)

陆.多进度间的通讯和数据共享

第一大家都早已清楚进度之间是单身的,不得以互通,并且数据交互独立,而在骨子里支付中,一定会遇到需求经过间通讯的景色须求,那么我们怎么搞呢

有三种办法:

  • pipe
  • queue

1)使用queue通信

在三三十二线程那里已经学过queue了,创立queue的法门,q =
queue.Queue(),这种创立是创办的线程queue,并不是进程queue。创立进度queue的措施是:

威尼斯人线上娱乐 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运作结果:

威尼斯人线上娱乐 8

 

好的,已经由此queue完结通讯,那么细心的情侣恐怕会想,此时的queue到底是同一个吧依旧copy的吧?开首测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运营结果:

威尼斯人线上娱乐 9

 

Linux的ubuntu下是那般的:

威尼斯人线上娱乐 10

 

那就糟糕怎么说了,笔者个人的接头,线程和进程那类与Computer硬件(CPU,RAM)等有挂钩的都有不分明因素,姑且感到在Windows平台里queue是copy的,在Linux里是同三个吗,并且据经验人员代表,在macbook上也是同一个。

 

还有个难点, 若是使用的queue是线程式的呢?

代码别的都没变,只改了此处:

威尼斯人线上娱乐 11

 

结果:

威尼斯人线上娱乐 12

 

虽说报错了,但是却有3个关键点,提示的是无法pickle线程锁对象,也正是说刚才大家利用的queue是经过对象,所以能够pickle,注意了,那里就是关键点,使用了pickle,那么也正是说,在Windows平台里是copy的,假使不是copy,就不须求存在pickle对吗?直接拿来用正是呀,干嘛要pickle之后取的时候再反pickle呢对啊?

 

再看Linux下啊,由于Linux暗中认可是python二,所以模块包名稍微有点分歧

威尼斯人线上娱乐 13

结果阻塞住了,然而前边的或然出来了,看到的id果然依旧同样的。

 

那边就有叁点供给专注:(个人驾驭,如有误望指正)

1.进程里的确无法使用线程式queue

二.Windows平台的进度式queue是copy的

三.Linux平台的线程式和进度式都以同3个,不过如若在经过里使用线程式queue会阻塞住

但自个儿个人以为copy更有安全性

 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运作结果:

威尼斯人线上娱乐 14

 

如此那般就关系上了,相信你开掘了,着力和前面的socket大致,可是唯壹的两样是recv()方法无法加参数,不信的话,你加来尝试

回望线程通讯,相信你会感到进程比线程更利于

 

自然pipe也能够有多少个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运作结果:

威尼斯人线上娱乐 15

 

七.进程之间数据共享——manager

比较简单,就动用了经过里的manager对象下的依次数据类型,别的的很粗大略的,小编就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运营结果:

威尼斯人线上娱乐 16

 

如此这般是或不是就兑现了数码共享了?

 

好的,进度也分析完了

 

3、进度访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父过程也正是克隆二个Q,把团结的Q克隆了1份交给子进度,子进度这一年往Q里面放了壹份数据,然后父进程又能实际的拿走到。可是你克隆了一份是还是不是就和父进度未有涉及了,为啥还是能维系在协同呢?不过实际:等于那三个Q里面包车型地铁数目又把它类别化了,类别化到一个中路的地点,类似于翻译,然后反体系化给那一个父进程那边来了,其实那多少个Q就是经过pickle来类别化的,不是1个真正的Q。

小结:七个线程之间能够修改多少个数量,不加锁,只怕就会出错。以往进度中的Queue,是落到实处了数据的传递,不是在修改同1份数据,只是达成3个历程的数量传给了其余四个进程。

威尼斯人线上娱乐 17

三.5 和线程一同利用

Example 3.5 source
code

这么些事例,大家把asyncio的loop放到其余五个独自线程中.上边也说过了,因为python的GIL的设计,不容许同时运营两个code.所以使用八线程来管理计算瓶颈的标题,并不是3个好主意.然后还有别的一个用到线程原因正是:
要是有个别函数或然库不帮助asyncio,那么就会阻塞住主线程的运营.那种状态下唯壹的方法正是坐落其余1个线程中.

要留意asyncio本人不是threadsafe的,不过提供了七个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当你运营这一个事例的时候,你会看到notify的线程id正是主线程的id,那是因为notify协程运维在主线程中,sleep运营在其它三个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

3.六 多进度和强大scaling up

二个线程的server能够干活了,不过这一个server唯有3个cpu可用.
为了增加,我们须求周转多个进程,每一个进程包括本人的eventloop.
所以大家须要经过之间通讯的格局.同时在戏耍领域,经常会有大量的总结(寻路什么的).那个职务平日不会火速到位(贰个tick内).
在协程中展开大气耗费时间的估摸没有趣,因为会阻塞住消息循环本身.所以在那种情状下,把大气的乘除交给其它的进度就很有须要了
最简易的主意正是开发银行四个单线程的server.然后,能够应用haproxy那样的load
balancer,来把客户端的总是分散到不一致的进程上去.进城之间的通讯有大多方法.1种是依照网络连接,也能够扩展到多少个server.今后曾经有无数兑现了音讯和存款和储蓄系统的框架(基于asyncio).
比如:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有其余的有的乱7八糟,在git上,大部分是aio打头.
选拔互连网音信,能够丰盛管用的蕴藏数据,大概沟通消息.可是倘使要管理大量实时的数量,而且有雅量经过通讯的情景,就老大了.在那种状态下,1个更确切的法子是采取规范的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在那些例子中,我们运用python的高档次的multiprocessing库来触发多个新的进程来举行测算,通过multiprocessing.Queue来举办进度间通讯.不幸的是,近日的multiprocessing完结并不协助asyncio.所以阻塞的调用就会堵塞住event
loop.
这就是利用线程的最棒案例.因为大家在其余四个线程运维multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在其余三个进度中运营.包含了部分耗费时间计算,把结果放在queue中.获得结果随后,文告主线程的主eventloop那3个等待的client.这些例子十一分简陋,进度未有适当甘休,同时worker或然供给此外3个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_loop()
, otherwise, it will not work.

四、通过Pipe()达成进程间的数量交互,manger落成数据共享

上边的事例是经过进度中的Queue,来开始展览多中国少年共产党享的,其实还有一种艺术贯彻数据共享,那正是管道,pipe,以及数额共享manger。

4.1、Pipe()函数

管道函数会回到由管道两方连日来的一组连接对象,该管道私下认可是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

四.二、接受反复和发送多次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger可以成功数据间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同1份数据的时候供给加锁,进度修改数据吧:不用加锁,因为这些manger已经帮您加锁了,它就私下认可不允许多个进程同时修改1份数据。七个经过未有艺术同时修改1份数据,进程之间是独自的,它和谐也要加锁,因为它把团结的东西还要copy好几份,跟刚刚的老大Queue同样,copy1贰个字典最终合成一个字典

 

 

 

协程
协程,又叫微线程,实际上便是单线程,通过python语法,或模块来落到实处产出。
实质上就是叁个经过二个线程。

经过锁和进度池的施用

威尼斯人线上娱乐 18

1、进程锁

透过multiprocessing中的Lock模块来完毕进度锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

进程中不是互相独立的呢?为啥还要加锁:固然各样进度都是独立运维的,不过难题来了,它们共享壹块显示器。那个锁存在的含义便是荧屏共享。即使经过1想着打字与印刷数据,而经过二想也想打字与印刷数据的事态,就有一点都不小希望乱套了,然后经过这一个锁来调整,去打字与印刷的时候,这些显示屏唯有本身独占,导致显示屏不会乱。

威尼斯人线上娱乐 19

2、进程池apply和apply_saync

2.1、appley

一头实施,也正是串行试行的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步施行,也正是并行施行。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

二.三、异步下回调函数

程序施行实现之后,再回调过来实施这些Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表达fun=Foo干不完就不实施bar函数,等Foo试行完就去实行Bar
  2. 其2次调函数是主进度去调用的,而不是种种子进度去调用的。
  3. 回调函数的用处:

      比如说你从各种机器上备份落成,在回调函数中自动写三个剧本,说备份完结

  4. 回调函数是主进度调用的案由?

      纵然是子进度去调用那个回调函数,有多少个子进度就有几个接二连三,就算是主进度的话,只供给一遍长连接就足以了,这么些功能就高了

  

 

上海体育场所是用yield完毕了2个多个函数逇并发管理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

因而greenlet模块的switch来落到实处协程的切换,greenlet模块要求手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

地点代码通过gevent模块来兑现写成的IO时期机动切换完成产出的顺序。
gevent需要从pycharm下载。

威尼斯人线上娱乐 20

 


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图