之前尝试过很多种在网页上实现实时检测视频的方法:
一种是使用python java结合,python检测完视频后对结果视频进行推流,然后java端播放视频流。但是python推流经常出问题,rtmp-nginx搭建麻烦。而且不能自由的使用检测。
还有一种使用TensorFlow.js的方法,但是模型和推理逻辑都暴露在浏览器端,不够安全
所以现在决定用Flask框架,说是框架,其实在我看来也就是一个app.py文件将前后端结合起来,也是第一次接触这个框架但是不难。
github项目地址
运行方法:
首先你需要在你的环境中安装Flask库,我的Python版本为3.6。我加了百度的镜像源,安装会快很多
pip install flask -i https://mirror.baidu.com/pypi/simple
然后口罩检测是调用的百度的口罩检测模型,所以还需要下载一下他家相关的包,但是模型不用下载,在线调用的
pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple
pip install paddlehub -i https://mirror.baidu.com/pypi/simple
然后是opencv和numpy
pip install opencv-python -i https://mirror.baidu.com/pypi/simple
pip install numpy -i https://mirror.baidu.com/pypi/simple
接着你就可以运行项目里的app.py文件,会出现以下信息,点击Running on后面的链接打开浏览器即可
结果,这里只实现了一个检测视频功能所以页面很简单:
下面来说说代码,代码非原创,参照别人的博客改的,找不到原博主的链接在哪里了,非常抱歉:
首先是获取摄像头部分,
base_camera.py 这是原博主的代码我还没细看,但是需要用到
import time
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class CameraEvent(object):
    """An Event-like class that signals all active clients when a new
    frame is available.

    Each client thread gets its own ``threading.Event`` keyed by the
    thread identifier, so every client is woken exactly once per frame.
    """

    def __init__(self):
        # Maps client thread ident -> [threading.Event, last-signal time].
        self.events = {}

    def wait(self):
        """Invoked from each client's thread to wait for the next frame."""
        ident = get_ident()
        if ident not in self.events:
            # New client: register an Event plus a timestamp used later
            # to detect clients that stopped consuming frames.
            self.events[ident] = [threading.Event(), time.time()]
        return self.events[ident][0].wait()

    def set(self):
        """Invoked by the camera thread when a new frame is available."""
        now = time.time()
        stale = []
        for ident, event in self.events.items():
            # is_set(): isSet() is a deprecated camelCase alias.
            if not event[0].is_set():
                # Wake this client and record when we last signalled it.
                event[0].set()
                event[1] = now
            elif now - event[1] > 5:
                # The event stayed set for more than 5 seconds: the client
                # never processed the previous frame, so assume it is gone.
                # Bug fix: the original dropped at most ONE stale client
                # per call; collect and remove all of them.
                stale.append(ident)
        for ident in stale:
            del self.events[ident]

    def clear(self):
        """Invoked from each client's thread after a frame was processed."""
        self.events[get_ident()][0].clear()
class BaseCamera(object):
    """Shares one background capture thread among all streaming clients.

    All state is class-level: no matter how many ``BaseCamera`` (or
    subclass) instances are created, a single thread reads frames and
    publishes them through ``CameraEvent``.
    """
    thread = None  # background thread that reads frames from camera
    frame = None  # current frame is stored here by background thread
    last_access = 0  # time of last client access to the camera
    event = CameraEvent()

    def __init__(self):
        """Start the background camera thread if it isn't running yet."""
        if BaseCamera.thread is None:
            BaseCamera.last_access = time.time()
            # start background frame thread
            BaseCamera.thread = threading.Thread(target=self._thread)
            BaseCamera.thread.start()
            # wait until frames are available
            # (sleep(0) only yields the GIL; this is a busy-wait)
            while self.get_frame() is None:
                time.sleep(0)

    def get_frame(self):
        """Return the current camera frame.

        Blocks until the background thread signals a new frame; also
        refreshes ``last_access`` so the thread keeps running.
        """
        BaseCamera.last_access = time.time()
        # wait for a signal from the camera thread
        BaseCamera.event.wait()
        BaseCamera.event.clear()
        return BaseCamera.frame

    @staticmethod
    def frames():
        """Generator that returns frames from the camera.

        Must be implemented by subclasses (see camera_opencv.Camera).
        """
        raise RuntimeError('Must be implemented by subclasses.')

    @classmethod
    def _thread(cls):
        """Camera background thread: pump frames until clients go idle."""
        print('Starting camera thread.')
        frames_iterator = cls.frames()
        for frame in frames_iterator:
            BaseCamera.frame = frame
            BaseCamera.event.set()  # send signal to clients
            time.sleep(0)
            # if there hasn't been any clients asking for frames in
            # the last 10 seconds then stop the thread
            if time.time() - BaseCamera.last_access > 10:
                frames_iterator.close()
                print('Stopping camera thread due to inactivity.')
                break
        # Clearing the handle lets a future client restart the thread.
        BaseCamera.thread = None
camera_opencv.py 这是我根据原博主代码改的,可以获取视频然后进行口罩检测并将结果绘制在视频画面上,这里把结果帧逐帧输出主要用到了yield关键字(生成器)
import os
import cv2
from base_camera import BaseCamera
class Camera(BaseCamera):
    """OpenCV-backed camera that runs mask detection on every frame."""

    video_source = 0  # default: first local capture device

    def __init__(self):
        # Allow the capture device to be overridden via an environment
        # variable, e.g. OPENCV_CAMERA_SOURCE=1.
        if os.environ.get('OPENCV_CAMERA_SOURCE'):
            Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE']))
        super(Camera, self).__init__()

    @staticmethod
    def set_video_source(source):
        """Select which capture device index OpenCV should open."""
        Camera.video_source = source

    @staticmethod
    def frames():
        """Yield JPEG-encoded frames with mask-detection overlays drawn."""
        # Hoisted out of the loop: the original re-executed this import
        # on every single frame.  Kept function-local to preserve the
        # original's deferred-import behavior at module load time.
        from function.mask import mask_detection

        camera = cv2.VideoCapture(Camera.video_source)
        if not camera.isOpened():
            raise RuntimeError('Could not start camera.')
        try:
            while True:
                success, img = camera.read()
                if not success:
                    # Bug fix: the original ignored the status flag and
                    # would pass img=None into mask_detection on a
                    # failed read.  Skip transient read failures.
                    continue
                img = mask_detection(frame=img)
                # encode as a jpeg image and return it
                yield cv2.imencode('.jpg', img)[1].tobytes()
        finally:
            # Release the device when the generator is closed
            # (BaseCamera calls frames_iterator.close() on inactivity).
            camera.release()
mask.py 这是自己写的检测口罩的代码,因为需要在画面上绘制中文所以还要加载一个字体文件msyh.ttf
import paddlehub as hub
import numpy as np
from PIL import Image, ImageDraw, ImageFont #与cv2 进行转换PIl可以显示汉字cv2不行
import cv2
# Load Baidu's PaddleHub mask-detection model once at import time
# (the model weights are fetched/served by paddlehub, not bundled here).
module = hub.Module(name="pyramidbox_lite_server_mask", version='1.2.0')
# NOTE(review): this opens capture device 0 at import time even though
# mask_detection() is handed a frame by its caller — presumably a
# leftover second capture; verify against camera_opencv.Camera.frames
# and consider removing.
camera = cv2.VideoCapture(0)
def mask_detection(frame):
    """Run Baidu's mask-detection model on *frame* and draw the results.

    Args:
        frame: BGR image array as produced by ``cv2.VideoCapture.read``.

    Returns:
        The BGR image with a bounding box and a Chinese status label
        drawn for every detected face.
    """
    # Bug fix: the original immediately did `_, frame = camera.read()`,
    # discarding the frame passed in by the caller and capturing the
    # device a second time.  Use the argument instead.
    frame_copy = frame.copy()
    input_dict = {"data": [frame_copy]}
    # 口罩检测 -> run the mask-detection model
    results = module.face_detection(data=input_dict)
    for result in results:
        labelmask = result['data']['label']
        top = int(result['data']['top'])
        right = int(result['data']['right'])
        bottom = int(result['data']['bottom'])
        left = int(result['data']['left'])
        cv2.rectangle(frame, (left, top), (right, bottom), (255, 255, 6), 5)
        print('mask detection is running')
        # Bug fix: label_cn was previously unassigned when the model
        # returned any label other than 'MASK'/'NO MASK', which would
        # raise NameError below.  Fall back to the raw label.
        if labelmask == 'NO MASK':
            label_cn = '无口罩'
        elif labelmask == 'MASK':
            label_cn = '有口罩'
        else:
            label_cn = labelmask
        # OpenCV cannot render Chinese glyphs, so round-trip through PIL
        # (cv2 stores channels as BGR, PIL expects RGB).
        cv2img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pilimg = Image.fromarray(cv2img)
        draw = ImageDraw.Draw(pilimg)
        # msyh.ttf supplies the CJK glyphs; size 27.
        font = ImageFont.truetype("webvideo/function/msyh.ttf", 27, encoding="utf-8")
        draw.text((left + 10, bottom), '状态:' + label_cn, (255, 255, 255),
                  font=font)
        draw.text((left + 10, bottom + 30), '体温:36.5℃', (255, 255, 255), font=font)
        # Convert the PIL image back to an OpenCV BGR array.
        frame = cv2.cvtColor(np.array(pilimg), cv2.COLOR_RGB2BGR)
    return frame
app.py 用它来联系前后端
#!/usr/bin/env python
from importlib import import_module
import os
from flask import Flask, render_template, Response

# import camera driver
# The CAMERA environment variable selects the driver module to load
# (CAMERA=opencv imports camera_opencv, etc.); default is the OpenCV
# driver defined in this project.
if os.environ.get('CAMERA'):
    Camera = import_module('camera_' + os.environ['CAMERA']).Camera
else:
    # from camera import Camera
    from camera_opencv import Camera
    # from camera_video import Camera

# Raspberry Pi camera module (requires picamera package)
# from camera_pi import Camera

app = Flask(__name__)
@app.route('/')
def index():
    """Serve the video streaming home page."""
    page = render_template('index.html')
    return page
def gen(camera):
    """Yield an endless multipart MJPEG stream built from *camera* frames.

    Each chunk is a multipart boundary plus one JPEG payload, matching
    the ``multipart/x-mixed-replace; boundary=frame`` response type.
    """
    part_header = b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
    while True:
        jpeg = camera.get_frame()
        yield part_header + jpeg + b'\r\n'
@app.route('/video_feed')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    stream = gen(Camera())
    mime = 'multipart/x-mixed-replace; boundary=frame'
    return Response(stream, mimetype=mime)
if __name__ == '__main__':
    # threaded=True lets each streaming client be served from its own
    # request thread; host='0.0.0.0' listens on all interfaces, so the
    # page is reachable from other machines on the LAN.
    app.run(host='0.0.0.0', threaded=True)
index.html 非常简单的网页
<html>
  <head>
    <title>Video Streaming Demonstration</title>
  </head>
  <body>
    <h1>Video Streaming Demonstration</h1>
    <!-- Bug fix: the Jinja expression was broken across lines as
         "{ { url_for('video_feed') }}", which Flask renders literally.
         It must be a single "{{ ... }}" expression. -->
    <img src="{{ url_for('video_feed') }}">
  </body>
</html>
文章评论