# Source code for video

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division
from __future__ import print_function

import cv2
import time
from threading import Thread


  from queue import Queue

except ImportError:

  from Queue import Queue

from NumPyNet.image import Image
from NumPyNet.exception import VideoError

__author__ = ['Mattia Ceccarelli', 'Nico Curti']
__email__ = ['', '']

class VideoCapture (object):

  '''
  OpenCV VideoCapture wrapped in a detached (daemon) thread.

  Frames are read by a background thread and stored into a bounded
  queue; the consumer pulls them with `read`.

  Parameters
  ----------
    cam_index : integer or str
      Filename or cam index

    queue_size : int
      Maximum number of frames to store into the queue

  Raises
  ------
    VideoError
      If the stream can not be opened.

  Example
  -------
  >>> cap = VideoCapture()
  >>> time.sleep(.1)
  >>>
  >>> cv2.namedWindow('Camera', cv2.WINDOW_NORMAL)
  >>>
  >>> cap.start()
  >>>
  >>> while cap.running():
  >>>
  >>>   frame = cap.read()
  >>>   frame.show('Camera', ms=1)
  >>>   print('FPS: {:.3f}'.format(cap.fps))
  >>>
  >>> cap.stop()
  >>>
  >>> cv2.destroyAllWindows()

  Notes
  -----
  The object is inspired by the ImUtils implementation.

  NOTE(review): the two `frame` lines of the example were reconstructed
  from a garbled copy of this file; `Image.show` is assumed - confirm
  against NumPyNet.image.

  References
  ----------
  - ImUtils (link missing from this copy of the source)
  '''

  def __init__ (self, cam_index=0, queue_size=128):

    self._stream = cv2.VideoCapture(cam_index)

    if self._stream is None or not self._stream.isOpened():
      raise VideoError('Can not open or find camera. Given: {}'.format(cam_index))

    self._queue = Queue(maxsize=queue_size)
    self._thread = Thread(target=self._update, args=())
    # daemon thread: it must not keep the interpreter alive on exit
    self._thread.daemon = True

    self._num_frames = 0
    self._start = None  # set by the worker thread when it begins reading
    self._end = None    # set by stop()

    self._stopped = False

  def start (self):
    '''
    Start the video capture in thread

    Returns
    -------
      self
    '''
    self._thread.start()
    return self

  def _update (self):
    '''
    Loop of frame reading, executed by the worker thread.
    Each grabbed frame is inserted into the private queue; the loop ends
    when the capture is stopped or the stream has no more frames.
    The stream is always released on exit.
    '''

    self._start = time.time()

    try:

      while not self._stopped:

        if self._queue.full():
          # the consumer is slower than the producer: wait for room
          time.sleep(.1)
          continue

        (grabbed, frame) = self._stream.read()

        if not grabbed:
          # end of stream (or read failure): never enqueue the empty frame
          self._stopped = True
          break

        self._num_frames += 1
        self._queue.put(frame)

    finally:
      # release the device even if reading raised
      self._stopped = True
      self._stream.release()

  def read (self):
    '''
    Get a frame as Image object

    Returns
    -------
      im : Image obj
        The loaded image

    Notes
    -----
    The call blocks until a frame is available in the queue:
    check `running` before calling it.
    '''
    im = Image()
    return im.from_frame(self._queue.get())

  def running (self):
    '''
    Check if new frames are available

    Returns
    -------
      running : bool
        True if there are data into the queue, False otherwise

    Notes
    -----
    If the queue is empty and the capture is not stopped, the check is
    repeated up to 5 times, sleeping 0.1 seconds between attempts.
    '''

    tries = 0

    while self._queue.qsize() == 0 and not self._stopped and tries < 5:
      time.sleep(.1)
      tries += 1

    return self._queue.qsize() > 0

  def stop (self):
    '''
    Stop the thread
    '''

    self._stopped = True

    if self._thread.is_alive():
      # the worker releases the stream on its way out
      self._thread.join()

    elif self._stream.isOpened():
      # the worker never ran (joining it would raise RuntimeError):
      # release the stream here instead
      self._stream.release()

    self._end = time.time()

  @property
  def elapsed (self):
    '''
    Get the elapsed time from the start of the capture up to now, or
    up to the stop time if the capture has been stopped.

    Returns
    -------
      elapsed : float
        Elapsed time in seconds (0 if the capture has not started yet)
    '''

    if self._start is None:
      return 0.

    end = time.time() if self._end is None else self._end
    return end - self._start

  @property
  def fps (self):
    '''
    Get the frame per seconds

    Returns
    -------
      fps : float
        Frame per seconds (0 if no time has elapsed yet)
    '''

    elapsed = self.elapsed
    return self._num_frames / elapsed if elapsed > 0 else 0.
if __name__ == '__main__':

  # Demo: display the default camera stream and print the capture rate.

  cap = VideoCapture()
  time.sleep(.1)

  cv2.namedWindow('Camera', cv2.WINDOW_NORMAL)

  cap.start()

  try:

    while cap.running():

      frame = cap.read()
      # NOTE(review): this call was garbled in the available copy of the
      # file and has been reconstructed; confirm that Image exposes
      # show(window_name, ms=...).
      frame.show('Camera', ms=1)
      print('FPS: {:.3f}'.format(cap.fps))

  finally:
    # always stop the worker thread and close the window, even on error
    cap.stop()
    cv2.destroyAllWindows()