# Source code for crumpets.workers.saliency

import random
import itertools as it
from math import ceil

import cv2
import numpy as np
from numpy.random import multivariate_normal

from ...augmentation import make_transform
from ...augmentation import calc_scale_ratio
from .. import ImageWorker


def check_range(points, h, w):
    """
    Filter out points that are invalid or lie outside a ``w`` x ``h`` area.

    :param points: 2-D array with one point per row;
        column 0 is compared against ``w``, column 1 against ``h``
    :param h: exclusive upper bound for column 1 (y)
    :param w: exclusive upper bound for column 0 (x)
    :return: array containing only the rows that passed every check
    """
    # drop rows that contain a NaN in any column
    has_nan = np.isnan(points).any(axis=1)
    points = points[~has_nan]

    # drop rows that contain a negative value in any column
    # noinspection PyUnresolvedReferences
    non_negative = (points >= 0).all(axis=1)
    points = points[non_negative]

    # keep only rows with x < w and y < h
    inside = (points[:, 0] < w) & (points[:, 1] < h)
    return points[inside]
def interpolate_points(points, h, w):
    """
    Distribute each point over its four neighbouring integer pixels.

    Points are first filtered with :func:`check_range`. For every
    remaining point the floor/ceil pixel coordinates (clipped to the
    ``w`` x ``h`` area) are computed together with a weight per
    neighbour; the four weights of a point are normalised by their sum.

    :param points: 2-D array with one ``(x, y)`` point per row
    :param h: height of the area the points are mapped onto
    :param w: width of the area the points are mapped onto
    :return: tuple ``(rows, cols, weights)`` of three 1-D arrays of
        equal length (four entries per point, ordered
        lower/lower, lower-x/upper-y, upper-x/lower-y, upper/upper).
        Arrays are returned (instead of one-shot iterators) so the
        result can be used directly for NumPy indexing, e.g.
        ``buffer[rows, cols] = weights``.
    """
    points = check_range(points, h, w)
    lower = np.floor(points).astype(np.int32)
    upper = np.ceil(points).astype(np.int32)
    x = points[:, 0]
    y = points[:, 1]
    xl = lower[:, 0]
    yl = lower[:, 1]
    xu = upper[:, 0]
    yu = upper[:, 1]
    # clip in place; ceil() may yield w or h for points close to the border
    np.clip(xl, 0, w - 1, out=xl)
    np.clip(yl, 0, h - 1, out=yl)
    np.clip(xu, 0, w - 1, out=xu)
    np.clip(yu, 0, h - 1, out=yu)
    # weight of a neighbour shrinks with its distance to the point
    xlyl = (1 - (x - xl)) * (1 - (y - yl))
    xlyu = (1 - (x - xl)) * (1 - (yu - y))
    xuyl = (1 - (xu - x)) * (1 - (y - yl))
    xuyu = (1 - (xu - x)) * (1 - (yu - y))
    # small epsilon guards against division by zero
    s = xlyl + xlyu + xuyl + xuyu + 1e-9
    rows = np.concatenate((yl, yu, yl, yu))
    cols = np.concatenate((xl, xl, xu, xu))
    weights = np.concatenate((xlyl / s, xlyu / s, xuyl / s, xuyu / s))
    return rows, cols, weights
def discretize_points(points, h, w):
    """
    Round points to integer coordinates and drop those out of range.

    :param points: 2-D array with one ``(x, y)`` point per row
    :param h: exclusive upper bound for y
    :param w: exclusive upper bound for x
    :return: ``int32`` array of the rounded points accepted by
        :func:`check_range`
    """
    rounded = points.round()
    in_range = check_range(rounded, h, w)
    return in_range.astype(np.int32)
def show(name, mat):
    """
    Display ``mat`` in a resizable 600x600 window and wait for a key press.

    :param name: window name passed to OpenCV
    :param mat: image/matrix handed to ``cv2.imshow``
    """
    window_size = (600, 600)
    cv2.namedWindow(name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(name, *window_size)
    cv2.imshow(name, mat)
    # blocks until any key is pressed
    cv2.waitKey()
class SaliencyWorker(ImageWorker):
    """
    Worker that outputs images and saliency maps created from raw gaze
    locations.

    Expects the following keys present in each sample::

        {"image": encoded image data
         "experiments": [experiment, ...]}

    Each experiment is first checked for fixation points under key
    "fixations". Falls back to key "locations" of raw gaze data if
    no "fixations" entry is present.

    The following parameters can be configured:

    - image_params: see ImageWorker
    - target_image_params:

      - "sample_ratio" (default: 1): float in [0, 1];
        percentage of experiments sampled from the list of all
        experiments
      - "jitter" (default: 0): add noise to the individual gaze
        locations; sigma of a Gaussian distribution, scaled by the
        size of the target_images:
        noise ~ N(jitter * target_image_size).
        Note: the implementation currently uses ``jitter * width``
        as the variance, see the TODO in :meth:`prepare`.
      - "interpolate" (default: False): use linear interpolation to
        map gaze locations to the target_image
      - "blur" (default: 0): apply Gaussian blur with sigma
        blur * target_image_size to target_image
      - "maxnorm" (default: False): apply maximum norm to
        target_image
    """
    def __init__(self, image, target_image,
                 image_params=None, target_image_params=None,
                 image_rng=None, **kwargs):
        ImageWorker.__init__(self, image, image_params, image_rng, **kwargs)
        self.add_buffer('target_image', target_image)
        self.add_params('target_image', target_image_params)

    @staticmethod
    def _gaze_points(experiment):
        """Return the (x, y) gaze points of one experiment as float32 array."""
        # Look up the fallback lazily: dict.get(key, experiment['locations'])
        # would raise KeyError for experiments that only carry "fixations".
        raw = experiment.get('fixations')
        if raw is None:
            raw = experiment['locations']
        pts = np.float32(raw)
        if not pts.size:
            # an empty list converts to a 1-D array; normalise its shape
            return np.empty((0, 2), np.float32)
        # use only 1st and 2nd column
        return pts[:, :2]

    def prepare(self, sample, batch, buffers):
        """
        Prepare the image and render the saliency map of one sample.

        The image is prepared by ``ImageWorker.prepare``. Gaze points of
        the (sub-sampled) experiments are transformed with the same
        augmentation parameters, drawn into an intermediate map and
        resized into ``buffers['target_image']``, optionally followed
        by blurring and max-normalisation.

        :param sample: dict with the keys described in the class docs
        :param batch: passed through to ``ImageWorker.prepare``
        :param buffers: dict of output buffers; ``'target_image'`` must
            be a 2-D array and is filled in place
        :raises ValueError: if no gaze points remain after filtering
        """
        im, params = ImageWorker.prepare(self, sample, batch, buffers)
        ih, iw = im.shape[:2]
        map_params = dict(params)
        map_params.update(self.params['image'])
        map_params.update(self.params['target_image'])
        target = buffers['target_image']
        r = calc_scale_ratio(
            (ih, iw), target.shape,
            map_params.get('scale', 1),
            map_params.get('scale_mode', 'shortest')
        )[0]
        # intermediate map is an integer multiple of the target size
        mh, mw = [int(s*ceil(r)) for s in target.shape]
        map_buffer = np.zeros((mh, mw), np.float32)

        # create experiment arrays, using only 1st and 2nd column
        experiments = [
            check_range(self._gaze_points(experiment), ih, iw)
            for experiment in sample['experiments']
        ]
        # remove (now) empty experiments
        experiments = [
            experiment for experiment in experiments
            if experiment.size
        ]
        # sample a subset of experiments
        sample_ratio = map_params.get('sample_ratio', 1.0)
        n = int(ceil(len(experiments)*sample_ratio))
        experiments = random.sample(experiments, n)
        # every remaining experiment is non-empty, so an empty list is
        # the only way to end up without points
        if not experiments:
            raise ValueError('no gaze points')
        # create combined points array
        points = np.concatenate(experiments)

        # transform points to map_buffer coordinates
        m = np.eye(3, dtype=np.float32)
        m[:2] = make_transform(
            (ih, iw), (mh, mw),
            map_params.get('angle', 0),
            map_params.get('scale', 1),
            map_params.get('aspect', 1),
            map_params.get('shift', None),
            map_params.get('hmirror', False),
            map_params.get('vmirror', False),
            map_params.get('shear', None),
            map_params.get('scale_mode', 'shortest')
        )
        points = points.reshape((1, -1, 2))
        points = cv2.perspectiveTransform(points, m).reshape((-1, 2))

        # add random noise to points
        jitter = map_params.get('jitter', 0)
        if jitter:
            # TODO fix quadratic sigma scaling
            mean = np.array([0, 0])
            cov = np.array([[jitter * mw, 0], [0, jitter * mw]])
            points += multivariate_normal(mean, cov, len(points))

        points = check_range(points, mh, mw)
        if not points.size:
            raise ValueError('no gaze points')

        interpolate = map_params.get('interpolate', False)
        if interpolate:
            y, x, v = interpolate_points(points, mh, mw)
            # NumPy cannot index with iterator objects; materialise the
            # coordinates and weights as arrays first (np.fromiter also
            # accepts arrays)
            rows = np.fromiter(y, dtype=np.intp)
            cols = np.fromiter(x, dtype=np.intp)
            map_buffer[rows, cols] = np.fromiter(v, dtype=np.float32)
        else:
            points = discretize_points(points, mh, mw)
            map_buffer[points[:, 1], points[:, 0]] = 1

        # cv2 expects dsize as (width, height); assign explicitly so the
        # result always ends up in the target buffer
        th, tw = target.shape
        target[...] = cv2.resize(map_buffer, (tw, th),
                                 interpolation=cv2.INTER_AREA)
        blur = map_params.get('blur', 0)
        if blur:
            target[...] = cv2.GaussianBlur(target, (0, 0),
                                           blur*target.shape[1])
        if map_params.get('maxnorm'):
            # epsilon avoids division by zero for an all-zero map
            target /= target.max() + 1e-9