#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from itertools import product
import numpy as np
try:
from scipy import interpolate
from scipy.ndimage import interpolation
except ImportError as e:
scipy_import_error = e
else:
scipy_import_error = None
from africanus.util.requirements import requires_optional
[docs]@requires_optional("scipy", scipy_import_error)
def beam_cube_dde(beam, coords, l_grid, m_grid, freq_grid,
spline_order=1, mode='nearest'):
"""
Computes Direction Dependent Effects (E) by sampling
complex values in ``beam`` at the coordinates ``coords``.
Both real and imaginary beam values are sampled at
the given coordinates and normalised to form a
`mean of circular quantities
<https://en.wikipedia.org/wiki/Mean_of_circular_quantities>`_.
``l_grid``, ``m_grid`` and ``freq_grid`` can be obtained from
:func:`~africanus.util.beams.beam_grids`.
Parameters
----------
beam : :class:`numpy.ndarray`
complex beam cube of shape
:code:`(beam_lw, beam_mh, beam_nud, corr_1, corr_2)` where
``beam_lw`` is the grid width of the l dimension,
``beam_mh`` is the grid height of the m dimension and
``beam_nud`` is the grid depth of the frequency dimension.
Either ``corr_1`` or both ``corr_1`` and ``corr_2`` may be
present, representing 1, 2 or 2x2 correlations respectively.
coords : :class:`numpy.ndarray`
beam cube coordinates of shape :code:`(coords, dim_1, ..., dim_n)`
where ``coord`` always has size 3 and refers to `(l,m,frequency)`.
l_grid : :class:`numpy.ndarray`
Monotonically *increasing* or *decreasing* grid values for
the l axis, with shape :code:`(beam_lw,)`.
If decreasing, the
m_grid : :class:`numpy.ndarray`
Monotonically *increasing* or *decreasing* grid values for
the m axis, with shape :code:`(beam_mh,)`
freq_grid : :class:`numpy.ndarray`
Monotonically increasing grid values for the frequency axis,
with shape :code:`(beam_nud,)`
spline_order : int
Spline order to use in
:func:`scipy.ndimage.interpolation.map_coordinates`.
Defaults to 1 ('linear')
mode : str
Border mode to use in
:func:`scipy.ndimage.interpolation.map_coordinates`
Defaults to 'nearest'
Returns
-------
ddes : :class:`numpy.ndarray`
Sampled complex beam values at the specified coordinates with
shape :code:`(dim_1, ..., dim_n, corr_1, corr_2)`
"""
if not np.iscomplexobj(beam):
raise ValueError("beam is not complex")
l_diff = np.diff(l_grid)
l_inc = np.all(l_diff > 0)
l_dec = np.all(l_diff < 0)
if not (l_inc or l_dec):
raise ValueError("l_grid is not monotonically increasing/decreasing")
m_diff = np.diff(m_grid)
m_inc = np.all(m_diff > 0)
m_dec = np.all(m_diff < 0)
if not (m_inc or m_dec):
raise ValueError("m_grid is not monotonically increasing/decreasing")
freq_diff = np.diff(freq_grid)
freq_inc = np.all(freq_diff > 0)
if not freq_inc:
raise ValueError("freq_grid is not monotonically increasing")
# interp1d works on monotically increasing/decreasing values
#
# .. code-block:: python
#
# values = np.asarray([1.0, 0.7, 0.2, 0.0, -0.4, -1.0])
# values = np.flipud(values)
# grid = np.arange(values.size)
#
# initial = np.stack((values, grid))
# interp = interp1d(values, grid, bounds_error=False,
# fill_value='extrapolate')
# assert np.all(initial == np.stack((values,interp(values))))
l_interp = interpolate.interp1d(l_grid, np.arange(l_grid.size),
'linear', bounds_error=False,
fill_value='extrapolate',
assume_sorted=l_inc)
m_interp = interpolate.interp1d(m_grid, np.arange(m_grid.size),
'linear', bounds_error=False,
fill_value='extrapolate',
assume_sorted=m_inc)
freq_interp = interpolate.interp1d(freq_grid, np.arange(freq_grid.size),
'linear', bounds_error=False,
fill_value='extrapolate',
assume_sorted=True)
head, tail = coords.shape[0], coords.shape[1:]
if not head == 3:
raise ValueError("coord axis must have size 3 "
"representing l, m and frequency")
# Flatten coordinates
coords = coords.reshape(head, -1)
# TODO(sjperkins)
# This scaling code might actually be more suited
# to transform_sources.
# LM coordinates must be scaled if
# they lie outside the beam cube.
# Check for frequency coordinates (index 2)
# that lie below or above
below = coords[2, :] < freq_grid[0]
above = coords[2, :] > freq_grid[-1]
# Scaling factors for frequencies above and below
below_scale = coords[2, below] / freq_grid[0]
above_scale = coords[2, above] / freq_grid[-1]
# Now scale L and M (0 and 1) by frequency scaling
coords[0:2, below] *= below_scale
coords[0:2, above] *= above_scale
# Convert to grid coordinates
grid_coords = np.empty_like(coords)
grid_coords[0, :] = l_interp(coords[0, :])
grid_coords[1, :] = m_interp(coords[1, :])
grid_coords[2, :] = freq_interp(coords[2, :])
# Create beam and result indices
corr_dims = beam.shape[3:]
all_ = slice(None)
result_all = tuple(all_ for _ in range(len(tail)))
corr_indices = list(product(*(range(d) for d in corr_dims)))
if len(corr_indices) > 0:
beam_indices = tuple((all_,) + cp for cp in corr_indices)
result_indices = tuple(result_all + cp for cp in corr_indices)
else:
beam_indices = ((all_,),)
result_indices = ((result_all,),)
# Allocate output array
result = np.empty(tail + corr_dims, dtype=beam.dtype)
prefilter = spline_order == 1
# For each correlation
for bi, ri in zip(beam_indices, result_indices):
re = result[ri].real
im = result[ri].imag
# Interpolate real and imaginary beams
re.flat[:] = interpolation.map_coordinates(beam[bi].real, grid_coords,
order=spline_order,
prefilter=prefilter,
mode=mode)
im.flat[:] = interpolation.map_coordinates(beam[bi].imag, grid_coords,
order=spline_order,
prefilter=prefilter,
mode=mode)
# This computes a mean of circular quantities
# and the following should hold
#
# .. code-block:: python
#
# phase = np.arctan2(re, im)
# re == np.cos(phase)
# im == np.sin(phase)
# Compute the amplitude
amplitude = np.sqrt(re**2 + im**2)
# Handle divide by zero when normalising
amplitude[amplitude == 0.0] = 1.0
# Normalise real and imaginary components
re /= amplitude
im /= amplitude
return result