Source code for event_aug.spike_encoding

from typing import Union

import cv2
import numpy as np
import torch

from event_aug.utils import array_to_video


[docs]def rate_code( data: Union[np.ndarray, torch.Tensor], n_steps: int = 1, gain: int = 1 ) -> np.ndarray: """ Converts array of continuous input data to spikes using rate coding. Input array is normalized to [0, 1] and the resulting elements are treated as probabilities for Bernoulli trials. Parameters ---------- data: np.ndarray or torch.Tensor 2D array of continuous input data. 3D array in case of time-series data (shape: T x H x W). n_steps: int Number of time-steps to perform rate coding for. Only used for 2D input data. In case of time series data, rate coding is performed for each time-step. gain: int Factor by which to scale normalized input features. Returns ------- np.ndarray 2D array of spikes. 3D array in case of time-series data. """ data = data / np.linalg.norm(data) data = data * gain if not isinstance(data, torch.Tensor): data = torch.from_numpy(data).float() assert len(data.shape) == 2 or len(data.shape) == 3, ( "Input must be a 2D array of probabilities or a 3D array in case of time series" " data" ) if len(data.shape) == 2 and n_steps > 1: new_data = torch.clone(data).unsqueeze(0) data = data.unsqueeze(0) for _ in range(n_steps - 1): new_data = torch.cat((new_data, data)) data = new_data spikes = torch.bernoulli(data) return spikes.numpy()
[docs]def delta_intensity_code_arr( arr: Union[np.ndarray, torch.Tensor], arr_from_file: bool = False, threshold: int = 15, percent_threshold: int = 10, mode: str = "threshold", use_neg_delta: bool = True, exclude_start: bool = False, return_arr: bool = True, save_arr: bool = False, arr_save_path: str = None, save_video: bool = False, video_save_path: str = None, video_fps: int = 25, memory_map: bool = True, ) -> Union[np.ndarray, torch.Tensor]: """ Converts a video data array to spikes. If the difference in the intensity of a pixel in consecutive frames is greater than the threshold, the pixel is assigned an event. Parameters ---------- arr: np.ndarray or torch.Tensor or str Array containing the video data (frames). Should be of shape (T x H x W) or (T x H x W x C). If a string is passed, the array is read from a .npy file. arr_from_file: bool Whether to read the video array from a .npy file. threshold: int Threshold for the difference in intensities of pixels in consecutive frames for assigning events. percent_threshold: int Pixel-wise percentage threshold for the difference in intensities of pixels in consecutive frames for assigning events. mode: str Whether to use 'threshold' or 'percent_threshold'". use_neg_delta: bool Whether to consider decreases in intensity as well along with increases for assigning events. exclude_start: bool Whether to not return the spikes for the first frame which will always be 0 for all pixels. return_arr: bool Whether to return the spikes array. save_arr: bool Whether to save the spikes array to a .npy file. arr_save_path: str Name of the .npy file to save the spikes array to (excluding extension). save_video: bool Whether to save the spikes as a video (.mp4). video_save_path: str Name of the .mp4 file to save the spikes video to. video_fps: int Frames per second of the video. memory_map : bool Whether to use memory mapping to read the spikes video array. Returns ------- np.ndarray or torch.Tensor Array containing the spikes. """ if arr_from_file is True: assert type(arr) == str, "Spikes array must be a path to a .npy file" assert arr.endswith(".npy"), "Spikes array must be a .npy file" if memory_map is True: arr = np.load(arr, mmap_mode="r") else: arr = np.load(arr) assert ( len(arr.shape) == 3 or len(arr.shape) == 4 ), "Input must be an array with shape (T x H x W) or (T x H x W x C)" assert ( threshold is not None or percent_threshold is not None ), "Either threshold or percent_threshold must be specified" mode = mode.lower() assert mode in [ "threshold", "percent_threshold", ], "Mode must be either 'threshold' or 'percent_threshold'" if save_arr is True: assert ( arr_save_path is not None ), "save_path must be specified if save_arr is True" assert arr_save_path.endswith( ".npy" ), "arr_save_path must be without and extension or end with .npy" if save_video is True: assert ( video_save_path is not None ), "video_save_path must be specified if save_video is True" assert video_save_path.endswith( ".mp4" ), "video_save_path must be without and extension or end with .mp4" if isinstance(arr, torch.Tensor): arr = arr.numpy() torch_inp_arr = True else: torch_inp_arr = False multi_channel = len(arr.shape) == 4 T = arr.shape[0] H, W = arr.shape[1], arr.shape[2] spikes = np.zeros((T, H, W), dtype="bool") for i in range(1, arr.shape[0]): prev_frame, curr_frame = np.float32(arr[i - 1]), np.float32( arr[i] ) # np.int16(arr[i - 1]), np.int16(arr[i]) if multi_channel: curr_frame = cv2.cvtColor(curr_frame, cv2.COLOR_RGB2GRAY) prev_frame = cv2.cvtColor(prev_frame, cv2.COLOR_RGB2GRAY) intensity_delta = curr_frame - prev_frame if use_neg_delta: intensity_delta = abs(intensity_delta) if mode == "threshold": spikes[i][intensity_delta > threshold] = 1 else: percent_frame_delta = prev_frame * percent_threshold / 100 spikes[i][intensity_delta > percent_frame_delta] = 1 if exclude_start: return spikes[1:] if save_arr is True: np.save(arr_save_path, spikes) if save_video is True: array_to_video(spikes.astype(np.uint8) * 255, video_save_path, fps=video_fps) if return_arr is True: if torch_inp_arr is True: return torch.from_numpy(spikes) return spikes
[docs]def delta_intensity_code_file( video_path: str, threshold: int = 15, percent_threshold: int = 10, mode: str = "threshold", out_fps: int = None, use_neg_delta: bool = True, exclude_start: bool = False, return_arr: bool = False, save_video: bool = True, video_save_path: str = None, save_arr: bool = False, arr_save_path: str = None, ) -> None: """ Reads a video file, convert the video to spiking form and saves to a file. If the difference in the intensity of a pixel in consecutive frames is greater than the threshold, the pixel is assigned an event. Parameters ---------- video_path: str Path to the input video file. threshold: int Threshold for the difference in intensities of pixels in consecutive frames for assigning events. percent_threshold: int Pixel-wise percentage threshold for the difference in intensities of pixels in consecutive frames for assigning events. mode: str Whether to use 'threshold' or 'percent_threshold'". out_fps: int Output video frame rate. use_neg_delta: bool Whether to consider decreases in intensity as well along with increases for assigning events. exclude_start: bool Whether to not return the spikes for the first frame which will always be 1 for all pixels. return_arr: bool Whether to return an array containing frame-wise spikes. save_video: bool Whether to save the frame-wise spikes as a video (.mp4). video_save_path: str Name of the .mp4 file to save the spikes video to. save_arr: bool Whether to save the spikes array to a .npy file. arr_save_path: str Name of the .npy file to save the spikes array to (excluding extension). """ if save_video is True: assert ( video_save_path is not None ), "video_save_path must be specified if save_video is True" assert video_save_path.endswith(".mp4"), "Output video file must be .mp4" if save_arr is True: assert ( arr_save_path is not None ), "save_path must be specified if save_arr is True" assert arr_save_path.endswith(".npy"), "Output spikes file must be .npy" assert ( threshold is not None or percent_threshold is not None ), "Either threshold or percent_threshold must be specified" mode = mode.lower() assert mode in [ "threshold", "percent_threshold", ], "Mode must be either 'threshold' or 'percent_threshold'" vid = cv2.VideoCapture(video_path) W, H = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)), int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = int(vid.get(cv2.CAP_PROP_FPS)) if out_fps is None: out_fps = fps n_frames = int(vid.get(cv2.CAP_PROP_FRAME_COUNT)) print( f"Frame width: {W}, Frame height: {H}, Input FPS: {fps}, Output FPS: {out_fps}," f" Number of frames: {n_frames}" ) if return_arr is True or save_arr is True: spikes_arr = np.zeros((n_frames, H, W), dtype="bool") if save_video is True: out_vid = cv2.VideoWriter( video_save_path, cv2.VideoWriter_fourcc(*"mp4v"), out_fps, (W, H) ) prev_frame = np.ones((H, W), dtype=np.int16) * 255 i = 0 while True: ret, frame = vid.read() if ret is False: break frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) frame = frame.astype(np.int16) # frame.astype(np.float32) if exclude_start and i == 0: prev_frame = frame i += 1 continue delta = frame - prev_frame prev_frame = frame if use_neg_delta is True: delta = abs(delta) if mode == "threshold": delta[delta > threshold] = 255 delta[delta <= threshold] = 0 else: percent_frame_delta = prev_frame * percent_threshold / 100 delta[delta > percent_frame_delta] = 255 delta[delta <= percent_frame_delta] = 0 delta = delta.astype(np.uint8) if return_arr is True or save_arr is True: spikes_arr[i] = (delta == 255).astype("bool") if save_video is True: delta = cv2.cvtColor(delta, cv2.COLOR_GRAY2BGR) out_vid.write(delta) i += 1 vid.release() if save_video is True: out_vid.release() if save_arr is True: np.save(arr_save_path, spikes_arr) if return_arr is True: return spikes_arr
[docs]def delta_intensity_code_video( video_path: str = None, video_arr: Union[np.ndarray, torch.Tensor] = None, threshold: int = 15, percent_threshold: int = 10, mode: str = "threshold", use_neg_delta: bool = True, out_fps: int = None, exclude_start: bool = True, return_arr: bool = False, save_video: bool = True, video_save_path: str = None, save_arr: bool = False, arr_save_path: str = None, ) -> Union[np.ndarray, torch.Tensor, None]: """ Converts a video to spiking form. If the difference in the intensity of a pixel in consecutive frames is greater than the threshold, the pixel is assigned an event. Works with either a video file or a video array. Parameters ---------- video_path: str Path to the input video file. video_arr: np.ndarray or torch.Tensor Array containing the video data (frames). Should be of shape (T x H x W) or (T x H x W x C). threshold: int Threshold for the difference in intensities of pixels in consecutive frames for assigning events. percent_threshold: int Pixel-wise percentage threshold for the difference in intensities of pixels in consecutive frames for assigning events. mode: str Whether to use 'threshold' or 'percent_threshold'". use_neg_delta: bool Whether to consider decreases in intensity as well along with increases for assigning events. out_fps: int Output video frame rate (required if input video is a file). exclude_start: bool Whether to not return the spikes for the first frame which will always be 0 for all pixels (required if input video is a file). return_arr: bool Whether to return an array containing frame-wise spikes. save_video: bool Whether to save the frame-wise spikes as a video (.mp4). video_save_path: str Name of the .mp4 file to save the spikes video to. save_arr: bool Whether to save the spikes array to a .npy file. arr_save_path: str Name of the .npy file to save the spikes array to (excluding extension). """ assert ( video_path is not None or video_arr is not None ), "Either video_path or video_arr must be provided" if video_path is not None: spikes = delta_intensity_code_file( video_path=video_path, threshold=threshold, percent_threshold=percent_threshold, mode=mode, out_fps=out_fps, use_neg_delta=use_neg_delta, exclude_start=exclude_start, return_arr=return_arr, save_video=save_video, video_save_path=video_save_path, save_arr=save_arr, arr_save_path=arr_save_path, ) return spikes else: spikes = delta_intensity_code_arr( arr=video_arr, threshold=threshold, percent_threshold=percent_threshold, mode=mode, use_neg_delta=use_neg_delta, exclude_start=exclude_start, return_arr=return_arr, save_arr=save_arr, arr_save_path=arr_save_path, save_video=save_video, video_save_path=video_save_path, video_fps=out_fps, ) return spikes