Source code for unagi.dataset

#!/usr/bin/python3

import glob
import os
import time
import argparse
from functools import partial
from multiprocessing import Pool, cpu_count
from shutil import copy2, rmtree
from typing import List, Tuple

import cv2
import numpy as np

from unagi.utils.img_processing_utils import ImageUtils


class ImageProcessor:
    """Split paired train ("_in") and ground-truth ("_gt") images into parts.

    Parameters
    ----------
    size_x: int, optional
        width of one image part (default is `128`).
    size_y: int, optional
        height of one image part (default is `128`).
    step_x: int, optional
        horizontal distance between the left edges of neighbouring parts
        (default is `128`).
    step_y: int, optional
        vertical distance between the top edges of neighbouring parts
        (default is `128`).
    """

    def __init__(
        self, size_x: int = 128, size_y: int = 128, step_x: int = 128, step_y: int = 128
    ):
        self.size_x = size_x
        self.size_y = size_y
        self.step_x = step_x
        self.step_y = step_y

    @staticmethod
    def _border(length: int, size: int) -> int:
        """Return the per-side padding that makes `length` reach a multiple of `size`."""
        remainder = length % size
        if remainder == 0:
            return 0
        # Round up so that the two equal borders together cover the whole
        # deficit (for an odd deficit the padded image is one pixel longer).
        return (size - remainder + 1) // 2

    @staticmethod
    def _read_gray(fname: str) -> np.ndarray:
        """Read an image file and convert it to a single-channel gray image."""
        img = cv2.imread(fname)
        if img is None:
            # cv2.imread signals failure by returning None; report the path
            # here instead of failing later inside cvtColor.
            raise OSError("cannot read image: {0}".format(fname))
        return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    def split_img_overlay(self, img: np.ndarray) -> Tuple[List[np.ndarray], int, int]:
        """Split image to parts (little images) with possible overlay.

        Parameters
        ----------
        img: np.ndarray
            input image array

        Returns
        -------
        Tuple[List[numpy.ndarray], int, int]
            list of numpy arrays (the parts, row by row)
            border value along height (added to the top and to the bottom)
            border value along width (added to the left and to the right)

        Note
        ----
        Walk through the whole image by the window of size size_x * size_y
        with step step_x, step_y and save all parts in list. If the image
        sizes are not multiples of the window sizes, the image will be
        complemented by a white (255) frame of suitable size. If step_x,
        step_y are not equal to size_x, size_y, parts overlay each other,
        or have spaces between each other.

        Example
        -------
        ImageProcessor(128, 128, 128, 128).split_img_overlay(img)
        """
        max_y, max_x = img.shape[:2]
        border_y = self._border(max_y, self.size_y)
        border_x = self._border(max_x, self.size_x)

        if border_y or border_x:
            img = cv2.copyMakeBorder(
                img,
                border_y,
                border_y,
                border_x,
                border_x,
                cv2.BORDER_CONSTANT,
                value=[255, 255, 255],
            )
            max_y, max_x = img.shape[:2]

        # A window starting at `y` fits while y + size_y <= max_y; range()
        # also rejects a zero step instead of looping forever.
        parts = [
            img[y:y + self.size_y, x:x + self.size_x]
            for y in range(0, max_y - self.size_y + 1, self.step_y)
            for x in range(0, max_x - self.size_x + 1, self.step_x)
        ]
        return parts, border_y, border_x

    def save_imgs(
        self, imgs_in: List[np.ndarray], imgs_gt: List[np.ndarray], fname_in: str
    ) -> None:
        """Save image parts to one folder.

        Save all image parts to folder with name
        '(original image name without "_in" and extension) + _parts'.
        Parts are written as '<index>_in.png' and '<index>_gt.png'.

        Parameters
        ----------
        imgs_in: List[np.ndarray]
            list of input image arrays
        imgs_gt: List[np.ndarray]
            list of gt image arrays
        fname_in: str
            original full image name

        Returns
        -------
        None

        Example
        -------
        ImageProcessor().save_imgs(in_img_list, gt_img_list, in_img)
        """
        stem, marker, _ = fname_in.rpartition("_in")
        if not marker:
            # No "_in" marker in the name: fall back to the name without its
            # extension rather than silently dropping the last character.
            stem = os.path.splitext(fname_in)[0]
        dname = stem + "_parts"
        ImageUtils.mkdir_s(dname)

        for suffix, imgs in (("_in.png", imgs_in), ("_gt.png", imgs_gt)):
            for i, img in enumerate(imgs):
                cv2.imwrite(os.path.join(dname, str(i) + suffix), img)

    def process_img(
        self,
        fname_in: str,
    ) -> None:
        """Read train and ground_truth images, split them and save.

        Parameters
        ----------
        fname_in: str,
            input image name; the ground-truth name is obtained by replacing
            the last "_in" in it with "_gt"

        Returns
        -------
        None

        Raises
        ------
        OSError
            if the train or the ground-truth image cannot be read

        See Also
        --------
        ImageProcessor.split_img_overlay(), ImageProcessor.save_imgs()

        Example
        -------
        ImageProcessor(128, 128, 128, 128).process_img(img_name)
        """
        # Swap only the last "_in" so that directories whose names happen to
        # contain "_in" (e.g. "my_input") are left untouched.
        head, marker, tail = fname_in.rpartition("_in")
        fname_gt = head + "_gt" + tail if marker else fname_in

        parts_in, _, _ = self.split_img_overlay(self._read_gray(fname_in))
        parts_gt, _, _ = self.split_img_overlay(self._read_gray(fname_gt))
        self.save_imgs(parts_in, parts_gt, fname_in)
# wrapper function to call process_img method on the processor object
def process_img_wrapper(args: Tuple[ImageProcessor, str]) -> None:
    """Unpack a (processor, file name) pair and run `process_img` on it.

    Takes a single tuple argument so it can be handed to `Pool.map`.

    Parameters
    ----------
    args: Tuple[ImageProcessor, str]
        the processor object and the name of the input image to process

    Returns
    -------
    None
        whatever `ImageProcessor.process_img` returns
    """
    worker, image_name = args
    return worker.process_img(image_name)
def main(
    input_path: str = os.path.join(".", "input"),
    output_path: str = os.path.join(".", "output"),
    shuffle: bool = True,
    size_x: int = 128,
    size_y: int = 128,
    step_x: int = 128,
    step_y: int = 128,
    processes: int = cpu_count(),
) -> None:
    """Create train and ground-truth images suitable for unagi training.

    Parameters
    ----------
    input_path: str, optional
        path to input images (default is os.path.join(".", "input"))
    output_path: str, optional
        path to created images (default is os.path.join(".", "output"))
    shuffle: bool, optional
        shuffle the newly created images (default is True)
    size_x: int, optional
        width for image part (default is `128`).
    size_y: int, optional
        height for image part (default is `128`).
    step_x: int, optional
        horizontal step between image parts (default is `128`).
    step_y: int, optional
        vertical step between image parts (default is `128`).
    processes: int, optional
        number of cpu cores to use (default is cpu_count())

    Returns
    -------
    None

    See Also
    --------
    ImageProcessor.process_img(),
    unagi.utils.img_processing_utils.ImageUtils.shuffle_imgs(),
    unagi.utils.img_processing_utils.ImageUtils.mkdir_s()

    Note
    ----
    All train image names should end with "_in" like "1_in.png".
    All ground-truth image should end with "_gt" like "1_gt.png".
    If for some image there is only train or ground-truth version, script fails.
    After script finishes, in the output directory there will be two
    subdirectories: "in" with train images and "gt" with ground-truth images.

    Example
    -------
    unagi.dataset.main(in_img_path, out_imgs_path, True, 128, 128, 128, 128)
    """
    start_time = time.time()

    fnames_in = list(
        glob.iglob(os.path.join(input_path, "**", "*_in.*"), recursive=True)
    )
    # step_y must be forwarded as given; passing size_y here would silently
    # disable any vertical overlap requested by the caller.
    processor = ImageProcessor(
        size_x=size_x, size_y=size_y, step_x=step_x, step_y=step_y
    )

    pool = Pool(processes)
    try:
        pool.map(process_img_wrapper, [(processor, fname_in) for fname_in in fnames_in])
    finally:
        # close() only stops new submissions; join() waits for the workers
        # to exit so no child process is left behind.
        pool.close()
        pool.join()

    dir_in = os.path.join(output_path, "in")
    dir_gt = os.path.join(output_path, "gt")
    for dname in (output_path, dir_in, dir_gt):
        ImageUtils.mkdir_s(dname)

    # Gather the parts of every image into two flat, consecutively numbered
    # directories.
    parts_pattern = os.path.join(input_path, "**", "*_parts", "*_in.*")
    for i, part_in in enumerate(glob.iglob(parts_pattern, recursive=True)):
        part_dir, part_name = os.path.split(part_in)
        # Swap "_in" for "_gt" in the file name only, so directories whose
        # names contain "_in" (e.g. "my_input") do not corrupt the path.
        head, _, tail = part_name.rpartition("_in")
        part_gt = os.path.join(part_dir, head + "_gt" + tail)
        copy2(part_in, os.path.join(dir_in, str(i) + "_in.png"))
        copy2(part_gt, os.path.join(dir_gt, str(i) + "_gt.png"))

    # Materialize the listing first: removing directories while the recursive
    # glob is still walking the tree is not safe.
    parts_dirs = list(
        glob.iglob(os.path.join(input_path, "**", "*_parts"), recursive=True)
    )
    for dname in parts_dirs:
        rmtree(dname)

    if shuffle:
        ImageUtils.shuffle_imgs(output_path)

    print("finished in {0:.2f} seconds".format(time.time() - start_time))
# command-line argument parsing for main()
def _str2bool(value: str) -> bool:
    """Convert a command-line string such as "True", "no" or "0" to a bool."""
    lowered = value.strip().lower()
    if lowered in ("true", "t", "yes", "y", "1"):
        return True
    # The empty string stays falsy, as it was with ``type=bool``.
    if lowered in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got {0!r}".format(value)
    )


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for dataset module.

    Returns
    -------
    argparse.Namespace
        namespace with the attributes input_path, output_path, shuffle,
        size_x, size_y, step_x, step_y and processes

    Note
    ----
    ``--shuffle`` is parsed with an explicit string-to-bool converter:
    ``type=bool`` would turn every non-empty string, including "False",
    into True.
    """
    parser = argparse.ArgumentParser(description="Create dataset to train unagi model")
    parser.add_argument("--input_path", type=str, default="./input", help="input path")
    parser.add_argument(
        "--output_path", type=str, default="./output", help="output path"
    )
    parser.add_argument("--shuffle", type=_str2bool, default=True, help="shuffle data")
    parser.add_argument("--size_x", type=int, default=128, help="size x")
    parser.add_argument("--size_y", type=int, default=128, help="size y")
    parser.add_argument("--step_x", type=int, default=128, help="step x")
    parser.add_argument("--step_y", type=int, default=128, help="step y")
    parser.add_argument("--processes", type=int, default=4, help="number of processes")
    return parser.parse_args()
if __name__ == "__main__":
    # Every option defined in parse_args() carries the same name as the
    # matching keyword parameter of main(), so the parsed namespace can be
    # forwarded as keyword arguments.
    cli_options = vars(parse_args())
    main(**cli_options)