如何使用线程本地数据

实际案例:

实现了一个web视频监控服务器,服务端采集摄像头数据,客户端使用浏览器通过http请求接收数据,服务器使用推送的方式(multipart/x-mixed-replace)一直使用一个tcp连接向客户端传递数据,这种方式将持续占用一个线程,导致单线程服务器无法处理多客户端请求

改写程序,在每个吕处理一个客户端请求,支持多客户端访问

importos, cv2, time,struct, threading

fromhttp.serverimportHTTPServer, HTTPRequestHandler

fromsocketserverimportTCPServer,ThreadingTCPServer

fromthreadingimportThread, RLock

fromselectimportselect

classJpegStreamer(Thread):#负责采集数据,独立线程,相当于数据源

def__init__(self,camera):

Thread.__init__(self)

self.cap =cv2.VideoCapture(camera)

self.lock = RLock()

self.pipes = {}

defregister(self):

pr, pw = os.pipe()

self.pipes[pr] = pw

returnpr

defunregister(self,pr):

os.close(pr)

os.close(pw)

defcapture(self):

cap =self.cap

whilecap.isOpened():

ret, = cap.read()

ifret:

#ret, data =cv2.imencode('.jpg', )

ret, data = cv2.imencode('.jpg', , (cv2.IMWRITE_JPEG_QUALITY,40))

yielddata.tostring()

defsend(self, ):

n = struct.pack('l', len( ))

iflen(self.pipes):

forpipeinpipes:

os.write(pipe, n)

os.write(pipe, )

defrun(self):

for inself.capture():

self.send( )

classJpegRetriever( ):#从streamer中获取数据

def__init__(self,streamer):

self.streamer =streamer

self.local =threading.local()

defretrieve(self):

whileTrue:

n = struct.unpack('l', ns)[]

yielddata

def__enter__(self):

ifhasattr(self.local,'pipe'):

raiseRuntimeError()

returnself.retrieve()

def__exit__(self,*args):

returnTrue

classHandler( HTTPRequestHandler):#处理http请求

retriever =None

@staticmethod

defsetJpegRetriever(retriever):

Handler.retriever = retriever

defdo_GET(self):

ifself.retrieverisNone:

raiseRuntimeError('no retriver')

ifself.path !='/':

return

self.send_response(200)

self.send_header('Content-type','multipart/x-mixed-replace;boundary=abcde')

self.end_headers()

withself.retrieveras s:

for in s:

self.send_ ( )

defsend_ (self, ):

s ='--abcde '

s +='Content-Type:image/jpeg '

s +='Content-Length:%s '% len( )

if__name__ =='__main__':

streamer = JpegStreamer()

streamer.start()

retriever = JpegRetriever(streamer)

Handler.setJpegRetriever(retriever)

print('Startserver...')

httpd = ThreadingTCPServer(('',9000), Handler)

httpd.serve_forever()

收藏 打印