Source code for africanus.experimental.rime.fused.transformers.parangle
import numpy as np
from numba.core import cgutils, types, errors
from numba import njit, objmode
from africanus.rime.parangles_casa import casa_parallactic_angles
from africanus.experimental.rime.fused.transformers.core import Transformer
[docs]class ParallacticTransformer(Transformer):
OUTPUTS = ["feed_parangle", "beam_parangle"]
def __init__(self, process_pool):
self.pool = process_pool
def init_fields(self, typingctx,
utime, ufeed, uantenna,
antenna_position, phase_dir,
receptor_angle=None):
dt = typingctx.unify_types(utime.dtype, ufeed.dtype,
antenna_position.dtype,
phase_dir.dtype)
fields = [
("feed_parangle", dt[:, :, :, :, :]),
("beam_parangle", dt[:, :, :, :])]
parangle_dt = types.Array(types.float64, 2, "C")
have_ra = not cgutils.is_nonelike(receptor_angle)
if have_ra and (not isinstance(receptor_angle, types.Array) or
receptor_angle.ndim != 2):
raise errors.TypingError("receptor_angle must be a 2D array")
@njit(inline="never")
def parangle_stub(time, antenna, phase_dir):
with objmode(out=parangle_dt):
out = self.pool.apply(casa_parallactic_angles,
(time, antenna, phase_dir))
return out
def parangles(utime, ufeed, uantenna,
antenna_position, phase_dir,
receptor_angle=None):
ntime, = utime.shape
nant, = uantenna.shape
nfeed, = ufeed.shape
# Select out the antennae we're interested in
antenna_position = antenna_position[uantenna]
parangles = parangle_stub(utime, antenna_position, phase_dir)
feed_pa = np.empty((ntime, nfeed, nant, 2, 2), parangles.dtype)
beam_pa = np.empty((ntime, nfeed, nant, 2), parangles.dtype)
if have_ra:
if receptor_angle.ndim != 2:
raise ValueError("receptor_angle.ndim != 2")
if receptor_angle.shape[1] != 2:
raise ValueError("Only 2 receptor angles "
"currently supported")
# Select out the feeds we're interested in
receptor_angle = receptor_angle[ufeed, :]
for t in range(ntime):
for f in range(nfeed):
ra1 = receptor_angle[f, 0] if have_ra else dt(0)
ra2 = receptor_angle[f, 1] if have_ra else dt(0)
for a in range(nant):
pa1 = parangles[t, a]
pa2 = parangles[t, a]
beam_pa[t, f, a, 0] = np.sin(pa1)
beam_pa[t, f, a, 1] = np.cos(pa1)
pa1 += ra1
pa2 += ra2
feed_pa[t, f, a, 0, 0] = np.sin(pa1)
feed_pa[t, f, a, 0, 1] = np.cos(pa1)
feed_pa[t, f, a, 1, 0] = np.sin(pa2)
feed_pa[t, f, a, 1, 1] = np.cos(pa2)
return feed_pa, beam_pa
return fields, parangles
def dask_schema(self, utime, ufeed, uantenna,
antenna_position, phase_dir,
receptor_angle=None):
dt = np.result_type(utime, ufeed, antenna_position,
phase_dir, receptor_angle)
inputs = {"antenna_position": ("antenna", "ant-comp"),
"phase_dir": ("radec",)}
if receptor_angle is not None:
inputs["receptor_angle"] = ("feed", "receptor_angle")
else:
inputs["receptor_angle"] = None
outputs = {
"feed_parangle": np.empty((0,)*5, dt),
"beam_parangle": np.empty((0,)*4, dt)
}
return inputs, outputs