#!/usr/bin/python3
import glob
import os
import time
import argparse
from functools import partial
from multiprocessing import Pool, cpu_count
from shutil import copy2, rmtree
from typing import List, Tuple
import cv2
import numpy as np
from unagi.utils.img_processing_utils import ImageUtils
class ImageProcessor:
    """Split images into fixed-size parts and store the parts on disk.

    Parameters
    ----------
    size_x: int, optional
        width of one image part in pixels (default is `128`)
    size_y: int, optional
        height of one image part in pixels (default is `128`)
    step_x: int, optional
        horizontal distance between the left edges of neighbouring parts
        (default is `128`)
    step_y: int, optional
        vertical distance between the top edges of neighbouring parts
        (default is `128`)

    Raises
    ------
    ValueError
        if any size or step is not positive (a non-positive step would make
        the splitting loop never terminate, a zero size divides by zero)
    """

    def __init__(
        self, size_x: int = 128, size_y: int = 128, step_x: int = 128, step_y: int = 128
    ):
        for name, value in (
            ("size_x", size_x),
            ("size_y", size_y),
            ("step_x", step_x),
            ("step_y", step_y),
        ):
            if value <= 0:
                raise ValueError("{0} must be positive, got {1}".format(name, value))
        self.size_x = size_x
        self.size_y = size_y
        self.step_x = step_x
        self.step_y = step_y

    @staticmethod
    def _gt_path(fname_in: str) -> str:
        """Return `fname_in` with its LAST "_in" marker replaced by "_gt"."""
        head, marker, tail = fname_in.rpartition("_in")
        if not marker:
            # No marker at all: leave the name untouched.
            return fname_in
        return head + "_gt" + tail

    @staticmethod
    def _read_gray(fname: str) -> np.ndarray:
        """Read an image file and convert it from BGR to grayscale."""
        img = cv2.imread(fname)
        if img is None:
            # cv2.imread signals an unreadable/missing file by returning None.
            raise FileNotFoundError("cannot read image: {0}".format(fname))
        return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    def split_img_overlay(self, img: np.ndarray) -> Tuple[List[np.ndarray], int, int]:
        """Split image to parts (little images) with possible overlay.

        Parameters
        ----------
        img: np.ndarray
            input image array

        Returns
        -------
        Tuple[List[numpy.ndarray], int, int]
            list of numpy arrays (slices of the, possibly padded, image)
            border value added to the top and to the bottom
            border value added to the left and to the right

        Note
        ----
        Walk through the whole image by the window of size size_x * size_y
        with step step_x, step_y and save all parts in list.
        If an image side is not a multiple of the window side, the image is
        complemented on both ends of that side by a white (255) frame.
        The frame width is rounded up, so the padded side may exceed a
        multiple of the window side by one pixel.
        If step_x, step_y are not equal to size_x, size_y, parts overlay
        each other, or have spaces between each other.

        Example
        -------
        parts, border_y, border_x = ImageProcessor(128, 128, 128, 128).split_img_overlay(img)
        """
        max_y, max_x = img.shape[:2]

        border_y = 0
        if max_y % self.size_y != 0:
            # Half of the missing rows, rounded up, goes on each side.
            border_y = (self.size_y - (max_y % self.size_y) + 1) // 2
            img = cv2.copyMakeBorder(
                img,
                border_y,
                border_y,
                0,
                0,
                cv2.BORDER_CONSTANT,
                value=[255, 255, 255],
            )
            max_y = img.shape[0]

        border_x = 0
        if max_x % self.size_x != 0:
            # Half of the missing columns, rounded up, goes on each side.
            border_x = (self.size_x - (max_x % self.size_x) + 1) // 2
            img = cv2.copyMakeBorder(
                img,
                0,
                0,
                border_x,
                border_x,
                cv2.BORDER_CONSTANT,
                value=[255, 255, 255],
            )
            max_x = img.shape[1]

        # Row-major walk: only windows that fit entirely inside the image.
        parts = [
            img[top:top + self.size_y, left:left + self.size_x]
            for top in range(0, max_y - self.size_y + 1, self.step_y)
            for left in range(0, max_x - self.size_x + 1, self.step_x)
        ]
        return parts, border_y, border_x

    def save_imgs(
        self, imgs_in: List[np.ndarray], imgs_gt: List[np.ndarray], fname_in: str
    ) -> None:
        """Save image parts to one folder.

        Save all image parts to folder with name '(original image name) + _parts',
        where the original image name is `fname_in` cut at its last "_in".
        Parts are written as "<index>_in.png" and "<index>_gt.png".

        Parameters
        ----------
        imgs_in: List[np.ndarray]
            list of input image arrays
        imgs_gt: List[np.ndarray]
            list of gt image arrays
        fname_in: str
            original full image

        Returns
        -------
        None

        Example
        -------
        processor.save_imgs(in_img_list, gt_img_list, in_img)
        """
        dname = fname_in[: fname_in.rfind("_in")] + "_parts"
        # Presumably creates the directory if needed; defined in unagi.utils.
        ImageUtils.mkdir_s(dname)
        for i, img in enumerate(imgs_in):
            cv2.imwrite(os.path.join(dname, str(i) + "_in.png"), img)
        for i, img in enumerate(imgs_gt):
            cv2.imwrite(os.path.join(dname, str(i) + "_gt.png"), img)

    def process_img(
        self,
        fname_in: str,
    ) -> None:
        """Read train and ground_truth images, split them and save.

        The ground-truth file name is `fname_in` with its last "_in"
        replaced by "_gt"; "_in" fragments in directory names are left
        untouched.

        Parameters
        ----------
        fname_in: str,
            input image name

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            if the train or the ground-truth image cannot be read

        See Also
        --------
        ImageProcessor.split_img_overlay(), ImageProcessor.save_imgs()

        Example
        -------
        ImageProcessor(128, 128, 128, 128).process_img(img_name)
        """
        parts_in, _, _ = self.split_img_overlay(self._read_gray(fname_in))
        parts_gt, _, _ = self.split_img_overlay(
            self._read_gray(self._gt_path(fname_in))
        )
        self.save_imgs(parts_in, parts_gt, fname_in)
# Pool.map passes a single argument to its callable, so the processor and the
# file name travel together as one tuple and are unpacked here.
def process_img_wrapper(args: Tuple[ImageProcessor, str]) -> None:
    """Call `process_img` of the processor in `args[0]` on the file `args[1]`."""
    return args[0].process_img(args[1])
def main(
    input_path: str = os.path.join(".", "input"),
    output_path: str = os.path.join(".", "output"),
    shuffle: bool = True,
    size_x: int = 128,
    size_y: int = 128,
    step_x: int = 128,
    step_y: int = 128,
    processes: int = cpu_count(),
) -> None:
    """Create train and ground-truth images suitable for unagi training.

    Parameters
    ----------
    input_path: str, optional
        path to input images (default is os.path.join(".", "input"))
    output_path: str, optional
        path to created images (default is os.path.join(".", "output"))
    shuffle: bool, optional
        shuffle the newly created images (default is True)
    size_x: int, optional
        width for image part (default is `128`).
    size_y: int, optional
        height for image part (default is `128`).
    step_x: int, optional
        horizontal step between image parts (default is `128`).
    step_y: int, optional
        vertical step between image parts (default is `128`).
    processes: int, optional
        number of worker processes to use (default is cpu_count())

    Returns
    -------
    None

    See Also
    --------
    ImageProcessor.process_img(),
    unagi.utils.img_processing_utils.ImageUtils.shuffle_imgs(),
    unagi.utils.img_processing_utils.ImageUtils.mkdir_s()

    Note
    ----
    All train image names should end with "_in" like "1_in.png".
    All ground-truth image should end with "_gt" like "1_gt.png".
    If for some image there is only train or ground-truth version,
    script fails.
    After script finishes, in the output directory there will be
    two subdirectories: "in" with train images and
    "gt" with ground-truth images.
    Temporary "*_parts" directories are created next to the input images
    and removed again before the function returns.

    Example
    -------
    unagi.dataset.main(in_img_path, out_imgs_path, True, 128, 128, 128, 128)
    """
    start_time = time.time()

    fnames_in = list(
        glob.iglob(os.path.join(input_path, "**", "*_in.*"), recursive=True)
    )
    # Pass step_y through (it used to be overwritten by size_y).
    processor = ImageProcessor(
        size_x=size_x, size_y=size_y, step_x=step_x, step_y=step_y
    )
    # map() blocks until every image is processed; leaving the `with` block
    # then shuts the workers down, also when a worker raised.
    with Pool(processes) as pool:
        pool.map(process_img_wrapper, [(processor, fname_in) for fname_in in fnames_in])

    ImageUtils.mkdir_s(output_path)
    ImageUtils.mkdir_s(os.path.join(output_path, "in"))
    ImageUtils.mkdir_s(os.path.join(output_path, "gt"))

    parts_pattern = os.path.join(input_path, "**", "*_parts", "*_in.*")
    for i, fname in enumerate(glob.iglob(parts_pattern, recursive=True)):
        # Swap the marker in the file name only, so "_in" fragments inside
        # directory names do not corrupt the ground-truth path.
        part_dir, part_name = os.path.split(fname)
        stem, _, ext = part_name.rpartition("_in")
        fname_gt = os.path.join(part_dir, stem + "_gt" + ext)
        copy2(fname, os.path.join(output_path, "in", str(i) + "_in.png"))
        copy2(fname_gt, os.path.join(output_path, "gt", str(i) + "_gt.png"))

    # Collect the directories first: deleting while a lazy recursive glob is
    # still walking the tree would change what it sees.
    parts_dirs = list(
        glob.iglob(os.path.join(input_path, "**", "*_parts"), recursive=True)
    )
    for dname in parts_dirs:
        rmtree(dname)

    if shuffle:
        ImageUtils.shuffle_imgs(output_path)
    print("finished in {0:.2f} seconds".format(time.time() - start_time))
# Command-line front end for main().
def _str_to_bool(value: str) -> bool:
    """Convert a command-line word such as "true"/"False"/"0" to a bool."""
    word = value.strip().lower()
    if word in ("true", "t", "yes", "y", "1"):
        return True
    # The empty string stays accepted as False: with the former `type=bool`
    # it was the only value that switched the option off.
    if word in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got {0!r}".format(value)
    )


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for dataset module.

    Returns
    -------
    argparse.Namespace
        namespace with attributes input_path, output_path, shuffle,
        size_x, size_y, step_x, step_y and processes

    Note
    ----
    `--shuffle` takes an explicit value, e.g. `--shuffle false`.
    `type=bool` is not used because bool("False") is True.
    """
    parser = argparse.ArgumentParser(description="Create dataset to train unagi model")
    parser.add_argument("--input_path", type=str, default="./input", help="input path")
    parser.add_argument(
        "--output_path", type=str, default="./output", help="output path"
    )
    parser.add_argument(
        "--shuffle", type=_str_to_bool, default=True, help="shuffle data"
    )
    parser.add_argument("--size_x", type=int, default=128, help="size x")
    parser.add_argument("--size_y", type=int, default=128, help="size y")
    parser.add_argument("--step_x", type=int, default=128, help="step x")
    parser.add_argument("--step_y", type=int, default=128, help="step y")
    parser.add_argument("--processes", type=int, default=4, help="number of processes")
    return parser.parse_args()
# Script entry point: forward every parsed command-line option to main()
# under the keyword of the same name.
if __name__ == "__main__":
    cli = parse_args()
    option_names = (
        "input_path",
        "output_path",
        "shuffle",
        "size_x",
        "size_y",
        "step_x",
        "step_y",
        "processes",
    )
    main(**{name: getattr(cli, name) for name in option_names})