import os
import re
import subprocess
from typing import Tuple, Union
import cv2
import numpy as np
[docs]def imgs_to_video(
save_path: str,
img_dir: str = None,
img_arr: np.ndarray = None,
height: int = None,
width: int = None,
out_fps: int = 15,
numbered_imgs: bool = False,
verbose: bool = False,
):
"""
Converts a directory of images or an array / list of image data to a video.
Parameters
----------
save_path : str
Path (.mp4) to the output video file.
img_dir : str
Path to the directory containing the images.
imgs_arr : np.ndarray or list
Array containing the image data. If it's a list, it should contain as many images as number of frames wanted.
If it's a multi-dimensional array, it should be of shape (T x H x W) or (T x H x W x C).
height : int
Height of the images.
width : int
Width of the images.
out_fps : int
Output video frame rate.
numbered_imgs : bool
Whether the image file names are a number sequence.
verbose : bool
Whether to print progress.
"""
assert (
img_dir is not None or img_arr is not None
), "Either img_dir or img_arr must be provided"
if img_dir is not None:
img_list = sorted(os.listdir(img_dir))
if numbered_imgs is True:
img_list = sorted(
img_list, key=lambda x: int(re.findall(r"\d+", x)[0])
) # if len(re.findall(r'\d+', x)) > 0 else 0)
n_images = len(img_list)
if height is None or width is None:
sample_img = cv2.imread(os.path.join(img_dir, img_list[0]))
height, width = sample_img.shape[:2]
else:
n_images = img_arr.shape[0]
height, width = img_arr[0].shape[:2]
is_grayscale = len(img_arr[0].shape) == 2
out_vid = cv2.VideoWriter(
save_path, cv2.VideoWriter_fourcc(*"mp4v"), out_fps, (width, height)
)
print(f"Frame width: {width}, Frame height: {height}, FPS: {out_fps}")
for i in range(n_images):
if img_arr is not None:
img = img_arr[i]
if is_grayscale:
img = img.astype(np.uint8)
img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
else:
img_path = os.path.join(img_dir, img_list[i])
img = cv2.imread(img_path)
if verbose is True:
print(f"Writing {img_path}")
img = img.astype(np.uint8)
out_vid.write(img)
out_vid.release()
[docs]def array_to_video(arr: np.ndarray, save_path: str, fps: int):
"""
Converts an array of image data to a video.
Parameters
----------
arr : np.ndarray
Array containing the image data.
save_path : str
Path to the output video file.
fps : int
Output video frame rate.
"""
imgs_to_video(save_path=save_path, img_arr=arr, out_fps=fps)
[docs]def video_to_array(
video_path: str, grayscale: bool = False, return_fps: bool = False
) -> Union[np.ndarray, Tuple[np.ndarray, int]]:
"""
Converts a video to an array of frames data.
Parameters
----------
video_path : str
Path to the input video file.
grayscale : bool
Whether to convert the frames to grayscale.
return_fps : bool
Whether to return the video frame rate.
Returns
-------
arr : np.ndarray
Array of frames data.
fps : int
Video frame rate.
"""
vid = cv2.VideoCapture(video_path)
W, H = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)), int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(vid.get(cv2.CAP_PROP_FPS))
n_frames = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"No. of frames: {n_frames}, Frame width: {W}, Frame height: {H}")
if grayscale:
arr = np.zeros((n_frames, H, W), dtype=np.uint8)
else:
arr = np.zeros((n_frames, H, W, 3), dtype=np.uint8)
i = 0
while True:
ret, frame = vid.read()
if ret is False:
break
if grayscale:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = frame.astype(np.uint8)
arr[i] = frame
i += 1
vid.release()
if return_fps:
return arr, fps
return arr
[docs]def save_video_frames_diffs(
video_path: str, save_path: str, out_fps: int = None, neg_diff: bool = True
):
"""
Saves the differences in intensities between video frames as a video.
Parameters
----------
video_path : str
Path to the input video.
save_path : str
Path to save the output video.
out_fps : int
Output video frame rate.
neg_diff : bool
Whether to consider negative differences.
"""
"""
Saves the pixelwise differences between consecutive frames of a video to a video.
Parameters
----------
video_path : str
Path to the input video file.
save_path : str
Path to the output video file.
out_fps : int
Output video frame rate.
neg_diff : bool
Whether to consider negative differences.
"""
vid = cv2.VideoCapture(video_path)
W, H = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)), int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(vid.get(cv2.CAP_PROP_FPS))
if out_fps is None:
out_fps = fps
out_vid = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*"mp4v"), out_fps, (W, H))
prev_frame = np.ones((H, W)) * 255
i = 0
while True:
ret, frame = vid.read()
if ret is False:
break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = frame.astype(np.float32)
delta = frame - prev_frame
prev_frame = frame
if neg_diff is True:
delta = abs(delta)
else:
delta = np.maximum(delta, 0)
delta = delta * 255 / np.max(delta)
delta = np.uint8(delta)
delta = cv2.cvtColor(delta, cv2.COLOR_GRAY2BGR)
out_vid.write(delta)
i += 1
vid.release()
out_vid.release()
[docs]def resize_video(video_path: str, save_path: str, size: Tuple[int]):
"""
Resizes a video to a given size.
Parameters
----------
video_path : str
Path to the input video.
save_path : str
Path to the output video.
size : Tuple[int]
Size to resize the video to.
"""
assert type(video_path) == str and video_path.endswith(
".mp4"
), "Input video must be in mp4 format"
assert type(save_path) == str and save_path.endswith(
".mp4"
), "Output video must be in mp4 format"
assert type(size) == tuple and len(size) == 2, "Size must be a tuple of length 2"
vid = cv2.VideoCapture(video_path)
fps = int(vid.get(cv2.CAP_PROP_FPS))
out_vid = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, size)
W, H = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)), int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Original frame width: {W}, Original frame height: {H}")
print(f"Resizing to {size[0]}x{size[1]}")
while True:
ret, frame = vid.read()
if ret is False:
break
frame = cv2.resize(frame, size)
out_vid.write(frame)
vid.release()
out_vid.release()
[docs]def download_from_youtube(
urls: Union[Tuple[str], str],
start_times: Union[Tuple[int], str],
end_times: Union[Tuple[int], str],
save_dir: str,
):
"""
Downloads videos from YouTube.
Parameters
----------
urls : Union[Tuple[str], str]
YouTube URLs.
start_times : Union[Tuple[int], str]
Start times to trim the videos.
end_times : Union[Tuple[int], str]
End times to trim the videos.
save_dir : str
Directory to save the videos to.
"""
if isinstance(urls, str):
urls = (urls,)
if isinstance(start_times, int):
start_times = (start_times,)
start_times = [int(time) for time in start_times]
if isinstance(end_times, int):
end_times = (end_times,)
end_times = [int(time) for time in end_times]
assert (
len(urls) == len(start_times) == len(end_times)
), "Length of urls, start_times and end_times must be equal"
os.makedirs(save_dir, exist_ok=True)
for i in range(len(urls)):
print(f"Downloading video {i + 1} of {len(urls)}")
try:
duration = end_times[i] - start_times[i]
command = (
f"ffmpeg $(youtube-dl -g '{urls[i]}' | sed 's/.*/-ss {start_times[i]} -i"
f" &/') -t {duration} -c copy {save_dir}/{i}.mp4"
)
subprocess.call(command, shell=True)
except:
raise Exception(f"Error downloading video {i + 1} of {len(urls)}")