# Source code for africanus.deconv.hogbom.clean

```# -*- coding: utf-8 -*-

import logging

import numba
import numpy as np

try:
import scipy.signal
from scipy import optimize as opt
except ImportError as e:
opt_import_err = e
else:
opt_import_err = None

from africanus.util.requirements import requires_optional

@numba.jit(nopython=True, nogil=True, cache=True)
def twod_gaussian(coords, amplitude, xo, yo, sigma_x, sigma_y, theta, offset):
x = coords[0]
y = coords[1]
xo = float(xo)
yo = float(yo)
a = (np.cos(theta)**2)/(2*sigma_x**2) + (np.sin(theta)**2)/(2*sigma_y**2)
b = -(np.sin(2*theta))/(4*sigma_x**2) + (np.sin(2*theta))/(4*sigma_y**2)
c = (np.sin(theta)**2)/(2*sigma_x**2) + (np.cos(theta)**2)/(2*sigma_y**2)
g = (offset + amplitude *
np.exp(- (a*((x-xo)**2) +
2*b*(x-xo)*(y-yo) +
c*((y-yo)**2))))
return g.flatten()

@requires_optional('scipy', opt_import_err)
def fit_2d_gaussian(psf):
"""
Fit an elliptical Gaussian to the primary lobe of the psf
"""

# Get the full width at half maximum height of the psf

# numba doesn't have argwhere, but it can jit argwhere's
# implementation
# I = np.stack((psf>=0.5*psf.max()).nonzero()).transpose()

loc = np.argwhere(psf >= 0.5*psf.max())
# Create an array with these values at the same indices and zeros otherwise
lk, mk = psf.shape
psf_fit = np.zeros_like(psf)
psf_fit[loc[:, 0], loc[:, 1]] = psf[loc[:, 0], loc[:, 1]]
# Create x and y indices
x = np.linspace(0, psf.shape[0]-1, psf.shape[0])
y = np.linspace(0, psf.shape[1]-1, psf.shape[1])
x, y = np.meshgrid(x, y)
# Set starting point of optimiser
initial_guess = (0.5, lk/2, mk/2, 1.75, 1.4, -4.0, 0)
# Flatten the data
data = psf_fit.ravel()
# Fit the function (Gaussian for now)
popt, pcov = opt.curve_fit(twod_gaussian, (x, y), data, p0=initial_guess)
# Get function with fitted params
data_fitted = twod_gaussian((x, y), *popt)
# Normalise the psf to have a max value of one
data_fitted = data_fitted/data_fitted.max()
return data_fitted.reshape(lk, mk)

@numba.jit(nopython=True, nogil=True, cache=True)
def find_peak(residuals):
abs_residuals = residuals
min_peak = abs_residuals.min()
max_peak = abs_residuals.max()

nx, ny = abs_residuals.shape

minx, miny = -1, -1
maxx, maxy = -1, -1
peak_intensity = -1

for x in range(nx):
for y in range(ny):
intensity = abs_residuals[x, y]

if intensity == min_peak:
minx = x
miny = y

if intensity == max_peak:
maxx = x
maxy = y
peak_intensity = intensity

if minx == -1 or miny == -1:

if maxx == -1 or maxy == -1:

return maxx, maxy, minx, miny, peak_intensity

@numba.jit(nopython=True, nogil=True, cache=True)
def build_cleanmap(clean, intensity, gamma, p, q):
clean[p, q] += intensity*gamma

@numba.jit(nopython=True, nogil=True, cache=True)
def update_residual(residual, intensity, gamma, p, q, npix, psf):
npix = residual.shape[0]  # Assuming square image
residual -= gamma*intensity*psf[npix - 1 - p:2*npix - 1 - p,
npix - 1 - q:2*npix - 1 - q]

[docs]def hogbom_clean(dirty, psf,
gamma=0.1,
threshold="default",
niter="default"):
"""
Performs Hogbom Clean on the  ``dirty`` image given the ``psf``.

Parameters
----------
dirty : np.ndarray
float64 dirty image of shape (ny, nx)
psf : np.ndarray
float64 Point Spread Function of shape (2*ny, 2*nx)
gamma (optional) float
the gain factor (must be less than one)
threshold (optional) : float or str
the threshold to clean to
niter (optional : integer
the maximum number of iterations allowed

Returns
-------
np.ndarray
float64 clean image of shape (ny, nx)
np.ndarray
float64 residual image of shape (ny, nx)
"""
# deep copy dirties to first residuals,
# want to keep the original dirty maps
residuals = dirty.copy()

# Check that psf is twice the size of residuals
if (psf.shape[0] != 2*residuals.shape[0] or
psf.shape[1] != 2*residuals.shape[1]):
raise ValueError("Warning psf not right size")

# Initialise array to store cleaned image
clean = np.zeros_like(residuals)

assert clean.shape[0] == clean.shape[1]
npix = clean.shape[0]

if niter == "default":
niter = 3*npix

p, q, pmin, qmin, intensity = find_peak(residuals)

if threshold == "default":
# Imin + 0.001*(intensity - Imin)
threshold = 0.2*np.abs(intensity)
logging.info("Threshold set at %s", threshold)
else:
# Imin + 0.001*(intensity - Imin)
threshold = threshold*np.abs(intensity)
logging.info("Assuming user set threshold at %s", threshold)

# CLEAN the image
i = 0

while np.abs(intensity) > threshold and i <= niter:
logging.info("min %f max %f peak %f threshold %f" %
(residuals.min(), residuals.max(), intensity, threshold))

# First we set the
build_cleanmap(clean, intensity, gamma, p, q)
# Subtract out pixel
update_residual(residuals, intensity, gamma, p, q, npix, psf)
# Get new indices where residuals is max
p, q, _, _, intensity = find_peak(residuals)
# Increment counter
i += 1
# Warn if niter exceeded
if i > niter:
logging.warn("Number of iterations exceeded")
logging.warn("Minimum residuals = %s", residuals.max())

logging.info("Done cleaning after %d iterations.", i)

return clean, residuals

@requires_optional("scipy", opt_import_err)
def restore(clean, psf, residuals):
"""
Parameters
----------
clean : np.ndarray
float64 clean image of shape (ny, nx)
psf : np.ndarray
float64 Point Spread Function of shape (2*ny, 2*nx)
residuals : np.ndarray
float64 residual image of shape (ny, nx)

Returns
-------
np.ndarray
float64 Restored image of shape (ny, nx)
np.ndarray
float64 Convolved model of shape (ny, nx)
"""

logging.info("Fitting 2D Gaussian")

# get the ideal beam (fit 2D Gaussian to HWFH of psf)
clean_beam = fit_2d_gaussian(psf)

logging.info("Convolving")

# cval=0.0) #Fast using fft
iconv_model = scipy.signal.fftconvolve(clean, clean_beam, mode='same')

logging.info("Convolving done")

# Finally we add the residuals back to the image
restored = iconv_model + residuals

return (restored, iconv_model)

if __name__ == "__main__":
pass
```