Source code for africanus.experimental.rime.fused.terms.gaussian

from africanus.constants import c as lightspeed
import numpy as np

from africanus.experimental.rime.fused.terms.core import Term


[docs]class Gaussian(Term): """Gaussian Amplitude Term""" def dask_schema(self, uvw, chan_freq, gauss_shape): assert uvw.ndim == 2 assert chan_freq.ndim == 1 assert gauss_shape.ndim == 2 return {"uvw": ("row", "uvw"), "chan_freq": ("chan",), "gauss_shape": ("source", "gauss_shape_params")} def init_fields(self, typingctx, uvw, chan_freq, gauss_shape): guv_dtype = typingctx.unify_types( uvw.dtype, chan_freq.dtype, gauss_shape.dtype) fields = [("gauss_uv", guv_dtype[:, :, :]), ("scaled_freq", chan_freq)] fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0)) fwhminv = 1.0 / fwhm gauss_scale = fwhminv * np.sqrt(2.0) * np.pi / lightspeed def gaussian_init(uvw, chan_freq, gauss_shape): nsrc, _ = gauss_shape.shape nrow, _ = uvw.shape gauss_uv = np.empty((nsrc, nrow, 2), dtype=guv_dtype) scaled_freq = chan_freq*gauss_scale for s in range(nsrc): emaj, emin, angle = gauss_shape[s] # Convert to l-projection, m-projection, ratio el = emaj * np.sin(angle) em = emaj * np.cos(angle) er = emin / (1.0 if emaj == 0.0 else emaj) for r in range(uvw.shape[0]): u = uvw[r, 0] v = uvw[r, 1] gauss_uv[s, r, 0] = (u*em - v*el)*er gauss_uv[s, r, 1] = u*el + v*em return gauss_uv, scaled_freq return fields, gaussian_init def sampler(self): def gaussian_sample(state, s, r, t, f1, f2, a1, a2, c): fu1 = state.gauss_uv[s, r, 0] * state.scaled_freq[c] fv1 = state.gauss_uv[s, r, 1] * state.scaled_freq[c] return np.exp(-(fu1*fu1 + fv1*fv1)) return gaussian_sample