from __future__ import annotations
from types import EllipsisType
from typing import (
Sequence,
Tuple,
Callable,
List,
Any,
Iterator,
Generator,
Union,
NewType
)
from copy import copy, deepcopy
Shape = Sequence[int]
Index = Tuple[Union[type(Ellipsis), slice], ...]
SliceList = List[Union[type(Ellipsis), slice]]
[docs]
class TensorLike:
""" Any array type that matches this signature will work with ryomen. Tested include: zarr, numpy, pytorch... """
shape: Shape
def __init__(self):
...
[docs]
def __getitem__(self, item: Any) -> TensorLike:
""" Required Method """
...
[docs]
def __setitem__(self, index: Any, item: Any) -> TensorLike:
""" Required Method """
...
[docs]
def __iter__(self) -> Iterator:
""" Required Method"""
...
def __next__(self) -> TensorLike:
...
def __mul__(self, other) -> TensorLike:
...
[docs]
def size(self) -> Shape:
""" Required Method """
...
[docs]
def flip(self, dim: int) -> TensorLike:
""" not necessary if tensor supports negative strides """
...
def default_collate(x: Sequence[TensorLike]) -> TensorLike | Sequence[TensorLike]:
return x
def default_progress_bar(x: Iterator | Generator) -> Any:
return x
[docs]
class Slicer:
def __init__(
self,
image: TensorLike,
crop_size: Sequence[int],
overlap: Sequence[int],
batch_size: int = 1,
pad: bool = True,
output_transform: Callable[[TensorLike], TensorLike] = lambda x: x,
collate: Callable[
[Sequence[TensorLike]], TensorLike | Sequence[TensorLike]
] = default_collate,
progress_bar: Callable[[Iterator | Generator], Any] = default_progress_bar,
):
"""
Ryomen is a generic cropping utility for separating up large microscopy images into smaller,
equal sized, sub crops.
Ryomen works with any generic array type, as long as it implements the following methods: __getitem__,
__setitem__, __iter__, and __size__. Furthermore, the array type must support either a flip method, or
support negative indexing, and must implement a parameter shape. These criteria basically fit all
numpy-like arrays. Tested array libraries include zarr, numpy, and pytorch.
Basic Usage:
>>> import numpy as np
>>> from ryomen import Slicer
>>> image = np.random.randn((3, 100, 100, 100)) # A large, 3D array with a color channel
>>> # image.shape is [B, C, X, Y, Z]
>>> crop_size, overlap = (10, 10, 10), (2, 2, 2) # we want to crop on the spatial dimensions!
>>>
>>> for crop, index = Slicer(image, crop_size, overlap)
>>> print(crop.shape) # [3, 10, 10, 10]
The function also allows for batching and a custom colate function, usefull if you want to use this
for machine learning evaluation. The default behavior is to agregate all batched images into a list. A
custom collate function should accept a list of arrays and may return anything.
The default behavior looks like this...
>>> import numpy as np
>>> from ryomen import Slicer
>>> image = np.random.randn((3, 100, 100, 100)) # A large, 3D array with a color channel
>>> crop_size, overlap = (10, 10, 10), (2, 2, 2) # we want to crop on the spatial dimensions!
>>> for crop, index = Slicer(maige, crop_size, overlap, batch_size=8)
>>> print(len(crop)) # 8
Now with a custom collate function...
>>> import numpy as np
>>> from ryomen import Slicer
>>> image = np.random.randn((3, 100, 100, 100)) # A large, 3D array with a color channel
>>> crop_size, overlap = (10, 10, 10), (2, 2, 2) # we want to crop on the spatial dimensions!
>>> collate = lambda x: np.stack(x, axis=0)
>>> for crop, index = Slicer(image, crop_size, overlap, batch_size=8, collate=collate)
>>> print(crop.shape) # [8, 3, 10, 10, 10]
This utility also allows for the evaluation of arbitrary functions on each crop prior to collating. This
is useful if you want to normalize each crop. This function should accept an ArrayLike and return an ArrayLike
>>> import numpy as np
>>> from ryomen import Slicer
>>> image = np.random.randn((3, 100, 100, 100)) # A large, 3D array with a color channel
>>> crop_size, overlap = (10, 10, 10), (2, 2, 2) # we want to crop on the spatial dimensions!
>>> collate = lambda x: np.stack(x, axis=0)
>>> normalize = lambda x: x / 255 * 2 - 1
>>> for crop, index = Slicer(image, crop_size, overlap, batch_size=8, collate=collate, output_transform=normalize)
>>> print(crop.shape) # [8, 3, 10, 10, 10]
:param image: N Dimensional Array of any type
:param crop_size: Tuple ints for cropping dimensions
:param overlap: Tuple of ints for the overlap between crops
:param batch_size: batch size of crops. Allows for returning multiple crops in one iteration
:param pad: pads the image by reflection if true. Only supported if input tensor implements a flip method.
:param output_transform: function to apply to each crop before returning
:param collate: function to collate images, the default behavior is to return a list of crops
:param progress_bar: function which wraps the iteration and displays a progress bar. (e.g. tqdm)
"""
self.__N = None
self.__image: TensorLike = image
self.__crop_size = list(crop_size)
self.__overlap = list(overlap)
self.__batch_size = batch_size
self.__output_fn = output_transform
self.__collate_fn = collate
self.__progress_bar = progress_bar
self.__crop_cache = None
# Default, is changed to value of pad after suitable checks
self.__can_pad = False
# Default, is changed to true after suitable checks
self.__support_negative_strides = False
# Zarr arrays don't have a flip method, or support negative indices. However,
# after one indexing, they turn into numpy arrays which DO support negative
# indices. To keep no dependencies, we do not type check, however
# if it looks like a duck...
self.__probably_a_zarr_array = False
# Check that all the inputs are legitimate...
self._check_validity()
# Adjust crop size such that we can return images smaller than crop size.
# This is silent and might be confusing. The user should never have to access the crop size after creation.
self._adjust_crop_size()
# Here we check if we can pad the image with indexing. We require negative indexing support, an implement flip
# method, or it to be a zarr array
self._able_to_pad(pad)
def _adjust_crop_size(self):
"""
checks if user supplied crop size is larger than the image. If so, modifies the crop
size to fit entire image all at once.
"""
self.__user_defined_crop_larger_than_image = False
image_shape = self.__image.shape
for i, size in enumerate(self.__crop_size):
if (
not self.__crop_size[i]
< image_shape[i + len(image_shape) - len(self.__crop_size)]
):
self.__user_defined_crop_larger_than_image = True
self.__crop_size[i] = (
self.__crop_size[i]
if self.__crop_size[i]
< image_shape[i + len(image_shape) - len(self.__crop_size)]
else image_shape[i + len(image_shape) - len(self.__crop_size)]
)
def _able_to_pad(self, pad: bool):
""" checks for negative indexing support, or a flip method. Ugly implementation! """
try: # test for negative stride
self.__image[0:1:-1, ...] # can flip with negative indices
self.__support_negative_strides = True
self.__can_pad = pad
except: # Bare except is bad. I don't think it's a big deal here though. Don't know error a priori
try:
self.__image[0:1, ...][::-1, ...]
self.__probably_a_zarr_array = True
self.__can_pad = pad
except:
if not hasattr(self.__image, "flip"):
raise RuntimeError(
"Your image array does not implement a flip method or support negative strides. Therefore, padding is not supported."
)
else:
self.__can_pad = pad
def _check_validity(self):
""" checks the validity of all inputs """
if not hasattr(self.__image, "size"):
raise ValueError(
f"Input array of type {type(self.__image)} does not have the method: size"
)
if not hasattr(self.__image, "__getitem__"):
raise ValueError(
f"Input array of type {type(self.__image)} does not have the method: __getitem__"
)
if not hasattr(self.__image, "shape"):
raise ValueError(
f"Input array of type {type(self.__image)} does not have the method: shape"
)
shape: Shape = self.__image.shape
# Check if the crop size dimension is reasonable
if len(shape) < len(self.__crop_size):
raise ValueError(
f"image of shape: {shape} cannot be subdivided into crops of shape: {self.__crop_size} with overlap: {self.__overlap}"
)
# Check if the overlap size dimension is reasonable
if len(shape) < len(self.__overlap):
raise ValueError(
f"image of shape: {shape} cannot be subdivided into crops of shape: {self.__crop_size} with overlap: {self.__overlap}"
)
if len(self.__crop_size) != len(self.__overlap):
raise ValueError(f"crop_size and overlap_size must have the same length")
for d, (c, o) in enumerate(zip(self.__crop_size, self.__overlap)):
if c - (o * 2) < 0:
raise ValueError(
f"Overlap in {d} dimension cannot be equal to or larger than crop size... {c=} - {o*2=} = {c - (o * 2)} < {c}"
)
if c <= 0:
raise ValueError(f"crop size must not be negative")
if o < 0:
raise ValueError(f"Overlap must not be negative")
if not isinstance(c, int):
raise TypeError(
f"Crop sizes must be an integer, not {c=} of type {type(c)}"
)
if not isinstance(o, int):
raise TypeError(
f"Overlap sizes must be an integer, not {o=} of type {type(o)}"
)
if not isinstance(self.__batch_size, int):
raise TypeError(f"Batch sizes must be a positive integer > 0")
if self.__batch_size <= 0:
raise ValueError(f"Batch sizes must be a positive integer > 0")
n = len(
list(
self._nd_generator(
self.__crop_size, self.__overlap, self.__image.shape, self.__can_pad
)
)
)
if self.__batch_size > n:
raise RuntimeError(
"requested batch size is greater than the entirety of the image cropped. "
f"All crops collated would result in {n} batches. {n} < {self.__batch_size}"
)
def _get_source_index(self) -> Index:
"""
returns indices of the crop excluding overlap
only used by self.__iter__
"""
source = [Ellipsis]
for o, s in zip(self.__overlap, self.__crop_size):
source += [slice(o, -o if o != 0 else s)]
return tuple(source)
def _get_destination_index(self, xyz: Sequence[int], shape: Shape) -> Index:
"""
returns the indicies necessary to place the non-overlapping region of the image into a new tensor
only used by self.__iter__
"""
shape = list(shape)
while len(shape) > len(self.__crop_size):
shape.pop(0)
destination: Sequence[EllipsisType | slice] = [Ellipsis]
for a, c, o, s in zip(xyz, self.__crop_size, self.__overlap, shape):
a = a if a + c < s + (self.__can_pad * o) else s - c + (self.__can_pad * o)
destination += [slice(a + o, (a + c) - o, 1)]
return tuple(destination)
@staticmethod
def _get_next_slice(
ind: List[int], c: List[int], o: List[int], shape: Shape, pad: bool
) -> Tuple[Index, List[int]]:
"""
given a current index ind, a crop c, overlap o, and image shape
calculate the next index of a crop such that all tiles of the image are eventually
covered
:param ind: list of the current coordinate pos
:param c: crop size
:param o: overlap size
:param shape: shape of ultimate e
:return: Slice of
"""
# Starting at a coordinate defined by ind,
# generated slices for a crop
indices: list[EllipsisType | slice] = [Ellipsis]
for i in range(len(shape)):
_x = ind[i] if ind[i] + c[i] <= shape[i] + (o[i] * pad) else shape[i] - c[i] + (o[i] * pad)
indices.append(slice(_x, _x + c[i], 1))
# Now we adjust the next corner based on crop size and out of bounds checks.
i = 0
while i < len(shape):
# Can we take a full crop step without going out of bounds?
if ind[i] + c[i] - o[i] * 2 <= shape[i] + (o[i] * pad):
ind[i] += c[i] - o[i] * 2
break
# If we cannot, bound to the end.
else:
ind[i] = -o[i] if pad else 0
i += 1
return tuple(indices), ind
def _nd_generator(
self, crop: List[int], overlap: List[int], shape: Shape, pad: bool
) -> Generator[Tuple[Index, Sequence[int]], None, None]:
""" so cursed """
assert len(crop) == len(overlap)
# In the weird case where an overlap is larger than a crop, we should fail here...
for c, o in zip(crop, overlap):
if not c > o * 2:
raise ValueError(f"Crop Size must be larger than overlap * 2. {c=} !> {o=} * 2")
# init a list of zeros as the "first" crop location
x = [-o if pad else 0 for _, o in zip(crop, overlap)]
assert len(shape) >= len(crop)
# We cache all previous indices here, as the cost of looking this up
# is presumably less than the cost of re-running the expensive fn
previously_yielded = set()
# copy because lists are mutable -- we need to mutate it in this scope but not the higher one...
shape = copy(list(shape))
# Leading dimensions are allowed, therefore we pop off all leading dimensions until the shape
# and crop shape are the same!
while len(shape) > len(crop):
shape.pop(0)
ind, x = self._get_next_slice(x, crop, overlap, shape, pad)
while not all((a + c) > b + (o * pad) for a, b, c, o in zip(x, shape, crop, overlap)):
if str(ind) not in previously_yielded:
previously_yielded.add(str(ind))
yield ind, x
ind, x = self._get_next_slice(x, crop, overlap, shape, pad)
if str(ind) not in previously_yielded:
previously_yielded.add(str(ind))
yield ind, x
ind, x = self._get_next_slice(x, crop, overlap, shape, pad)
if str(ind) not in previously_yielded:
previously_yielded.add(str(ind))
yield ind, x
def _flush_output(
self, output_cache
) -> Tuple[
TensorLike | Sequence[TensorLike],
Index | Sequence[Index],
Index | Sequence[Index],
]:
""" pop off the cache and convert to images, sources, and destinations """
output = []
for i in range(len(output_cache)):
output.append(output_cache.pop(0))
images: Sequence[TensorLike] | TensorLike = self.__collate_fn(
[t for t, s, d in output]
)
sources: List[Index] = [tuple(s) for t, s, d in output]
destinations: List[Index] = [tuple(d) for t, s, d in output]
is_list = isinstance(images, list)
all_same_size = len(images) == len(sources) == len(destinations) == 1
if is_list and all_same_size:
return images[0], sources[0], destinations[0]
else:
return images, sources, destinations
@staticmethod
def _minmax(a: slice, m: int) -> slice:
""" utility function to clamp a slice between 0 and m """
if a.start < 0:
delta = abs(a.start)
a = slice(a.start + delta, a.stop + delta, a.step)
elif a.stop > m:
delta = abs(a.stop - m)
a = slice(a.start - delta, a.stop - delta, a.step)
return slice(min(max(a.start, 0), m), min(max(a.stop, 0), m), a.step)
def _flip_array(self, array: TensorLike, index: int):
""" utility function to flip an array """
if self.__probably_a_zarr_array and self.__can_pad:
ind = [
slice(s, None, -1) if i == index else slice(0, None, 1)
for i, s in enumerate(array.shape)
]
return array[tuple(ind)]
elif not self.__support_negative_strides: # can flip...
return array.flip(index)
def _index_image_with_pad(self, index: Index) -> TensorLike:
"""
Terrible function to index the image, and through indexing alone, pads the image with reflections. It
should do this in N Dimensions, but was an absolute nightmare to code, and there are so many edge cases. Ive
tried my best to handle as many as possible, but innevitably some may fall through thte cracks.
:param index:
:return:
"""
if not self.__can_pad:
return self.__image[index]
# axis is smaller (-1) or larger than size
shape = copy(list(self.__image.shape))
while len(shape) > len(self.__crop_size):
shape.pop(0) # remove leading dim
# Remove all Ellipsis. There should only be one at position 0, however this
# implementation makes linters happy...
modified_index: List[slice] = [
s for s in list(copy(index)) if isinstance(s, slice)
]
# First we check if any padding is even necessary...
padding_necessary = False
for i, (ind, s, o) in enumerate(zip(modified_index, shape, self.__overlap)):
if ind.start < 0:
padding_necessary = True
elif ind.stop > s:
padding_necessary = True
if not padding_necessary:
return self.__image[index]
# We have a problem! We need a library agnostic way to create a tensor
# We can do this by indexing another part of the tensor, and doing a deepcopy.
if self.__crop_cache is None:
_output_index, _ = self._get_next_slice(
[0 for _ in self.__crop_size],
c=self.__crop_size,
o=self.__overlap,
shape=shape,
pad=False,
)
output_array = self.__image[_output_index] * 0 # zeros the array
self.__crop_cache = output_array
else:
output_array = self.__crop_cache
first_access = False
n_leading_dimensions = len(shape) - len(self.__crop_size)
for i, (ind, s, o) in enumerate(zip(modified_index, shape, self.__overlap)):
padding_source: SliceList = [Ellipsis]
other_source: SliceList = [Ellipsis]
padding_destination: SliceList = [Ellipsis]
other_destination: SliceList = [Ellipsis]
has_padded = False
for d, (current_slice, c) in enumerate(
zip(modified_index, self.__crop_size)
):
x = c if first_access else shape[d + n_leading_dimensions]
if i + n_leading_dimensions != d: # Dimension where padding is not required
padding_source.append(self._minmax(current_slice, c if first_access else x))
other_source.append(self._minmax(current_slice, c if first_access else x))
padding_destination.append(slice(0, c, 1))
other_destination.append(slice(0, c, 1))
elif ind.start < 0: # Padding required on the left
has_padded = True
padding_source.append(
slice(abs(ind.start) - 1, None, -1)
if self.__support_negative_strides
else slice(0, abs(ind.start), 1)
)
padding_destination.append(slice(0, abs(ind.start), 1))
other_source.append(slice(0, ind.stop, 1))
other_destination.append(slice(abs(ind.start), None, 1))
elif ind.stop > s: # Padding required on the right
has_padded = True
padding_source.append(
slice(x, -(ind.stop - s) - 1, -1)
if self.__support_negative_strides
else slice(-(ind.stop - s), x, 1)
)
other_source.append(slice(ind.stop - s if first_access else current_slice.start, x))
padding_destination.append(slice(-(ind.stop - s), c, 1))
other_destination.append(slice(0, -(ind.stop - s), 1))
if has_padded:
to_access = output_array if first_access else self.__image
pad = deepcopy(to_access[tuple(padding_source)])
if not self.__support_negative_strides:
pad = self._flip_array(pad, i + len(shape) - len(self.__crop_size))
other = deepcopy(to_access[tuple(other_source)])
output_array[tuple(other_destination)] = other
output_array[tuple(padding_destination)] = pad
first_access = True
return output_array
[docs]
def __len__(self):
""" Returns the number of slices this iterator will iterate over """
if self.__N is None:
self.__N = len(
list(
self._nd_generator(
self.__crop_size,
self.__overlap,
self.__image.shape,
pad=self.__can_pad,
)
)
)
return self.__N // self.__batch_size + (1 if self.__N % self.__batch_size else 0)
[docs]
def __iter__(
self,
) -> Generator[
Tuple[
TensorLike | Sequence[TensorLike],
Index | Sequence[Index],
Index | Sequence[Index],
],
None,
None,
]:
"""
Creates a generator which yields a TensorLike Crop, Source Indices, and Destination Indices.
"""
image_shape: Shape = self.__image.shape # C, X, Y, Z
output_cache = []
for ind, xyz in self.__progress_bar(
self._nd_generator(self.__crop_size, self.__overlap, image_shape, self.__can_pad)
):
source: Index = self._get_source_index()
destination: Index = self._get_destination_index(
tuple(a.start for a in ind if isinstance(a, slice)), image_shape
)
crop: TensorLike = self._index_image_with_pad(tuple(ind))
output_cache.append((self.__output_fn(crop), source, destination))
# If we've added more to the output cache than the batch size, we flush the batch away
if len(output_cache) == self.__batch_size:
yield self._flush_output(output_cache)
if len(output_cache) > 0:
yield self._flush_output(output_cache)