Source code for crumpets.torch.augmentation_cuda

import math
import warnings

import numpy as np
import cv2
import torch
import torch.nn.functional as F

from ..augmentation_cpu import _make_base_lut
from ..augmentation_cpu import _make_contrast_luts
from ..augmentation_cpu import TANH_RANGE
from ..augmentation_cpu import TANH_OFFSET


# Disable OpenCL inside OpenCV:
# this prevents spawning GPU processes that hog memory.
try:
    cv2.ocl.setUseOpenCL(False)
except AttributeError:
    # presumably this cv2 build has no `ocl` module (or no setUseOpenCL),
    # in which case there is nothing to disable
    pass
# Run OpenCV single-threaded; only warpAffine benefits from threads
# and it scales poorly.
cv2.setNumThreads(1)


# Maps the string returned by ``tensor.type()`` to a tuple
# ``(internal dtype, lower bound, upper bound)``.
#
# * The internal dtype is the floating point type the augmentations in
#   this module compute in before casting back to the input dtype.
# * The bounds are used as the range of the uniform noise in the
#   ``add_noise_*`` functions; the upper bound is additionally used as the
#   default ``maxv`` (value scale) of ``add_gamma``.
#
# All bounds are floats. Integer types use ``-(upper + 1)`` as lower bound.
DTYPE_INFO = {
    'torch.ByteTensor': (
        torch.float32,
        float(-2 ** 8),
        float(2 ** 8 - 1),
    ),
    'torch.cuda.ByteTensor': (
        torch.float32,
        float(-2 ** 8),
        float(2 ** 8 - 1),
    ),
    'torch.ShortTensor': (
        torch.float32,
        float(-2 ** 15),
        float(2 ** 15 - 1),
    ),
    'torch.cuda.ShortTensor': (
        torch.float32,
        float(-2 ** 15),
        float(2 ** 15 - 1),
    ),
    'torch.cuda.FloatTensor': (
        torch.float32,
        -1.0,
        1.0,
    ),
    'torch.cuda.DoubleTensor': (
        torch.float64,
        -1.0,
        1.0,
    ),
    'torch.cuda.HalfTensor': (
        torch.float32,
        -1.0,
        1.0,
    ),
    'torch.FloatTensor': (
        torch.float32,
        -1.0,
        1.0,
    ),
    'torch.DoubleTensor': (
        torch.float64,
        -1.0,
        1.0,
    ),
    # NOTE(review): the CPU half entry computes in float16 while the CUDA
    # half entry computes in float32 -- confirm this asymmetry is intended.
    'torch.HalfTensor': (
        torch.float16,
        -1.0,
        1.0,
    ),
    # int32 spans [-2**31, 2**31 - 1]; the lower bound used to be -2**32
    # (derived from uint32), which is outside the representable range and
    # made the noise range asymmetric.
    'torch.cuda.IntTensor': (
        torch.float32,
        float(-2 ** 31),
        float(2 ** 31 - 1),
    ),
    'torch.IntTensor': (
        torch.float32,
        float(-2 ** 31),
        float(2 ** 31 - 1),
    ),
}


# Lookup tables used by the gamma augmentation. Every entry is a tuple that
# add_gamma unpacks as (base LUT, positive contrast LUT, negative contrast
# LUT): the base LUT first, followed by whatever _make_contrast_luts yields.

# Fallback tables for every dtype that has no dedicated entry below.
DEFAULT_LUTS = (_make_base_lut(2 ** 12, sigmoid=True),) + tuple(
    _make_contrast_luts(2 ** 12, sigmoid=True)
)

# Dedicated tables per tensor dtype.
DTYPE_LUTS = {
    # one table entry per possible uint8 value, built without sigmoid
    torch.uint8: (_make_base_lut(2 ** 8, sigmoid=False),) + tuple(
        _make_contrast_luts(2 ** 8, sigmoid=False)
    ),
    # 64 bit types get float64 tables
    torch.int64: (
        _make_base_lut(2 ** 12, dtype=np.float64, sigmoid=True),
    ) + tuple(
        _make_contrast_luts(2 ** 12, dtype=np.float64, sigmoid=True)
    ),
    torch.float64: (
        _make_base_lut(2 ** 12, dtype=np.float64, sigmoid=True),
    ) + tuple(
        _make_contrast_luts(2 ** 12, dtype=np.float64, sigmoid=True)
    ),
}


def __add_gamma_impl(
        im_tensor,
        augs,
        maxv,
        org_type,
        temp_type,
        lut_size,
        base_lut,
        positive_contrast_lut,
        negative_contrast_lut,
):
    """
    Apply gamma / contrast lookup tables to one batch of images, in place.

    One lookup table is built per image and channel, all tables are
    concatenated, and every pixel (interpreted as an index into its own
    table) is replaced by the value found there.

    The result is written through a reshaped view of ``im_tensor``; the
    caller therefore has to pass a tensor for which ``reshape`` does not
    copy (i.e. a contiguous one), otherwise the result is lost.

    :param im_tensor: tensor of shape [num_images, num_channels, h, w]
        holding lookup table indices in ``[0, lut_size - 1]``
        (int64 for the uint8 path, ``temp_type`` otherwise).
        Modified in place.
    :param augs: one dict per image with the optional keys
        'color', 'gamma_gray', 'gamma_color' and 'contrast'.
        The dicts are not modified.
    :param maxv: factor the (normalized) table values are scaled with
    :param org_type: dtype of the original image tensor
    :param temp_type: dtype used for the lookup tables
    :param lut_size: number of entries of a single lookup table
    :param base_lut: numpy array, table used for contrast == 0
    :param positive_contrast_lut: numpy array, blended in for contrast > 0
    :param negative_contrast_lut: numpy array, blended in for contrast < 0
    :raises ValueError: if 'gamma_color' has neither one value
        nor one value per channel
    """
    device = im_tensor.device
    num_images, num_channels, _h, _w = im_tensor.shape
    num_rows = num_images * num_channels

    # create one lookup table per image and channel
    luts = []
    for a in augs:
        # compute the combined gamma = gray * color
        color = a.get('color', True)
        gamma_gray = a.get('gamma_gray')
        if gamma_gray is None:
            gamma_gray = 1
        gamma_color = a.get('gamma_color')
        if not color or not gamma_color:
            gamma_color = num_channels * [1]
        if len(gamma_color) == 1:
            # broadcast into a *new* list; an in-place `*=` would extend
            # the list stored in the caller's augmentation dict
            gamma_color = list(gamma_color) * num_channels
        elif len(gamma_color) != num_channels:
            raise ValueError(
                'number of gamma_color values must be broadcastable to number of channels'
            )

        # blend base and contrast tables; a missing or None contrast
        # means "no contrast change"
        contrast = a.get('contrast') or 0
        if contrast > 0:
            lut = (1 - contrast) * base_lut + contrast * positive_contrast_lut
        elif contrast < 0:
            lut = (1 + contrast) * base_lut - contrast * negative_contrast_lut
        else:
            lut = base_lut
        for gamma in gamma_color:
            luts.append(lut ** (gamma_gray * gamma) * maxv)

    # put lookup tables on the device as one flat tensor; stacking in numpy
    # first avoids the slow element-wise conversion of a list of arrays
    lut = torch.as_tensor(
        np.stack(luts), dtype=temp_type, device=device
    ).reshape(-1)

    # view im_tensor as shape [num_images*num_channels, h*w] and
    # offset the values in each row so they point to the next lookup table:
    # row_0 = row_0 + 0 * lut_size
    # row_1 = row_1 + 1 * lut_size
    # ...
    # row_n = row_n + n * lut_size
    offset = torch.arange(num_rows, dtype=im_tensor.dtype)
    offset = (offset * lut_size).reshape(num_rows, 1).to(device)
    im_tensor = im_tensor.reshape(num_rows, -1)
    im_tensor += offset

    # apply lookup table
    if org_type == torch.uint8:
        # take fast path for uint8, round for better accuracy
        im_tensor[...] = lut.round_().to(org_type).take(im_tensor)
    else:
        # other types need linear interpolation between the two
        # neighbouring table entries
        lower = im_tensor.to(torch.int64)
        upper = im_tensor.ceil().to(torch.int64)
        # lerp computes start + weight * (end - start), so the weight is
        # the distance of the index from the *lower* entry
        weight = im_tensor - lower.to(temp_type)
        im_tensor[...] = lut.take(lower)
        im_tensor.lerp_(lut.take(upper), weight)
        if not org_type.is_floating_point:
            # round for integer types for better accuracy
            im_tensor.round_()


def add_gamma(im_tensor, augs, maxv=None):
    """
    Apply gamma and contrast augmentation to a batch of images.

    Takes a tensor that contains a batch of images and a list of
    dictionaries describing the desired gamma values, and applies the
    corresponding gamma correction to every image. The input tensor is
    not modified; a new tensor of the same dtype is returned.

    This function is hardware accelerated, so be sure that `im_tensor`
    is located on the GPU (a warning is issued otherwise).

    :param im_tensor: tensor of shape [num_images, num_channels, h, w]
        that contains the image data
    :param augs: a list of dictionaries, one per image.
        Each dict should contain a 'color', a 'gamma_gray',
        a 'gamma_color', and a 'contrast'
        value to specify the behaviour of the gamma augmentation.
        For further information see
        :func:`~crumpets.augmentation.randomize_image`
    :param maxv: Maximum value of the entries.
        This value is data type dependent, so be careful with it.
        It defaults to None, which indicates that the value is taken
        according to the data type of the tensor.
    :return: augmented images, same shape and dtype as `im_tensor`
    """
    if not im_tensor.is_cuda:
        warnings.warn("input should be cuda tensor")
    org_type = im_tensor.dtype
    temp_type, _, default_maxv = DTYPE_INFO[im_tensor.type()]
    # a maxv of 0 is never meaningful, so it is treated like None
    maxv = maxv or default_maxv
    base_lut, positive_contrast_lut, negative_contrast_lut = \
        DTYPE_LUTS.get(org_type, DEFAULT_LUTS)
    lut_size = len(base_lut)
    num_images = im_tensor.shape[0]
    num_channels = im_tensor.shape[1]

    # Both branches create a new tensor, so the input is never modified.
    if org_type == torch.uint8:
        # uint8 values are used directly as table indices
        work = im_tensor.to(torch.int64)
        # batch size can be pretty damn big
        batch_size = int((2 ** 63 - 1 - maxv) // num_channels)
    else:
        # transform input with bigger dtypes into non-linear space:
        # scale [0, maxv] to [-TANH_RANGE, TANH_RANGE] and apply atanh ...
        work = im_tensor.to(temp_type).mul(2 * TANH_RANGE / maxv)
        work.sub_(TANH_RANGE)
        work.clamp_(-TANH_RANGE, TANH_RANGE)
        work.atanh_()
        # ... then map the result to table indices in [0, lut_size - 1]
        work.mul_(1 / 2 / TANH_OFFSET * (lut_size - 1))
        work.add_(0.5 * (lut_size - 1))
        # floating point types need to use smaller batches to keep precision
        if org_type.is_floating_point:
            batch_size = int(192 // num_channels)
        else:
            batch_size = int((2 ** 24 - 1 - maxv) // num_channels)
    # The formulas above yield 0 or a negative number for many channels or
    # a large maxv (e.g. int32); fall back to one image per batch instead
    # of failing or silently skipping the augmentation.
    batch_size = max(batch_size, 1)
    # The implementation writes its result through reshaped views of the
    # batch slices, which only works on contiguous memory.
    work = work.contiguous()

    # process in batches to prevent index offset exceeding
    # precision of temp_type like float32
    for start in range(0, num_images, batch_size):
        stop = start + batch_size
        __add_gamma_impl(
            work[start:stop],
            # every batch must see the augmentations of its own images
            augs[start:stop],
            maxv,
            org_type,
            temp_type,
            lut_size,
            base_lut,
            positive_contrast_lut,
            negative_contrast_lut,
        )
    return work.to(org_type)
def add_noise_rgb(im, augs, minv=None, maxv=None, internal_ftype=None):
    """
    Add camera-like RGB noise to a batch of images.

    Takes a tensor that contains a batch of images and a list of
    dictionaries describing the desired noise, and adds uniform noise
    to the images accordingly. The noise tries to mimic the RGB noise of
    a camera sensor: the second (green) channel gets half the strength and
    its noise is generated at half resolution and upsampled.
    The result is clamped to ``[0, maxv]``.

    This function is hardware accelerated, so be sure that `im`
    is located on the GPU (a warning is issued otherwise).

    :param im: tensor of shape [n, c, h, w] that contains the image data;
        at least two channels are required
    :param augs: a list of dictionaries, one per image.
        Each dict should contain a 'noise'
        value to specify the behaviour of the noise augmentation;
        a missing or None value means no noise.
        For further information see
        :func:`~crumpets.augmentation.randomize_image`
    :param minv: Lower bound of the generated noise.
        This value is data type dependent, so be careful with it.
        It defaults to None, which indicates that the value is taken
        according to the data type of the tensor.
    :param maxv: Upper bound of the generated noise and of the result.
        This value is data type dependent, so be careful with it.
        It defaults to None, which indicates that the value is taken
        according to the data type of the tensor.
    :param internal_ftype: The type that is used internally to compute
        the noise. It defaults to None, which indicates that a fitting
        type is chosen according to the input type.
        For most types the internal type is float32.
    :return: noisy images, same shape and dtype as `im`
    """
    if not im.is_cuda:
        warnings.warn("input should be cuda tensor")
    n, c, h, w = im.shape

    # Only consult the dtype table for values the caller did not supply.
    # `is None` (rather than truthiness) keeps an explicit bound of 0.
    if minv is None or maxv is None or internal_ftype is None:
        default_ftype, default_minv, default_maxv = DTYPE_INFO[im.type()]
        if internal_ftype is None:
            internal_ftype = default_ftype
        if minv is None:
            minv = default_minv
        if maxv is None:
            maxv = default_maxv
    noise_type = internal_ftype

    strengths = [a.get('noise') or 0 for a in augs]
    old_im_type = im.dtype
    im = im.type(noise_type)
    device = im.device

    # per-image, per-channel strength: the second channel gets half of it
    s = torch.tensor(strengths, dtype=noise_type, device=device)
    multiplier = torch.tensor(
        [1, 0.5] + [1] * (c - 2), dtype=noise_type, device=device
    )
    s = (s.reshape(-1, 1) * multiplier.reshape(1, -1)).reshape(n, c, 1, 1)

    # full resolution noise for all channels but the second one
    noisyrb = torch.empty(
        n, c - 1, h, w, dtype=noise_type, device=device
    ).uniform_(minv, maxv)
    # half resolution noise for the second channel, upsampled afterwards;
    # keep at least one pixel so 1-pixel wide/high images work as well
    noisyg = torch.empty(
        n, 1, max(h // 2, 1), max(w // 2, 1), dtype=noise_type, device=device
    ).uniform_(minv, maxv)
    noisyg = F.interpolate(
        input=noisyg, size=(h, w), mode="bilinear", align_corners=True
    )

    noisy = torch.cat([noisyrb[:, :1], noisyg, noisyrb[:, 1:]], dim=1)
    # im + 1 * s * noisy
    noisy = torch.addcmul(im, s, noisy, value=1)
    return noisy.clamp(0, maxv).type(old_im_type)
def add_noise_other(im, augs, minv=None, maxv=None, internal_ftype=None):
    """
    Add uniform noise to a batch of images.

    Takes a tensor that contains a batch of images and a list of
    dictionaries describing the desired noise, and adds uniform noise
    (independently for every channel) to the images accordingly.
    The result is clamped to ``[minv, maxv]``; for uint8 images the lower
    clamp bound is never below 0, because negative values cannot be
    represented and would wrap around when cast back.

    This function is hardware accelerated, so be sure that `im`
    is located on the GPU (a warning is issued otherwise).

    :param im: tensor of shape [n, c, h, w] that contains the image data
    :param augs: a list of dictionaries, one per image.
        Each dict should contain a 'noise'
        value to specify the behaviour of the noise augmentation;
        a missing or None value means no noise.
        For further information see
        :func:`~crumpets.augmentation.randomize_image`
    :param minv: Lower bound of the generated noise and of the result.
        This value is data type dependent, so be careful with it.
        It defaults to None, which indicates that the value is taken
        according to the data type of the tensor.
    :param maxv: Upper bound of the generated noise and of the result.
        This value is data type dependent, so be careful with it.
        It defaults to None, which indicates that the value is taken
        according to the data type of the tensor.
    :param internal_ftype: The type that is used internally to compute
        the noise. For most types the internal type is float32.
        It defaults to None, which indicates that a fitting
        type is chosen according to the input type.
    :return: noisy images, same shape and dtype as `im`
    """
    if not im.is_cuda:
        warnings.warn("input should be cuda tensor")
    n, c, h, w = im.shape

    # Only consult the dtype table for values the caller did not supply.
    # `is None` (rather than truthiness) keeps an explicit bound of 0.
    if minv is None or maxv is None or internal_ftype is None:
        default_ftype, default_minv, default_maxv = DTYPE_INFO[im.type()]
        if internal_ftype is None:
            internal_ftype = default_ftype
        if minv is None:
            minv = default_minv
        if maxv is None:
            maxv = default_maxv
    noise_type = internal_ftype

    strengths = [a.get('noise') or 0 for a in augs]
    old_im_type = im.dtype
    im = im.type(noise_type)
    device = im.device

    # one strength per image, broadcast over channels and pixels
    s = torch.tensor(
        strengths, dtype=noise_type, device=device
    ).reshape(-1, 1, 1, 1)
    noisy = torch.empty(
        n, c, h, w, dtype=noise_type, device=device
    ).uniform_(minv, maxv)
    # im + 1 * s * noisy
    noisy = torch.addcmul(im, s, noisy, value=1)

    # uint8 cannot hold negative values; casting them would wrap around
    low = max(minv, 0) if old_im_type == torch.uint8 else minv
    return noisy.clamp(low, maxv).type(old_im_type)
def add_blur(im, augs):
    """
    Blur a batch of images with per-image Gaussian filters.

    Takes a tensor that contains a batch of images and a list of
    dictionaries describing the desired blur, and blurs every image with
    a separable Gaussian filter (one vertical and one horizontal pass).
    If no image requests any blur, `im` itself is returned.

    This function is hardware accelerated, so be sure that `im`
    is located on the GPU (a warning is issued otherwise).

    :param im: tensor of shape [n, c, h, w] that contains the image data
    :param augs: a list (or tuple) of dictionaries, one per image.
        Each dict should contain a 'blur' value.
        This blur indicates the sigma value of the normal distribution
        filter that is used to blur the image.
        Note that the blur value is relative to the image width,
        to achieve the same optical blur effect on different image sizes.
        A missing, None or non-positive value means no blur.
        For further information see
        :func:`~crumpets.augmentation.randomize_image`
    :return: blurred images, same shape and type as `im`
    :raises TypeError: if `augs` is not a list or tuple
    :raises ValueError: if the number of augmentations does not match
        the batch size
    """
    if not im.is_cuda:
        warnings.warn("input should be cuda tensor")
    n, c, h, w = im.shape
    threshold = 0.0  # 0.1 / 448
    imtype = im.type()
    is_float = im.dtype.is_floating_point

    if not isinstance(augs, (list, tuple)):
        raise TypeError("Augmentations should be a list")
    if n != len(augs):
        raise ValueError(
            "the number of augmentations should match the batch size, expected: "
            + str(n) + " but got " + str(len(augs))
            + " \naugmentations: " + str(augs)
        )

    # extract relevant augmentation; everything that is not strictly above
    # the threshold (including missing and None values) means "no blur"
    sigmas = np.asarray(
        [a.get('blur') or 0.0 for a in augs], dtype=np.float64
    )
    sigmas[~(sigmas > threshold)] = 0.0
    if not sigmas.any():
        return im
    # sigma is relative to image width
    sigmas = sigmas * w

    # All images share the kernel size of the strongest blur so that the
    # whole batch can be processed with one grouped convolution.
    # For roughly sigma <= 0.2 the formula would be < 0; minimum size is 3.
    maxsize = max(
        3,
        max((int(s * 6.6 - 2.3) | 1 for s in sigmas if s >= 0.2), default=3),
    )
    pad = maxsize // 2

    # One kernel per image, repeated for every channel. A tiny sigma turns
    # the kernel of images without blur into an identity filter.
    kernels = torch.stack([
        torch.from_numpy(
            cv2.getGaussianKernel(maxsize, float(sigma) or 1e-8)
            .astype(np.float32)
        ).repeat(c, 1, 1)
        for sigma in sigmas
    ])
    kernels = kernels.reshape(-1, 1, maxsize, 1).to(im.device)

    # vertical pass: fold batch and channels into the channel dimension
    # and give every (image, channel) pair its own group
    out = F.pad(im.float(), (0, 0, pad, pad), mode='replicate')
    out = out.reshape(1, n * c, h + 2 * pad, w)
    out = F.conv2d(out, kernels, groups=n * c)
    # horizontal pass with the transposed kernels
    out = F.pad(out, (pad, pad, 0, 0), mode='replicate')
    out = F.conv2d(out, kernels.reshape(-1, 1, 1, maxsize), groups=n * c)

    if not is_float:
        # float32 kernels do not sum to exactly 1, so e.g. 200 can become
        # 199.99998; round instead of truncating when casting back
        out.round_()
    return out.type(imtype).reshape(n, c, h, w)