Source code for africanus.rime.fast_beam_cubes

# -*- coding: utf-8 -*-

from functools import reduce

import numpy as np
from africanus.util.docs import DocstringTemplate
from africanus.util.numba import njit


@njit(nogil=True, cache=True)
def freq_grid_interp(frequency, beam_freq_map):
    # Interpolated grid coordinate
    beam_nud = beam_freq_map.shape[0]
    freq_data = np.empty((frequency.shape[0], 3),
                         dtype=frequency.dtype)

    for f in range(frequency.shape[0]):
        freq = frequency[f]
        lower = 0
        upper = beam_nud - 1

        while lower <= upper:
            mid = lower + (upper - lower) // 2
            beam_freq = beam_freq_map[mid]

            if beam_freq < freq:
                lower = mid + 1
            elif beam_freq > freq:
                upper = mid - 1
            else:
                lower = mid
                break

        # This handles the lower <= upper in the while loop
        lower = min(lower, upper)
        upper = lower + 1

        # Set up scaling, lower weight, lower grid pos
        if lower == -1:
            freq_data[f, 0] = freq / beam_freq_map[0]
            freq_data[f, 1] = 1.0
            freq_data[f, 2] = 0
        elif upper == beam_nud:
            freq_data[f, 0] = freq / beam_freq_map[beam_nud - 1]
            freq_data[f, 1] = 0.0
            freq_data[f, 2] = beam_nud - 2
        else:
            freq_data[f, 0] = 1.0
            freq_low = beam_freq_map[lower]
            freq_high = beam_freq_map[upper]
            freq_diff = freq_high - freq_low
            freq_data[f, 1] = (freq_high - freq) / freq_diff
            freq_data[f, 2] = lower

    return freq_data


[docs]@njit(nogil=True, cache=True) def beam_cube_dde(beam, beam_lm_extents, beam_freq_map, lm, parallactic_angles, point_errors, antenna_scaling, frequency): nsrc = lm.shape[0] ntime, nants = parallactic_angles.shape nchan = frequency.shape[0] beam_lw, beam_mh, beam_nud = beam.shape[:3] corrs = beam.shape[3:] if beam_lw < 2 or beam_mh < 2 or beam_nud < 2: raise ValueError("beam_lw, beam_mh and beam_nud must be >= 2") # Flatten correlations ncorrs = reduce(lambda x, y: x*y, corrs, 1) lower_l, upper_l = beam_lm_extents[0] lower_m, upper_m = beam_lm_extents[1] ex_dtype = beam_lm_extents.dtype # Maximum l and m indices in float and int lmaxf = ex_dtype.type(beam_lw - 1) mmaxf = ex_dtype.type(beam_mh - 1) lmaxi = beam_lw - 1 mmaxi = beam_mh - 1 lscale = lmaxf / (upper_l - lower_l) mscale = mmaxf / (upper_m - lower_m) one = ex_dtype.type(1) zero = ex_dtype.type(0) # Flatten the beam on correlation fbeam = beam.reshape((beam_lw, beam_mh, beam_nud, ncorrs)) # Allocate output array with correlations flattened fjones = np.empty((nsrc, ntime, nants, nchan, ncorrs), dtype=beam.dtype) # Compute frequency interpolation stuff freq_data = freq_grid_interp(frequency, beam_freq_map) corr_sum = np.zeros((ncorrs,), dtype=beam.dtype) absc_sum = np.zeros((ncorrs,), dtype=beam.real.dtype) beam_scratch = np.zeros((ncorrs,), dtype=beam.dtype) for t in range(ntime): for a in range(nants): sin_pa = np.sin(parallactic_angles[t, a]) cos_pa = np.cos(parallactic_angles[t, a]) for s in range(nsrc): # Extract lm coordinates l, m = lm[s] for f in range(nchan): # Unpack frequency data freq_scale = freq_data[f, 0] # lower and upper frequency weights nud = freq_data[f, 1] inv_nud = 1.0 - nud # lower and upper frequency grid position gc0 = np.int32(freq_data[f, 2]) gc1 = gc0 + 1 # Apply any frequency scaling sl = l * freq_scale sm = m * freq_scale # Add pointing errors tl = sl + point_errors[t, a, f, 0] tm = sm + point_errors[t, a, f, 1] # Rotate lm coordinate angle vl = tl*cos_pa - tm*sin_pa vm = tl*sin_pa + tm*cos_pa # Scale by antenna scaling vl *= antenna_scaling[a, f, 0] vm *= antenna_scaling[a, f, 1] # Shift into the cube coordinate system vl = lscale*(vl - lower_l) vm = mscale*(vm - lower_m) # Clamp the coordinates to the edges of the cube vl = max(zero, min(vl, lmaxf)) vm = max(zero, min(vm, mmaxf)) # Snap to the lower grid coordinates gl0 = np.int32(np.floor(vl)) gm0 = np.int32(np.floor(vm)) # Snap to the upper grid coordinates gl1 = min(gl0 + 1, lmaxi) gm1 = min(gm0 + 1, mmaxi) # Difference between grid and offset coordinates ld = vl - gl0 md = vm - gm0 # Zero accumulation arrays corr_sum[:] = 0 absc_sum[:] = 0 # Accumulate lower cube correlations beam_scratch[:] = fbeam[gl0, gm0, gc0, :] weight = (one - ld)*(one - md)*nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl1, gm0, gc0, :] weight = ld*(one - md)*nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl0, gm1, gc0, :] weight = (one - ld)*md*nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl1, gm1, gc0, :] weight = ld*md*nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] # Accumulate upper cube correlations beam_scratch[:] = fbeam[gl0, gm0, gc1, :] weight = (one - ld)*(one - md)*inv_nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl1, gm0, gc1, :] weight = ld*(one - md)*inv_nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl0, gm1, gc1, :] weight = (one - ld)*md*inv_nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] beam_scratch[:] = fbeam[gl1, gm1, gc1, :] weight = ld*md*inv_nud for c in range(ncorrs): absc_sum[c] += weight * np.abs(beam_scratch[c]) corr_sum[c] += weight * beam_scratch[c] for c in range(ncorrs): # Added all correlations, normalise div = np.abs(corr_sum[c]) if div == 0.0: # This case probably works out to a zero assign corr_sum[c] *= absc_sum[c] else: corr_sum[c] *= absc_sum[c] / div # Assign normalised values fjones[s, t, a, f, :] = corr_sum return fjones.reshape((nsrc, ntime, nants, nchan) + corrs)
BEAM_CUBE_DOCS = DocstringTemplate( r""" Evaluates Direction Dependent Effects along a source's path by interpolating the values of a complex beam cube at the source location. Notes ----- 1. Sources are clamped to the provided `beam_lm_extents`. 2. Frequencies outside the cube (i.e. outside beam_freq_map) introduce linear scaling to the lm coordinates of a source. Parameters ---------- beam : $(array_type) Complex beam cube of shape :code:`(beam_lw, beam_mh, beam_nud, corr, corr)`. `beam_lw`, `beam_mh` and `beam_nud` define the size of the cube in the l, m and frequency dimensions, respectively. beam_lm_extents : $(array_type) lm extents of the beam cube of shape :code:`(2, 2)`. ``[[lower_l, upper_l], [lower_m, upper_m]]``. beam_freq_map : $(array_type) Beam frequency map of shape :code:`(beam_nud,)`. This array is used to define interpolation along the :code:`(chan,)` dimension. lm : $(array_type) Source lm coordinates of shape :code:`(source, 2)`. These coordinates are: 1. Scaled if the associated frequency lies outside the beam cube. 2. Offset by pointing errors: ``point_errors`` 3. Rotated by parallactic angles: ``parallactic_angles``. 4. Scaled by antenna scaling factors: ``antenna_scaling``. parallactic_angles : $(array_type) Parallactic angles of shape :code:`(time, ant)`. point_errors : $(array_type) Pointing errors of shape :code:`(time, ant, chan, 2)`. antenna_scaling : $(array_type) Antenna scaling factors of shape :code:`(ant, chan, 2)` frequency : $(array_type) Frequencies of shape :code:`(chan,)`. Returns ------- ddes : $(array_type) Direction Dependent Effects of shape :code:`(source, time, ant, chan, corr, corr)` """) try: beam_cube_dde.__doc__ = BEAM_CUBE_DOCS.substitute( array_type=":class:`numpy.ndarray`") except AttributeError: pass