Source code for visionsim.cli.transforms

from __future__ import annotations

import functools
import os
from pathlib import Path

import numpy as np
import numpy.typing as npt
import OpenEXR  # type: ignore
from rich.progress import Progress, track
from typing_extensions import Iterable, Literal


def _read_exr(path: str | os.PathLike) -> npt.NDArray:
    # imageio and cv2's cannot read an exr file when the data is stored in any other channel than RGB(A)
    # but as of blender 4.x depth maps are correctly saved as single channel exrs, in the V channel.
    with OpenEXR.File(path) as f:
        if len(f.channels()) and list(f.channels().keys())[0] == "RGBA":
            return f.channels()["RGBA"].pixels.transpose(2, 0, 1)
        return np.array([c.pixels for c in f.channels().values()])


def _tonemap_collate(
    batch: Iterable[tuple[int, npt.NDArray, npt.NDArray]], *, hdr_quantile: float = 0.01
) -> tuple[npt.NDArray, npt.NDArray, npt.NDArray, float]:
    """Use default collate function on batch and then tonemap, enabling compute to be done in threads"""
    from visionsim.dataset import default_collate
    from visionsim.utils.color import linearrgb_to_srgb

    idxs, imgs, poses = default_collate(batch)
    high, low = np.quantile(imgs, [1 - hdr_quantile, hdr_quantile])
    imgs = linearrgb_to_srgb(imgs)
    imgs = (np.clip(imgs, 0, 1) * 255).astype(np.uint8)

    return idxs, imgs, poses, high / low


def _estimate_distribution(in_files, transform=None):
    from fastdigest import TDigest

    digest = TDigest()

    for in_file in track(in_files, description="Probing Files..."):
        im = _read_exr(in_file)
        values = transform(im) if transform is not None else im.flatten()
        digest.batch_update(values)
    return digest


[docs] def colorize_depths( input_dir: str | os.PathLike, output_dir: str | os.PathLike, pattern: str = "depth_*.exr", cmap: str = "turbo", ext: str = ".png", vmin: float | None = None, vmax: float | None = None, quantile: float = 0.01, step: int = 1, ): """Convert .exr depth maps into color-coded images for visualization Args: input_dir: directory in which to look for frames output_dir: directory in which to save colorized frames pattern: filenames of frames should match this cmap: which matplotlib colormap to use ext: which format to save colorized frames as vmin: minimum expected depth used to normalize colormap vmax: maximum expected depth used to normalize colormap quantile: if vmin/vmax are None, use this quantile to estimate them step: drop some frames when colorizing, use frames 0+step*n """ # TODO: Multiprocess this # Lazy load imports to improve CLI responsiveness import imageio.v3 as iio import matplotlib as mpl import matplotlib.cm as cm from visionsim.cli import _log, _validate_directories DEPTH_CUTOFF = 10000000000 input_dir, output_dir, in_files = _validate_directories(input_dir, output_dir, pattern) in_files = in_files[::step] def transform_depth(d): # Filter out large depths, this is a render bug in CYCLES # See: https://blender.stackexchange.com/questions/325007 d = d[d < DEPTH_CUTOFF] return d.flatten() if vmin is None or vmax is None: digest = _estimate_distribution(in_files, transform=transform_depth) vmin_, vmax_ = digest.quantile(quantile), digest.quantile(1 - quantile) vmin = vmin_ if vmin is None else vmin vmax = vmax_ if vmax is None else vmax _log.info(f"Found depth range [{vmin_:0.2f}, {vmax_:0.2f}]") _log.info(f"Using depth range [{vmin:0.2f}, {vmax:0.2f}]\n") colormap = getattr(cm, cmap) norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax) for in_file in track(in_files): # Open with imageio, convert to color using matplotlib's cmaps and save as png. depth = _read_exr(in_file) depth[depth >= DEPTH_CUTOFF] = np.nan img = (colormap(norm(depth)) * 255).astype(np.uint8) path = output_dir / Path(in_file).stem iio.imwrite(str(path.with_suffix(ext)), img)
[docs] def colorize_flows( input_dir: str | os.PathLike, output_dir: str | os.PathLike, direction: Literal["forward", "backward"] = "forward", pattern: str = "flow_*.exr", ext: str = ".png", vmax: float | None = None, quantile: float = 0.01, step: int = 1, ): """Convert .exr optical flow maps into color-coded images for visualization Args: input_dir: directory in which to look for frames output_dir: directory in which to save colorized frames direction: direction of flow to colorize pattern: filenames of frames should match this ext: which format to save colorized frames as vmax: maximum expected flow magnitude quantile: if vmax is None, use this quantile to estimate it step: drop some frames when colorizing, use frames 0+step*n """ # TODO: Multiprocess this # Lazy load imports to improve CLI responsiveness import colorsys import imageio.v3 as iio from visionsim.cli import _log, _validate_directories if direction.lower() not in ("forward", "backward"): raise ValueError("Direction needs to be either 'forward' or 'backwards'.") input_dir, output_dir, in_files = _validate_directories(input_dir, output_dir, pattern) in_files = in_files[::step] convert = np.vectorize(colorsys.hsv_to_rgb) def magnitude(flows): fx, fy, bx, by = flows x, y = (fx, fy) if direction.lower() == "forward" else (bx, by) mag = np.sqrt(x**2 + y**2) return mag.flatten() if vmax is None: digest = _estimate_distribution(in_files, transform=magnitude) vmax = digest.quantile(1 - quantile) _log.info(f"Using a maximum magnitude of {vmax:0.2f}\n") for in_file in track(in_files): fx, fy, bx, by = _read_exr(in_file) x, y = (fx, fy) if direction.lower() == "forward" else (bx, by) h = np.arctan2(y, x) / (2 * np.pi) + 0.5 v = np.minimum(np.sqrt(x**2 + y**2) / vmax, 1.0) img = np.stack(convert(h, np.ones_like(h), v), axis=-1) img = (img * 255).astype(np.uint8) path = output_dir / Path(in_file).stem iio.imwrite(str(path.with_suffix(ext)), img)
[docs] def colorize_normals( input_dir: str | os.PathLike, output_dir: str | os.PathLike, pattern: str = "normal_*.exr", ext: str = ".png", step: int = 1, ): """Convert .exr normal maps into color-coded images for visualization Args: input_dir: directory in which to look for frames output_dir: directory in which to save colorized frames pattern: filenames of frames should match this ext: which format to save colorized frames as step: drop some frames when colorizing, use frames 0+step*n """ # TODO: Multiprocess this # Lazy load imports to improve CLI responsiveness import imageio.v3 as iio from visionsim.cli import _validate_directories input_dir, output_dir, in_files = _validate_directories(input_dir, output_dir, pattern) in_files = in_files[::step] for in_file in track(in_files): img = _read_exr(in_file).transpose(1, 2, 0) / 2 + 0.5 img = (img * 255).astype(np.uint8) path = output_dir / Path(in_file).stem iio.imwrite(str(path.with_suffix(ext)), img)
[docs] def colorize_segmentations( input_dir: str | os.PathLike, output_dir: str | os.PathLike, pattern: str = "segmentation_*.exr", ext: str = ".png", num_objects: int | None = None, shuffle: bool = True, seed: int = 1234, step: int = 1, ): """Convert .exr segmentation maps into color-coded images for visualization Args: input_dir: directory in which to look for frames output_dir: directory in which to save colorized frames pattern: filenames of frames should match this ext: which format to save colorized frames as num_objects: number of unique objects to expect in the scene shuffle: if true, colorize items in a random order seed: seed used when shuffling colors step: drop some frames when colorizing, use frames 0+step*n """ # TODO: Multiprocess this # Lazy load imports to improve CLI responsiveness import colorsys import imageio.v3 as iio from visionsim.cli import _log, _validate_directories input_dir, output_dir, in_files = _validate_directories(input_dir, output_dir, pattern) in_files = in_files[::step] if num_objects is None: digest = _estimate_distribution(in_files, transform=np.unique) num_objects = int(digest.max()) _log.info(f"Found {num_objects} objects.\n") indices = np.arange(num_objects) if shuffle: np.random.seed(seed=seed) np.random.shuffle(indices) convert = np.vectorize(colorsys.hsv_to_rgb) r, g, b = convert(np.arange(num_objects) / num_objects, np.ones(num_objects), np.ones(num_objects)) r, g, b = np.insert(r, 0, 0), np.insert(g, 0, 0), np.insert(b, 0, 0) for in_file in track(in_files): idx = _read_exr(in_file).astype(int).squeeze() if idx.shape[-1] != 1 and idx.ndim == 3: idx = idx[..., 0] img = np.stack([r[idx], g[idx], b[idx]], axis=-1) img = (img * 255).astype(np.uint8) path = output_dir / Path(in_file).stem iio.imwrite(str(path.with_suffix(ext)), img)
[docs] def tonemap_exrs( input_dir: str | os.PathLike, output_dir: str | os.PathLike | None = None, batch_size: int = 4, hdr_quantile: float = 0.01, force: bool = False, ): """Convert .exr linear intensity frames into tone-mapped sRGB images Args: input_dir: directory in which to look for frames output_dir: directory in which to save tone mapped frames, if not specified the dynamic range is calculated and no tonemapping occurs batch_size: number of frames to write at once hdr_quantile: calculate dynamic range using brightness quantiles instead of extrema force: if true, overwrite output file(s) if present """ from torch.utils.data import DataLoader from visionsim.cli import _log, _validate_directories from visionsim.dataset import Dataset, ImgDatasetWriter input_path, output_path, *_ = _validate_directories(input_dir, output_dir) dataset = Dataset.from_path(input_path) loader = DataLoader( dataset, batch_size=batch_size, num_workers=os.cpu_count() or 1, collate_fn=functools.partial(_tonemap_collate, hdr_quantile=hdr_quantile), ) hdrs = [] with Progress() as progress: pbar = progress.add_task(description="Processing Frames...", total=len(dataset)) with ImgDatasetWriter( output_path, transforms=dataset.transforms, force=force, pattern="frame_{:06}.png" ) as writer: for idxs, imgs, poses, hdr in loader: writer[idxs] = (imgs, poses) hdrs.append(hdr) progress.update(pbar, advance=len(idxs)) hdrs_ = np.array(hdrs) _log.info(f"Mean dynamic range is {hdrs_.mean():0.2f}, with range ({hdrs_.min():0.2f}, {hdrs_.max():0.2f})")