创建一个“队列”对象import Queueq = Queue.Queue(maxsize = 10)Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。q.put(10)将一个值放入队列中调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。q.get()将一个值从队列中取出调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
q.join()和q.task_done()搭配使用, q.task_done():完成任务,每次从queue中get一个数据之后,当处理好相关问题,最后调用该方法,以提示q.join()是否停止阻塞,让线程向前执行或者退出
import queueq = queue.Queue(5)q.put(10)q.put(20)print(q.get()) #10q.task_done()print(q.get()) #20q.task_done()q.join()print("ending!")
其他常用方法:'''此包中的常用方法(q = Queue.Queue()):q.qsize() 返回队列的大小q.empty() 如果队列为空,返回True,反之Falseq.full() 如果队列满了,返回True,反之Falseq.full 与 maxsize 大小对应q.get([block[, timeout]]) 获取队列,timeout等待时间q.get_nowait() 相当q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间q.put_nowait(item) 相当q.put(item, False)q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号q.join() 实际上意味着等到队列为空,再执行别的操作'''
队列的先进先出模式:
import queue#q=queue.Queue(3) # 默认是先进先出(FIFO)# q.put(111)# q.put("hello")# q.put(222)# q.put(223,False)## print(q.get())# # print(q.get())# # print(q.get())# ## q.get(False)
队列的先进后出模式:
# 先进后出模式# q=queue.LifoQueue() # Lifo last in first out# # # q.put(111)# q.put(222)# q.put(333)# # print(q.get())# print(q.get())# print(q.get())
队列的优先级:
# 优先级q=queue.PriorityQueue()q.put([4,"hello4"])q.put([1,"hello"])q.put([2,"hello2"])print(q.get())print(q.get())
生产者消费模型:
#生产者消费者模型import time,randomimport queue,threadingq = queue.Queue()def Producer(name): count = 0 while count <10: print("making........") time.sleep(2) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......")def Consumer(name): count = 0 while count <10: time.sleep(1) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1p1 = threading.Thread(target=Producer, args=('A',))c1 = threading.Thread(target=Consumer, args=('B',))# c2 = threading.Thread(target=Consumer, args=('C',))# c3 = threading.Thread(target=Consumer, args=('D',))p1.start()c1.start()# c2.start()# c3.start()#
selectors 模块:
import selectors # 基于selectors模块实现的IO多路复用,建议大家使用import socketsock=socket.socket()sock.bind(("127.0.0.1",8800))sock.listen(5)sock.setblocking(False)sel=selectors.DefaultSelector() #根据具体平台选择最佳IO多路机制,比如在linux,选择epolldef read(conn,mask): try: data=conn.recv(1024) print(data.decode("UTF8")) data2=input(">>>") conn.send(data2.encode("utf8")) except Exception: sel.unregister(conn)def accept(sock,mask): conn, addr = sock.accept() print("conn",conn) sel.register(conn,selectors.EVENT_READ,read)sel.register(sock,selectors.EVENT_READ,accept) # 注册事件while 1: print("wating...") events=sel.select() # 监听 [(key1,mask1),(key2,mask2)] for key,mask in events: # print(key.fileobj) # conn # print(key.data) # read func=key.data obj=key.fileobj func(obj,mask) # 1 accept(sock,mask) # 2 read(conn,mask)