#!/usr/bin/python3
import glob
import os
import time
from functools import partial
from multiprocessing import Pool, cpu_count
from shutil import copy2, rmtree
from typing import List, Tuple
import cv2
import numpy as np
from .utils.img_processing_utils import mkdir_s
[docs]def split_img_overlay(
img: np.ndarray,
size_x: int = 128,
size_y: int = 128,
step_x: int = 128,
step_y: int = 128,
) -> Tuple[List[np.ndarray], int, int]:
"""Split image to parts (little images) with possible overlay.
Parameters
----------
img: np.ndarray
input image array
size_x: int, optional
width for image part (deafult is `128`).
size_y: int, optional
height for image part (deafult is `128`).
step_x: int, optional
width overlay for image part (deafult is `128`).
step_y: int, optional
height overlay for image part (deafult is `128`).
Returns
-------
Tuple[List[numpy.ndarray], int, int]
list of numpy arrays
border value along width
border value along height
Note
----
Walk through the whole image by the window of size size_x * size_y
with step step_x, step_y and save all parts in list.
If the image sizes are not multiples of the window sizes,
the image will be complemented by a frame of suitable size.
If step_x, step_y are not equal to size_x, size_y, parts overlay
each other, or have spaces between each other.
Example
-------
unagi.dataset.split_img_overlay(img_name, 128, 128, 128, 128)
"""
max_y, max_x = img.shape[:2]
border_y = 0
if max_y % size_y != 0:
border_y = (size_y - (max_y % size_y) + 1) // 2
img = cv2.copyMakeBorder(
img, border_y, border_y, 0, 0, cv2.BORDER_CONSTANT, value=[255, 255, 255]
)
max_y = img.shape[0]
border_x = 0
if max_x % size_x != 0:
border_x = (size_x - (max_x % size_x) + 1) // 2
img = cv2.copyMakeBorder(
img, 0, 0, border_x, border_x, cv2.BORDER_CONSTANT, value=[255, 255, 255]
)
max_x = img.shape[1]
parts = []
curr_y = 0
while (curr_y + size_y) <= max_y:
curr_x = 0
while (curr_x + size_x) <= max_x:
parts.append(img[curr_y: curr_y + size_y, curr_x: curr_x + size_x]) # noqa
curr_x += step_x
curr_y += step_y
return parts, border_y, border_x
[docs]def save_imgs(
imgs_in: List[np.ndarray], imgs_gt: List[np.ndarray], fname_in: str
) -> None:
"""Save image parts to one folder.
Save all image parts to folder with name '(original image name) + _parts'.
Parameters
----------
imgs_in: List[np.ndarray]
list of input image arrays
imgs_gt: List[np.ndarray]
list of gt image arrays
fname_in: str
original full image
Returns
-------
None
Example
-------
unagi.dataset.save_imgs(in_img_list, gt_img_list, in_img)
"""
dname = os.path.join(fname_in[: fname_in.rfind("_in")] + "_parts")
mkdir_s(dname)
for i, img in enumerate(imgs_in):
cv2.imwrite(os.path.join(dname, str(i) + "_in.png"), img)
for i, img in enumerate(imgs_gt):
cv2.imwrite(os.path.join(dname, str(i) + "_gt.png"), img)
[docs]def process_img(
fname_in: str,
size_x: int = 128,
size_y: int = 128,
step_x: int = 128,
step_y: int = 128,
) -> None:
"""Read train and ground_truth images, split them and save.
Parameters
----------
fname_in: str,
input image name
size_x: int
width for image part (deafult is `128`).
size_y: int
height for image part (deafult is `128`).
step_x: int
width overlay for image part (deafult is `128`).
step_y: int
height overlay for image part (deafult is `128`).
Returns
-------
None
See Also
--------
split_img_overlay(), save_imgs()
Example
-------
unagi.dataset.process_img(img_name, 128, 128, 128, 128)
"""
img_in = cv2.cvtColor(cv2.imread(fname_in), cv2.COLOR_BGR2GRAY)
parts_in, _, _ = split_img_overlay(img_in, size_x, size_y, step_x, step_y)
img_gt = cv2.cvtColor(
cv2.imread(fname_in.replace("_in", "_gt")), cv2.COLOR_BGR2GRAY
)
parts_gt, _, _ = split_img_overlay(img_gt, size_x, size_y, step_x, step_y)
save_imgs(parts_in, parts_gt, fname_in)
[docs]def shuffle_imgs(dname: str) -> None:
"""Shuffle input and ground-truth images.
(actual, if You are using different datasets as one).
Parameters
----------
dname: str
directory name with image to shuffle
Returns
-------
None
Example
-------
unagi.dataset.shuffle_imgs(images_dir)
"""
dir_in = os.path.join(dname, "in")
dir_gt = os.path.join(dname, "gt")
n = len(os.listdir(dir_in))
np.random.seed()
for i in range(n):
j = i
while j == i:
j = np.random.randint(0, n)
os.rename(
os.path.join(dir_in, str(i) + "_in.png"), os.path.join(dir_in, "tmp_in.png")
)
os.rename(
os.path.join(dir_in, str(j) + "_in.png"),
os.path.join(dir_in, str(i) + "_in.png"),
)
os.rename(
os.path.join(dir_in, "tmp_in.png"), os.path.join(dir_in, str(j) + "_in.png")
)
os.rename(
os.path.join(dir_gt, str(i) + "_gt.png"), os.path.join(dir_gt, "tmp_gt.png")
)
os.rename(
os.path.join(dir_gt, str(j) + "_gt.png"),
os.path.join(dir_gt, str(i) + "_gt.png"),
)
os.rename(
os.path.join(dir_gt, "tmp_gt.png"), os.path.join(dir_gt, str(j) + "_gt.png")
)
[docs]def main(
input_path: str = os.path.join(".", "input"),
output_path: str = os.path.join(".", "output"),
shuffle: bool = True,
size_x: int = 128,
size_y: int = 128,
step_x: int = 128,
step_y: int = 128,
processes: int = cpu_count(),
) -> None:
"""Create train and ground-truth images suitable for unagi training.
Parameters
----------
input_path: str, optional
path to input images (default is os.path.join(".", "input"))
output_path: str, optional
path to created images (default is os.path.join(".", "output"))
shuffle: bool, optional
shuffle the newly created images (default is True)
size_x: int, optional
width for image part (deafult is `128`).
size_y: int, optional
height for image part (deafult is `128`).
step_x: int, optional
width overlay for image part (deafult is `128`).
step_y: int, optional
height overlay for image part (deafult is `128`).
processes: int, optional
number of cpu cores to use (default is cpu_count()
Returns
-------
None
See Also
--------
process_img(), shuffle_imgs(), unagi.utils.img_processing_utils.mkdir_s()
Note
----
All train image names should end with "_in" like "1_in.png".
All ground-truth image should end with "_gt" like "1_gt.png".
If for some image there is only train or ground-truth version,
script fails.
After script finishes, in the output directory there will be
two subdirectories: "in" with train images and
"gt" with ground-truth images.
Example
-------
unagi.dataset.main(in_img_path, out_imgs_path, 128, 128, 128, 128)
"""
start_time = time.time()
fnames_in = list(
glob.iglob(os.path.join(input_path, "**", "*_in.*"), recursive=True)
)
f = partial(process_img, size_x=size_x, size_y=size_y, step_x=step_x, step_y=step_y)
p = Pool(processes)
try:
p.map(f, fnames_in)
finally:
p.close()
mkdir_s(os.path.join(output_path))
mkdir_s(os.path.join(output_path, "in"))
mkdir_s(os.path.join(output_path, "gt"))
for i, fname in enumerate(
glob.iglob(os.path.join(input_path, "**", "*_parts", "*_in.*"), recursive=True)
):
copy2(os.path.join(fname), os.path.join(output_path, "in", str(i) + "_in.png"))
copy2(
os.path.join(fname.replace("_in", "_gt")),
os.path.join(output_path, "gt", str(i) + "_gt.png"),
)
for dname in glob.iglob(os.path.join(input_path, "**", "*_parts"), recursive=True):
rmtree(dname)
if shuffle:
shuffle_imgs(output_path)
print("finished in {0:.2f} seconds".format(time.time() - start_time))
if __name__ == "__main__":
main()