博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python 之队列和生产者消费者模型、基于selectors模块实现的IO多路复用
阅读量:6330 次
发布时间:2019-06-22

本文共 3372 字,大约阅读时间需要 11 分钟。

创建一个“队列”对象import Queueq = Queue.Queue(maxsize = 10)Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。q.put(10)将一个值放入队列中调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。q.get()将一个值从队列中取出调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
 
q.join()和q.task_done()搭配使用, q.task_done():完成任务,每次从queue中get一个数据之后,当处理好相关问题,最后调用该方法,以提示q.join()是否停止阻塞,让线程向前执行或者退出
import queueq = queue.Queue(5)q.put(10)q.put(20)print(q.get())   #10q.task_done()print(q.get())  #20q.task_done()q.join()print("ending!")

 

其他常用方法:'''此包中的常用方法(q = Queue.Queue()):q.qsize() 返回队列的大小q.empty() 如果队列为空,返回True,反之Falseq.full() 如果队列满了,返回True,反之Falseq.full 与 maxsize 大小对应q.get([block[, timeout]]) 获取队列,timeout等待时间q.get_nowait() 相当q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间q.put_nowait(item) 相当q.put(item, False)q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号q.join() 实际上意味着等到队列为空,再执行别的操作'''

 

队列的先进先出模式:

import queue#q=queue.Queue(3)  # 默认是先进先出(FIFO)# q.put(111)# q.put("hello")# q.put(222)# q.put(223,False)## print(q.get())# # print(q.get())# # print(q.get())# ## q.get(False)

 

队列的先进后出模式:

#  先进后出模式# q=queue.LifoQueue()  #  Lifo  last in first out# # # q.put(111)# q.put(222)# q.put(333)# # print(q.get())# print(q.get())# print(q.get())

 

队列的优先级:

# 优先级q=queue.PriorityQueue()q.put([4,"hello4"])q.put([1,"hello"])q.put([2,"hello2"])print(q.get())print(q.get())

 

生产者消费模型:

#生产者消费者模型import time,randomimport queue,threadingq = queue.Queue()def Producer(name):  count = 0  while count <10:    print("making........")    time.sleep(2)    q.put(count)    print('Producer %s has produced %s baozi..' %(name, count))    count +=1    #q.task_done()    #q.join()    print("ok......")def Consumer(name):  count = 0  while count <10:    time.sleep(1)    if not q.empty():        data = q.get()        #q.task_done()        #q.join()        print(data)        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))    else:        print("-----no baozi anymore----")    count +=1p1 = threading.Thread(target=Producer, args=('A',))c1 = threading.Thread(target=Consumer, args=('B',))# c2 = threading.Thread(target=Consumer, args=('C',))# c3 = threading.Thread(target=Consumer, args=('D',))p1.start()c1.start()# c2.start()# c3.start()#

 

selectors 模块:

import selectors  # 基于selectors模块实现的IO多路复用,建议大家使用import socketsock=socket.socket()sock.bind(("127.0.0.1",8800))sock.listen(5)sock.setblocking(False)sel=selectors.DefaultSelector() #根据具体平台选择最佳IO多路机制,比如在linux,选择epolldef read(conn,mask):    try:        data=conn.recv(1024)        print(data.decode("UTF8"))        data2=input(">>>")        conn.send(data2.encode("utf8"))    except Exception:        sel.unregister(conn)def accept(sock,mask):    conn, addr = sock.accept()    print("conn",conn)    sel.register(conn,selectors.EVENT_READ,read)sel.register(sock,selectors.EVENT_READ,accept)  # 注册事件while 1:    print("wating...")    events=sel.select()   #  监听    [(key1,mask1),(key2,mask2)]    for key,mask in events:        # print(key.fileobj)    # conn        # print(key.data)       # read        func=key.data        obj=key.fileobj        func(obj,mask)  # 1 accept(sock,mask)    # 2 read(conn,mask)

 

转载于:https://www.cnblogs.com/adamans/articles/6841446.html

你可能感兴趣的文章
C# Enum,Int,String的互相转换
查看>>
【原创】自动结束进程脚本
查看>>
组合条件查询+分页
查看>>
一个解决python抓取中文网页乱码的办法
查看>>
URAL1519 Formula 1
查看>>
UVa 1328 Period
查看>>
【原创】(AMD)JavaScript模块化开发(dojo)
查看>>
对列表的操作
查看>>
Reading papers_3(与mean shift相关,ing...)
查看>>
MPMoviePlayerController播放视频
查看>>
ngix匹配规则
查看>>
安装MikTex Portable
查看>>
Visio 2007/2010 左侧"形状"窗口管理
查看>>
设计模式的一些杂谈与反思---functionn和signals
查看>>
Bootstrap V3使用Tab标签
查看>>
重排序、hb、ConcurrentHashMap弱一致性(jdk1.6)
查看>>
查看公司工商注册信息(转载)
查看>>
Halcon算子翻译——dev_set_line_width
查看>>
Webpack 学习记录-02
查看>>
ie兼容
查看>>