# -*- coding: utf-8 -*-
from numba import types
import numpy as np
from africanus.util.numba import JIT_OPTIONS, njit, overload
from africanus.util.docs import DocstringTemplate
def ordinary_spectral_model(I, coeffs, log_poly, ref_freq, freq): # noqa: E741
"""Numpy ordinary polynomial implementation"""
coeffs_idx = np.arange(1, coeffs.shape[1] + 1)
# (source, chan, coeffs-comp)
term = (freq[None, :, None] / ref_freq[:, None, None]) - 1.0
term = term ** coeffs_idx[None, None, :]
term = coeffs[:, None, :] * term
return I[:, None] + term.sum(axis=2)
def log_spectral_model(I, coeffs, log_poly, ref_freq, freq): # noqa: E741
"""Numpy logarithmic polynomial implementation"""
coeffs_idx = np.arange(1, coeffs.shape[1] + 1)
# (source, chan, coeffs-comp)
term = np.log(freq[None, :, None] / ref_freq[:, None, None])
term = term ** coeffs_idx[None, None, :]
term = coeffs[:, None, :] * term
return I[:, None] * np.exp(term.sum(axis=2))
def _check_log_poly_shape(coeffs, log_poly):
raise NotImplementedError
@overload(_check_log_poly_shape)
def overload_check_log_poly_shape(coeffs, log_poly):
if isinstance(log_poly, types.npytypes.Array):
def impl(coeffs, log_poly):
if coeffs.shape[0] != log_poly.shape[0]:
raise ValueError("coeffs.shape[0] != log_poly.shape[0]")
elif isinstance(log_poly, types.scalars.Boolean):
def impl(coeffs, log_poly):
pass
else:
raise ValueError("log_poly must be ndarray or bool")
return impl
def _log_polynomial(log_poly, s):
raise NotImplementedError
@overload(_log_polynomial)
def overload_log_polynomial(log_poly, s):
if isinstance(log_poly, types.npytypes.Array):
def impl(log_poly, s):
return log_poly[s]
elif isinstance(log_poly, types.scalars.Boolean):
def impl(log_poly, s):
return log_poly
else:
raise ValueError("log_poly must be ndarray or bool")
return impl
[docs]
@njit(**JIT_OPTIONS)
def spectra(I, coeffs, log_poly, ref_freq, frequency): # noqa: E741
return spectra_impl(I, coeffs, log_poly, ref_freq, frequency)
def spectra_impl(I, coeffs, log_poly, ref_freq, frequency): # noqa: E741
raise NotImplementedError
@overload(spectra_impl, jit_option=JIT_OPTIONS)
def nb_spectra(I, coeffs, log_poly, ref_freq, frequency): # noqa: E741
arg_dtypes = tuple(np.dtype(a.dtype.name) for a in (I, coeffs, ref_freq, frequency))
dtype = np.result_type(*arg_dtypes)
def impl(I, coeffs, log_poly, ref_freq, frequency): # noqa: E741
if not (I.shape[0] == coeffs.shape[0] == ref_freq.shape[0]):
print(I.shape, coeffs.shape, ref_freq.shape)
raise ValueError(
"first dimensions of I, coeffs " "and ref_freq don't match."
)
_check_log_poly_shape(coeffs, log_poly)
nsrc = I.shape[0]
nchan = frequency.shape[0]
ncoeffs = coeffs.shape[1]
spectral_model = np.empty((nsrc, nchan), dtype=dtype)
for s in range(nsrc):
rf = ref_freq[s]
if _log_polynomial(log_poly, s):
for f in range(frequency.shape[0]):
nu = frequency[f]
# Initialise with base polynomial value
spectral_model[s, f] = 0
for c in range(ncoeffs):
term = coeffs[s, c] * np.log(nu / rf) ** (c + 1)
spectral_model[s, f] += term
spectral_model[s, f] = I[s] * np.exp(spectral_model[s, f])
else:
for f in range(frequency.shape[0]):
nu = frequency[f]
# Initialise with base polynomial value
spectral_model[s, f] = I[s]
for c in range(ncoeffs):
term = coeffs[s, c]
term *= ((nu / rf) - 1.0) ** (c + 1)
spectral_model[s, f] += term
return spectral_model
return impl
SPECTRA_DOCS = DocstringTemplate(
r"""
Produces a spectral model from a polynomial expansion of
a wsclean file model. Depending on how `log_poly` is set
ordinary or logarithmic polynomials are used to produce
the expansion:
.. math::
& flux(\lambda) =
I_{0} + \sum\limits_{c=0} \textrm{coeffs}(c)
({\lambda/\lambda_{ref}} - 1)^{c+1}
\\
& flux(\lambda) =
\exp \left( \log I_{0} +
\sum\limits_{c=0} \textrm{coeffs}(c)
\log({\lambda/\lambda_{ref}})^{c+1}
\right) \\
See the `WSClean Component List
<https://sourceforge.net/p/wsclean/wiki/ComponentList/>`_
for further details.
Parameters
----------
I : $(array_type)
flux density in Janskys at the reference frequency
of shape :code:`(source,)`
coeffs : $(array_type)
Polynomial coefficients for each source of
shape :code:`(source, comp)`
log_poly : $(array_type) or bool
boolean array of shape :code:`(source, )`
indicating whether logarithmic (True) or ordinary (False)
polynomials should be used.
ref_freq : $(array_type)
Source reference frequencies of shape :code:`(source,)`
frequency : $(array_type)
frequencies of shape :code:`(chan,)`
See Also
--------
africanus.model.wsclean.load
Returns
-------
spectral_model : $(array_type)
Spectral Model of shape :code:`(source, chan)`
"""
)
try:
spectra.__doc__ = SPECTRA_DOCS.substitute(array_type=":class:`numpy.ndarray`")
except AttributeError:
pass