# -*- coding : UTF-8 -*-
"""
:filename: sppas.src.annotations.FaceSights.videosights.py
:author: Brigitte Bigi
:contact: develop@sppas.org
:summary: Data structure to manage the 68 sights on faces of a video.
.. _This file is part of SPPAS: http://www.sppas.org/
..
---------------------------------------------------------------------
___ __ __ __ ___
/ | \ | \ | \ / the automatic
\__ |__/ |__/ |___| \__ annotation and
\ | | | | \ analysis
___/ | | | | ___/ of speech
Copyright (C) 2011-2021 Brigitte Bigi
Laboratoire Parole et Langage, Aix-en-Provence, France
Use of this software is governed by the GNU Public License, version 3.
SPPAS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
SPPAS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SPPAS. If not, see <http://www.gnu.org/licenses/>.
This banner notice must not be removed.
---------------------------------------------------------------------
"""
import os
import logging
import codecs
from sppas.src.config import sppasTypeError
from sppas.src.imgdata import sppasCoords
from sppas.src.videodata import sppasCoordsVideoBuffer
from sppas.src.videodata import sppasCoordsVideoWriter
from .sights import Sights
from .sights import sppasSightsImageWriter
# ---------------------------------------------------------------------------
class sppasSightsVideoBuffer(sppasCoordsVideoBuffer):
    """A video buffer with lists of coordinates, identifiers and sights.

    For each image of the buffer, this class stores a list of face
    identifiers and a list of sights. Both lists are kept parallel to
    the list of coordinates managed by the parent class: the i-th
    identifier and the i-th sights belong to the i-th coordinates.

    """

    def __init__(self,
                 video=None,
                 size=-1):
        """Create a new instance.

        :param video: (str) The video filename
        :param size: (int) Number of images of the buffer or -1 for auto

        """
        super(sppasSightsVideoBuffer, self).__init__(video, size=size)

        # The list of list of Sights instances and face identifiers.
        # By default, the identifier is the face number.
        self.__sights = list()
        self.__ids = list()
        self.__init_sights()

    # -----------------------------------------------------------------------

    def __init_sights(self):
        """Create one empty list of ids and of sights per buffer image."""
        nb_images = self.get_buffer_size()
        # The list of list of identifiers
        self.__ids = [list() for _ in range(nb_images)]
        # The list of list of sights
        self.__sights = [list() for _ in range(nb_images)]

    # -----------------------------------------------------------------------

    def reset(self):
        """Override. Reset all the info related to the buffer content."""
        sppasCoordsVideoBuffer.reset(self)
        self.__init_sights()

    # -----------------------------------------------------------------------

    def next(self):
        """Override. Fill in the buffer with the next images & reset sights.

        :return: The value returned by the parent ``next()`` method

        """
        ret = sppasCoordsVideoBuffer.next(self)
        self.__init_sights()
        return ret

    # -----------------------------------------------------------------------

    def get_ids(self, buffer_index=None):
        """Return the identifiers of all faces of a given image.

        :param buffer_index: (int) Index of the image in the buffer,
            or None to get the identifiers of all the images
        :return: (list of identifiers) or (list of list of identifiers)
        :raises: ValueError if no index is given and the number of lists
            of identifiers does not match the number of images

        """
        if buffer_index is not None:
            buffer_index = self.check_buffer_index(buffer_index)
            return self.__ids[buffer_index]

        if len(self.__ids) != len(self):
            raise ValueError("Identifiers were not properly associated to images of the buffer")
        return self.__ids

    # -----------------------------------------------------------------------

    def get_sights(self, buffer_index=None):
        """Return the sights of all faces of a given image.

        :param buffer_index: (int) Index of the image in the buffer,
            or None to get the sights of all the images
        :return: (list of Sights) or (list of list of Sights)
        :raises: ValueError if no index is given and the number of lists
            of sights does not match the number of images

        """
        if buffer_index is not None:
            buffer_index = self.check_buffer_index(buffer_index)
            return self.__sights[buffer_index]

        if len(self.__sights) != len(self):
            raise ValueError("Sights were not properly associated to images of the buffer")
        return self.__sights

    # -----------------------------------------------------------------------

    def set_ids(self, buffer_index, ids):
        """Set the face identifiers of a given image index.

        The number of identifiers must match the number of faces.

        :param buffer_index: (int) Index of the image in the buffer
        :param ids: (list of identifiers) A list of face identifiers
        :raises: sppasTypeError if ids is neither a list nor a tuple
        :raises: ValueError if the number of ids is not the number of coords

        """
        coords_i = self.get_coordinates(buffer_index)
        if isinstance(ids, (list, tuple)) is False:
            raise sppasTypeError(type(ids), "(list, tuple)")
        if len(coords_i) != len(ids):
            raise ValueError("Expected {:d} identifiers. Got {:d} instead."
                             "".format(len(coords_i), len(ids)))

        # Store a private, mutable copy: a tuple would break set_id(),
        # append_coordinate() and pop_coordinate(), and a shared list
        # could be modified from outside of the buffer.
        self.__ids[buffer_index] = list(ids)

    # -----------------------------------------------------------------------

    def set_sights(self, buffer_index, sights):
        """Set the sights to a given image index.

        The number of sights must match the number of faces.
        A None item is replaced by a new default Sights instance.

        :param buffer_index: (int) Index of the image in the buffer
        :param sights: (list of Sights) Set the list of sights
        :raises: sppasTypeError if sights is neither a list nor a tuple,
            or if one of its items is neither None nor a Sights
        :raises: ValueError if the number of sights is not the number of coords

        """
        coords_i = self.get_coordinates(buffer_index)
        if isinstance(sights, (list, tuple)) is False:
            raise sppasTypeError(type(sights), "(list, tuple)")
        if len(coords_i) != len(sights):
            raise ValueError("Expected {:d} sights. Got {:d} instead."
                             "".format(len(coords_i), len(sights)))

        # Check if all sights items are correct
        checked = list()
        for item in sights:
            if item is None:
                item = Sights()
            elif isinstance(item, Sights) is False:
                raise sppasTypeError(item, "Sights")
            checked.append(item)

        # Set the sights only if all of them were checked
        self.__sights[buffer_index] = checked

    # -----------------------------------------------------------------------

    def get_sight(self, buffer_index, face_index):
        """Return the sights of a face of a given image.

        :param buffer_index: (int) Index of the image in the buffer
        :param face_index: (int) Index of the face
        :return: (Sights) or None if no sights were assigned to this face
        :raises: ValueError if the face index is out of range

        """
        buffer_index = self.check_buffer_index(buffer_index)
        if 0 <= face_index < len(self.__sights[buffer_index]):
            return self.__sights[buffer_index][face_index]
        raise ValueError("Invalid face index value.")

    # -----------------------------------------------------------------------

    def get_id(self, buffer_index, face_index):
        """Return the identifier of a face of a given image.

        :param buffer_index: (int) Index of the image in the buffer
        :param face_index: (int) Index of the face
        :return: (any) The identifier of the face
        :raises: ValueError if the face index is out of range

        """
        buffer_index = self.check_buffer_index(buffer_index)
        if 0 <= face_index < len(self.__ids[buffer_index]):
            return self.__ids[buffer_index][face_index]
        raise ValueError("Invalid face index value.")

    # -----------------------------------------------------------------------

    def set_sight(self, buffer_index, face_index, sight):
        """Set the sights to a face of a given image index.

        :param buffer_index: (int) Index of the image in the buffer
        :param face_index: (int) Index of the face
        :param sight: (Sights) the given sight object
        :raises: sppasTypeError if sight is not a Sights instance
        :raises: ValueError if the face index is out of range

        """
        buffer_index = self.check_buffer_index(buffer_index)
        if isinstance(sight, Sights) is False:
            raise sppasTypeError(sight, "Sights")

        # Same range as get_sight(): a negative index is not silently
        # interpreted as a position counted from the end of the list.
        if 0 <= face_index < len(self.__sights[buffer_index]):
            self.__sights[buffer_index][face_index] = sight
        else:
            raise ValueError("face index error")

    # -----------------------------------------------------------------------

    def set_id(self, buffer_index, coords_index, identifier):
        """Set the id to coordinate of a given image index.

        :param buffer_index: (int) Index of the image in the buffer
        :param coords_index: (int) Index of the coordinates for this id
        :param identifier: (any) Any relevant information
        :raises: ValueError if the coords index is out of range

        """
        buffer_index = self.check_buffer_index(buffer_index)
        # Same range as get_id(): negative indexes are rejected.
        if 0 <= coords_index < len(self.__ids[buffer_index]):
            self.__ids[buffer_index][coords_index] = identifier
        else:
            raise ValueError("Face index error {}".format(coords_index))

    # -----------------------------------------------------------------------

    def set_coordinates(self, buffer_index, coords):
        """Set the coordinates to a given image index.

        Override to invalidate the corresponding sights and identifiers:
        sights are all set to None and identifiers are set to the face
        numbers "1", "2", ...

        :param buffer_index: (int) Index of the image in the buffer
        :param coords: (list of sppasCoords) Set the list of coords

        """
        sppasCoordsVideoBuffer.set_coordinates(self, buffer_index, coords)
        nb_faces = len(coords)
        self.__sights[buffer_index] = [None] * nb_faces
        self.__ids[buffer_index] = [str(i + 1) for i in range(nb_faces)]

    # -----------------------------------------------------------------------

    def append_coordinate(self, buffer_index, coord):
        """Override. Append the coordinates to a given image index.

        The new face has no sights (None) and its default identifier is
        its face number in the image, like in set_coordinates().

        :param buffer_index: (int) Index of the image in the buffer
        :param coord: (sppasCoords) Append the given coord
        :return: (int) Index of the new coordinate

        """
        sppasCoordsVideoBuffer.append_coordinate(self, buffer_index, coord)
        face_sights = self.__sights[buffer_index]
        face_sights.append(None)
        # The face number is the number of faces of THIS image, not the
        # number of images in the buffer.
        self.__ids[buffer_index].append(str(len(face_sights)))
        return len(face_sights) - 1

    # -----------------------------------------------------------------------

    def remove_coordinate(self, buffer_index, coord):
        """Remove the coordinates to a given image index.

        Override to remove the sights and the identifier too.

        :param buffer_index: (int) Index of the image in the buffer
        :param coord: (sppasCoords) Remove the given coord

        """
        face_idx = self.index_coordinate(buffer_index, coord)
        # Call the parent pop: the one of this class would pop the
        # sights and the identifier a second time.
        sppasCoordsVideoBuffer.pop_coordinate(self, buffer_index, face_idx)
        self.__sights[buffer_index].pop(face_idx)
        self.__ids[buffer_index].pop(face_idx)

    # -----------------------------------------------------------------------

    def pop_coordinate(self, buffer_index, coord_index):
        """Remove the coordinates to a given image index.

        Override to pop the sights and the identifier too.

        :param buffer_index: (int) Index of the image in the buffer
        :param coord_index: (int) Pop the given coord

        """
        buffer_index = self.check_buffer_index(buffer_index)
        sppasCoordsVideoBuffer.pop_coordinate(self, buffer_index, coord_index)
        self.__sights[buffer_index].pop(coord_index)
        self.__ids[buffer_index].pop(coord_index)

    # -----------------------------------------------------------------------

    def get_id_coordinate(self, buffer_index, identifier):
        """Return the coordinate of a given identifier in a given image.

        :param buffer_index: (int) Index of the image in the buffer
        :param identifier: (any) Identifier to search
        :return: (sppasCoords) Coordinates or None if identifier is unknown

        """
        buffer_index = self.check_buffer_index(buffer_index)
        try:
            coord_idx = self.__ids[buffer_index].index(identifier)
        except ValueError:
            # no face with this identifier in this image
            return None
        return self.get_coordinate(buffer_index, coord_idx)

    # -----------------------------------------------------------------------

    def get_id_sight(self, buffer_index, identifier):
        """Return the sights of a given identifier in a given image.

        :param buffer_index: (int) Index of the image in the buffer
        :param identifier: (any) Identifier to search
        :return: (Sights) Sights or None if identifier is unknown

        """
        buffer_index = self.check_buffer_index(buffer_index)
        try:
            coord_idx = self.__ids[buffer_index].index(identifier)
        except ValueError:
            # no face with this identifier in this image
            return None
        return self.get_sight(buffer_index, coord_idx)
# ---------------------------------------------------------------------------
class sppasSightsVideoReader(object):
    """Read&create list of coords and sights from a CSV file.

    The columns of the CSV file are read at the following indexes:

        - 0: frame number
        - 1: identifier of the coords (the face number by default)
        - 2: timestamp
        - 3: confidence -- face detection
        - 4: success -- face detection (0=failed, 1=success)
        - 5, 6, 7, 8: x, y, w, h
        - 9: buffer number
        - 10: index in the buffer
        - 11: average confidence -- face landmark
        - 12: success -- face landmark (0=failed, 1=success)
        - 13: n -- number of sights
        - 14 .. 14+n-1: x_1 .. x_n
        - 14+n .. 14+2n-1: y_1 .. y_n
        - optionally, 14+2n .. 14+3n-1: score_1 .. score_n

    After reading, the members 'coords', 'sights' and 'ids' are three
    parallel lists with one item per image; each item is the list of
    the faces of this image.

    """

    def __init__(self, csv_file, separator=";"):
        """Set the list of coords & sights defined in the given file.

        All the rows sharing the same frame number are assigned to the
        same image. Blank lines are ignored.

        :param csv_file: (str) coords&sights from a sppasSightsVideoWriter
        :param separator: (char) Columns separator in the CSV file

        """
        logging.info("Sights CSV file reader")
        self.coords = list()
        self.sights = list()
        self.ids = list()

        with codecs.open(csv_file, "r") as csv:
            lines = csv.readlines()

        current_frame = None
        for line in lines:
            if len(line.strip()) == 0:
                continue
            columns = line.split(separator)

            # A new frame number = a new image.
            # The identifier column can't be used for that: it is not
            # necessarily a number and it does not necessarily start at 1.
            frame = columns[0].strip()
            if frame != current_frame:
                current_frame = frame
                self.coords.append(list())
                self.sights.append(list())
                self.ids.append(list())

            # columns[4] is 0=failed, 1=success -- face found or not
            if len(columns) > 8 and int(columns[4]) == 1:
                # identifier (the face number by default)
                self.ids[-1].append(columns[1])

                # coordinates
                coord = sppasCoords(int(columns[5]),
                                    int(columns[6]),
                                    int(columns[7]),
                                    int(columns[8]),
                                    float(columns[3]))
                self.coords[-1].append(coord)

                # sights, or None if they were not found
                self.sights[-1].append(self._parse_sights(columns))

    # -----------------------------------------------------------------------

    @staticmethod
    def _parse_sights(columns):
        """Return the Sights stored in the columns of a row, or None.

        :param columns: (list of str) All the columns of a row
        :return: (Sights) or None if the sights are missing or failed

        """
        # columns[11] is the average score of sights or "none"
        # columns[12] is 0=failed, 1=success -- sights found or not
        if len(columns) <= 12 or int(columns[12]) != 1:
            return None

        # number of sight values
        nb = int(columns[13])
        s = Sights(nb)

        # Scores are optional. They are read only if there is one column
        # for each sight: a trailing separator at the end of the row must
        # not be mistaken for a score.
        x_start = 14
        y_start = x_start + nb
        score_start = y_start + nb
        has_scores = nb > 0 \
            and len(columns) >= score_start + nb \
            and len(columns[score_start].strip()) > 0

        # extract all (x, y, score)
        for i in range(nb):
            x = int(columns[x_start + i])
            y = int(columns[y_start + i])
            score = None
            if has_scores is True:
                score = float(columns[score_start + i])
            s.set_sight(i, x, y, score)

        return s
# ---------------------------------------------------------------------------
class sppasSightsVideoWriter(sppasCoordsVideoWriter):
    """Write a video and optionally coords/sights into files.

    Compared to the parent class, sights and identifiers of the faces
    are written too, and a color is associated to each identifier.

    """

    def __init__(self, image_writer=None):
        """Create a new instance.

        :param image_writer: (sppasSightsImageWriter) The image writer to
            use. Anything else is ignored and a default one is created.

        """
        super(sppasSightsVideoWriter, self).__init__()

        # Override
        self._img_writer = sppasSightsImageWriter()
        if isinstance(image_writer, sppasSightsImageWriter) is True:
            self._img_writer = image_writer

        # new member: associate a color to a person
        self.__person_colors = dict()

    # -----------------------------------------------------------------------

    def write_coords(self, fd, video_buffer, buffer_idx, idx):
        """Override to write the coords AND sights AND ids into the stream.

        One row is written for each face of the image, or a single row
        if there is no face. The columns of a row are, in this order:

            - frame number
            - the identifier of the coords (0 if no face)
            - timestamp
            - confidence ("none" if no face)
            - success (1 for a face, 0 if no face)
            - x, y, w, h
            - buffer number
            - index in the buffer
            - the sights, as written by sppasSightsImageWriter.write_coords()

        :param fd: (Stream) File descriptor, String descriptor, stdout, etc
        :param video_buffer: (sppasSightsVideoBuffer)
        :param buffer_idx: (int) Buffer number
        :param idx: (int) Index of the image in the buffer

        """
        sep = self._img_writer.get_csv_sep()

        # Get the lists stored for the idx-th image
        coords = video_buffer.get_coordinates(idx)
        sights = video_buffer.get_sights(idx)
        ids = video_buffer.get_ids(idx)
        frame_idx = (buffer_idx * video_buffer.get_buffer_size()) + idx

        # Columns with the same value for all the rows of this image
        frame_col = "{:d}".format(frame_idx + 1)
        time_col = "{:.3f}".format(float(frame_idx) / self._fps)
        buffer_cols = ["{:d}".format(buffer_idx + 1), "{:d}".format(idx)]

        if len(coords) == 0:
            # the same as base class
            columns = [frame_col, "0", time_col, "none", "0",
                       "0", "0", "0", "0"]
            columns.extend(buffer_cols)
            # each column is followed by the separator, including the last
            fd.write(sep.join(columns) + sep)
            fd.write("\n")
            return

        # write each coords/sights in a new line
        for j, coord in enumerate(coords):
            # the identifier is the face number if none was assigned
            identifier = ids[j] if j < len(ids) else j + 1
            columns = [
                frame_col,
                "{}".format(identifier),
                time_col,
                "{:f}".format(coord.get_confidence()),
                "1",
                "{:d}".format(coord.x),
                "{:d}".format(coord.y),
                "{:d}".format(coord.w),
                "{:d}".format(coord.h)
            ]
            columns.extend(buffer_cols)
            fd.write(sep.join(columns) + sep)

            if j < len(sights):
                # NOTE(review): 'sep' is not forwarded to this call; confirm
                # that it writes with the same separator as this method.
                sppasSightsImageWriter.write_coords(fd, sights[j])

            fd.write("\n")

    # -----------------------------------------------------------------------

    def write_video(self, video_buffer, out_name, pattern):
        """Save the result in video format.

        Each image of the buffer is tagged with the coords, the sights
        and the identifier of each face, then written into the video.
        Nothing is done if the tag option is not enabled.

        :param video_buffer: (sppasSightsVideoBuffer) The images to write
        :param out_name: (str) The filename of the output video file
        :param pattern: (str) Pattern to add to the video filename
        :return: list of newly created video file names

        """
        new_files = list()
        if self._img_writer.options.tag is False:
            logging.info("Tag option is not enabled. Nothing to do.")
            return new_files

        all_colors = self._img_writer.get_colors()

        for i, image in enumerate(video_buffer):
            # Get the lists stored for the i-th image
            sights = video_buffer.get_sights(i)
            coords = video_buffer.get_coordinates(i)
            person_ids = video_buffer.get_ids(i)

            # Create the sppasVideoWriter() if it wasn't already done.
            # An image is required to properly fix the video size.
            if self._tag_video_writer is None:
                self._tag_video_writer, fn = self.create_video_writer(out_name, image, pattern)
                new_files.append(fn)

            # fix the colors of these coords and sights
            colors = [self.__get_person_color(person_id, all_colors)
                      for person_id in person_ids]

            # Tag&write the image with squares at the coords,
            # with circles for sights and a rectangle with the name
            img = self._img_writer.tag_image(image, coords, colors)
            img = self._img_writer.tag_image(img, sights, colors)
            self._text_image(img, coords, person_ids, colors)
            self._tag_video_writer.write(img)

        return new_files

    # -----------------------------------------------------------------------

    def __get_person_color(self, person_id, all_colors):
        """Return the (r, g, b) color of a person, assigning one if needed.

        :param person_id: (any) Identifier of the person
        :param all_colors: (dict) Lists of values at keys 'r', 'g' and 'b'
        :return: (tuple) The (r, g, b) color assigned to this person

        """
        if person_id not in self.__person_colors:
            # Get a new color: the idx-th one, cycling over the
            # available colors when there are more persons than colors.
            idx = len(self.__person_colors)
            n = len(all_colors['r'])
            self.__person_colors[person_id] = (
                all_colors['r'][idx % n],
                all_colors['g'][idx % n],
                all_colors['b'][idx % n]
            )
        return self.__person_colors[person_id]

    # -----------------------------------------------------------------------

    def _text_image(self, img, coords, texts, colors):
        """Put texts at top of given coords with given colors.

        :param img: The image to draw on
        :param coords: (list of sppasCoords) Coordinates of the faces
        :param texts: (list) The text to put for each coords
        :param colors: (list) The color to use for each coords

        """
        for coord, text, color in zip(coords, texts, colors):
            # the text area is the top fifth of the coords
            c = sppasCoords(coord.x, coord.y, coord.w, coord.h // 5)
            img.surround_coord(c, color, -4, text)

    # -----------------------------------------------------------------------

    def _tag_and_crop(self, video_buffer, image, idx, img_name, folder, pattern):
        """Write the tagged image and/or its cropped parts into a folder.

        :param video_buffer: (sppasSightsVideoBuffer)
        :param image: The image at the given index of the buffer
        :param idx: (int) Index of the image in the buffer
        :param img_name: (str) Name of the image file, without extension
        :param folder: (str) Folder in which the files are written
        :param pattern: (str) Pattern to add to cropped image filenames
        :return: list of newly created image file names

        """
        new_files = list()
        # Get the lists of coordinates and sights stored for the idx-th image
        coords = video_buffer.get_coordinates(idx)
        sights = video_buffer.get_sights(idx)

        # Draw the sights on the image. The result is also the one
        # the cropped parts are extracted from.
        img = self._img_writer.tag_image(image, sights)

        # Tag and write the image
        if self._img_writer.options.tag is True:
            # Save the image
            out_iname = os.path.join(folder, img_name + self._image_ext)
            img.write(out_iname)
            new_files.append(out_iname)

        # Crop the image and write cropped parts
        if self._img_writer.options.crop is True:
            # Browse through the coords to crop the image
            for j, c in enumerate(coords):
                # Create the image filename
                iname = img_name + "_" + str(j) + pattern + self._image_ext
                out_iname = os.path.join(folder, iname)

                # Crop the given image to the coordinates and
                # resize only if the option width or height is enabled
                img_crop = self._img_writer.crop_and_size_image(img, c)

                # Add the image to the folder
                img_crop.write(out_iname)
                new_files.append(out_iname)

        return new_files