Source code for africanus.util.beams

# -*- coding: utf-8 -*-


from collections import OrderedDict
import string
import re

import numpy as np

from africanus.util.casa_types import STOKES_ID_MAP


class FitsAxes(object):
    """
    FitsAxes object, inspired by Tigger's FITSAxes
    """

    def __init__(self, header=None):
        # Create an zero-dimensional object if no header supplied
        self._ndims = ndims = 0 if header is None else header["NAXIS"]

        # Extract header information for each dimension
        axr = list(range(1, ndims + 1))
        self._naxis = [header.get("NAXIS%d" % n) for n in axr]
        self._ctype = [header.get("CTYPE%d" % n, n).strip() for n in axr]
        self._crval = [header.get("CRVAL%d" % n, 0) for n in axr]
        # Convert right pixel from FORTRAN to C indexing
        self._crpix = [header["CRPIX%d" % n] - 1 for n in axr]
        self._cdelt = [header.get("CDELT%d" % n, 1) for n in axr]
        self._cunit = [header.get("CUNIT%d" % n, "").strip().upper() for n in axr]


def axis_and_sign(ax_str, default=None):
    """Extract axis and sign from given axis string"""
    if not ax_str:
        if default:
            return default, 1.0

        raise ValueError("Need default if ax_str is None")

    if not isinstance(ax_str, str):
        raise TypeError("ax_str must be a string")

    return (ax_str[1:], -1.0) if ax_str[0] == "-" else (ax_str, 1.0)


class BeamAxes(FitsAxes):
    """
    Describes the FITS axes of a BEAM cube.

    In general, FORTRAN ordered indices are
    converted to C ordering
    (CRPIX and the individual axis indices)

    Any degree axes are converted to radians and
    grids for each axis are created.

    Inversions of the L, M, X and Y grids are supported
    if a minus sign is detected before the CUNIT in
    the FITS header.

    Parameters
    ----------
    header : dict, optional
        FITS file header
    """

    def __init__(self, header=None):
        super(BeamAxes, self).__init__(header)
        # Check for custom irregular grid format.
        # Currently only implemented for FREQ dimension.
        irregular_grid = [
            np.asarray(
                [
                    header.get("G%s%d" % (self._ctype[i], j), None)
                    for j in range(1, self._naxis[i] + 1)
                ]
            )
            for i in range(self._ndims)
        ]

        # Irregular grids are only valid if values exist for all grid points
        self._irreg = [
            all(x is not None for x in irregular_grid[i]) for i in range(self._ndims)
        ]

        def _regular_grid(i):
            """Construct a regular grid from a FitsAxes object and index"""
            R = np.arange(0.0, float(self._naxis[i]))
            return (R - self._crpix[i]) * self._cdelt[i] + self._crval[i]

        self._grid = [None] * self._ndims

        for i in range(self._ndims):
            # Convert any degree axes to radians
            if self._cunit[i] == "DEG":
                self._cunit[i] = "RAD"
                self._crval[i] = np.deg2rad(self._crval[i])
                self._cdelt[i] = np.deg2rad(self._cdelt[i])

            # Set up the grid
            self._grid[i] = (
                _regular_grid(i) if not self._irreg[i] else irregular_grid[i]
            )

    @property
    def ndims(self):
        return self._ndims

    @property
    def crpix(self):
        return self._crpix

    @property
    def naxis(self):
        return self._naxis

    @property
    def crval(self):
        return self._crval

    @property
    def cdelt(self):
        return self._cdelt

    @property
    def cunit(self):
        return self._cunit

    @property
    def ctype(self):
        return self._ctype

    @property
    def grid(self):
        return self._grid


[docs] def beam_grids(header, l_axis=None, m_axis=None): """ Extracts the FITS indices and grids for the beam dimensions in the supplied FITS ``header``. Specifically the axes specified by 1. ``L`` or ``X`` CTYPE 2. ``M`` or ``Y`` CTYPE 3. ``FREQ`` CTYPE If the first two axes have a negative sign, such as ``-L``, the grid will be inverted. Any grids corresponding to axes with a CUNIT type of ``DEG`` will be converted to radians. Parameters ---------- header : :class:`~astropy.io.fits.Header` or dict FITS header object. l_axis : str FITS axis interpreted as the L axis. `L` and `X` are sensible values here. `-L` will invert the coordinate system on that axis. m_axis : str FITS axis interpreted as the M axis. `M` and `Y` are sensible values here. `-M` will invert the coordinate system on that axis. Returns ------- tuple Returns ((l_axis, l_grid), (m_axis, m_grid), (freq_axis, freq_grid)) where the axis is the FORTRAN indexed FITS axis (1-indexed) and grid contains the values at each pixel along the axis. """ beam_axes = BeamAxes(header) l = m = freq = None # noqa # Find the relevant axes for i in range(beam_axes.ndims): if beam_axes.ctype[i].upper() in ("L", "X", "PX"): l = i # noqa elif beam_axes.ctype[i].upper() in ("M", "Y", "PY"): m = i elif beam_axes.ctype[i] == "FREQ": freq = i # Complain if not found if l is None: raise ValueError("No L/X/PX axis present in FITS header") if m is None: raise ValueError("No M/Y/PY axis present in FITS header") if freq is None: raise ValueError("No FREQ axis present in FITS header") # Sign of L/M axes? l_sign = axis_and_sign(l_axis, "L")[1] m_sign = axis_and_sign(m_axis, "M")[1] # Obtain axes grids l_grid = beam_axes.grid[l] * l_sign m_grid = beam_axes.grid[m] * m_sign freq_grid = beam_axes.grid[freq] return ((l + 1, l_grid), (m + 1, m_grid), (freq + 1, freq_grid))
class FitsFilenameTemplate(string.Template): """ Overrides the ${identifer} braced pattern in the string Template with a $(identifier) braced pattern expected by FITS beam filename schema """ pattern = r""" %(delim)s(?: (?P<escaped>%(delim)s) | # Escape sequence of two delimiters (?P<named>%(id)s) | # delimiter and a Python identifier \((?P<braced>%(id)s)\) | # delimiter and a braced identifier (?P<invalid>) # Other ill-formed delimiter exprs ) """ % { "delim": re.escape(string.Template.delimiter), "id": string.Template.idpattern, } CIRCULAR_CORRELATIONS = ("rr", "rl", "lr", "ll") LINEAR_CORRELATIONS = ("xx", "xy", "yx", "yy") REIM = ("re", "im") def _re_im_filenames(corr, template): filenames = [] for ri in REIM: try: filename = template.substitute( corr=corr.lower(), CORR=corr.upper(), reim=ri.lower(), REIM=ri.upper() ) except KeyError: raise ValueError( "Invalid filename schema '%s'. " "FITS Beam filename schemas " "must follow forms such as " "'beam_$(corr)_$(reim).fits' or " "'beam_$(CORR)_$(REIM).fits." % template.template ) else: filenames.append(filename) return filenames
[docs] def beam_filenames(filename_schema, corr_types): """ Returns a dictionary of beam filename pairs, keyed on correlation,from the cartesian product of correlations and real, imaginary pairs Given ``beam_$(corr)_$(reim).fits`` returns: .. code-block:: python { 'xx' : ['beam_xx_re.fits', 'beam_xx_im.fits'], 'xy' : ['beam_xy_re.fits', 'beam_xy_im.fits'], ... 'yy' : ['beam_yy_re.fits', 'beam_yy_im.fits'], } Given ``beam_$(CORR)_$(REIM).fits`` returns: .. code-block:: python { 'xx' : ['beam_XX_RE.fits', 'beam_XX_IM.fits'], 'xy' : ['beam_XY_RE.fits', 'beam_XY_IM.fits'], ... 'yy' : ['beam_YY_RE.fits', 'beam_YY_IM.fits']), } Parameters ---------- filename_schema : str String containing the filename schema. corr_types : list of integers list of integers defining the correlation type. Returns ------- dict Dictionary of schema ``{correlation : (refile, imfile)}`` mapping correlations to real and imaginary filename pairs """ template = FitsFilenameTemplate(filename_schema) corr_names = [] for corr_type in corr_types: try: corr_name = STOKES_ID_MAP[corr_type] except KeyError: raise ValueError("Unknown Stokes ID %d" % corr_type) else: corr_names.append(corr_name.lower()) return OrderedDict((c, _re_im_filenames(c, template)) for c in corr_names)