Source code for africanus.experimental.rime.fused.transformers.parangle

import numpy as np
from numba.core import cgutils, types, errors
from numba import njit, objmode

from africanus.rime.parangles_casa import casa_parallactic_angles
from africanus.experimental.rime.fused.transformers.core import Transformer


[docs] class ParallacticTransformer(Transformer): OUTPUTS = ["feed_parangle", "beam_parangle"] def __init__(self, process_pool): self.pool = process_pool def init_fields( self, typingctx, init_state, antenna_position, phase_dir, receptor_angle=None, ): fdict = init_state.field_dict dt = typingctx.unify_types( fdict["utime"].dtype, fdict["ufeed"].dtype, antenna_position.dtype, phase_dir.dtype, ) fields = [ ("feed_parangle", dt[:, :, :, :, :]), ("beam_parangle", dt[:, :, :, :]), ] parangle_dt = types.Array(types.float64, 2, "C") have_ra = not cgutils.is_nonelike(receptor_angle) if have_ra and ( not isinstance(receptor_angle, types.Array) or receptor_angle.ndim != 2 ): raise errors.TypingError("receptor_angle must be a 2D array") @njit(inline="never") def parangle_stub(time, antenna, phase_dir): with objmode(out=parangle_dt): out = self.pool.apply( casa_parallactic_angles, (time, antenna, phase_dir) ) return out def parangles(init_state, antenna_position, phase_dir, receptor_angle=None): (ntime,) = init_state.utime.shape (nant,) = init_state.uantenna.shape (nfeed,) = init_state.ufeed.shape # Select out the antennae we're interested in antenna_position = antenna_position[init_state.uantenna] parangles = parangle_stub(init_state.utime, antenna_position, phase_dir) feed_pa = np.empty((ntime, nfeed, nant, 2, 2), parangles.dtype) beam_pa = np.empty((ntime, nfeed, nant, 2), parangles.dtype) if have_ra: if receptor_angle.ndim != 2: raise ValueError("receptor_angle.ndim != 2") if receptor_angle.shape[1] != 2: raise ValueError("Only 2 receptor angles currently supported") # Select out the feeds we're interested in receptor_angle = receptor_angle[init_state.ufeed, :] for t in range(ntime): for f in range(nfeed): ra1 = receptor_angle[f, 0] if have_ra else dt(0) ra2 = receptor_angle[f, 1] if have_ra else dt(0) for a in range(nant): pa1 = parangles[t, a] pa2 = parangles[t, a] beam_pa[t, f, a, 0] = np.sin(pa1) beam_pa[t, f, a, 1] = np.cos(pa1) pa1 += ra1 pa2 += ra2 feed_pa[t, f, a, 0, 0] = np.sin(pa1) feed_pa[t, f, a, 0, 1] = np.cos(pa1) feed_pa[t, f, a, 1, 0] = np.sin(pa2) feed_pa[t, f, a, 1, 1] = np.cos(pa2) return feed_pa, beam_pa return fields, parangles def dask_schema(self, antenna_position, phase_dir, receptor_angle=None): dt = np.result_type(antenna_position, phase_dir, receptor_angle) inputs = {"antenna_position": ("antenna", "ant-comp"), "phase_dir": ("radec",)} if receptor_angle is not None: inputs["receptor_angle"] = ("feed", "receptor_angle") else: inputs["receptor_angle"] = None outputs = { "feed_parangle": np.empty((0,) * 5, dt), "beam_parangle": np.empty((0,) * 4, dt), } return inputs, outputs