Source code for africanus.averaging.bda_avg

# -*- coding: utf-8 -*-

from collections import namedtuple

import numpy as np

from africanus.averaging.bda_mapping import (bda_mapper,
                                             RowMapOutput)
from africanus.averaging.shared import (chan_corrs,
                                        merge_flags,
                                        vis_output_arrays)
from africanus.util.docs import DocstringTemplate
from africanus.util.numba import (generated_jit,
                                  intrinsic,
                                  is_numba_type_none)


_row_output_fields = ["antenna1", "antenna2", "time_centroid", "exposure",
                      "uvw", "weight", "sigma"]
RowAverageOutput = namedtuple("RowAverageOutput", _row_output_fields)


@generated_jit(nopython=True, nogil=True, cache=True)
def row_average(meta, ant1, ant2, flag_row=None,
                time_centroid=None, exposure=None, uvw=None,
                weight=None, sigma=None):

    have_flag_row = not is_numba_type_none(flag_row)
    have_time_centroid = not is_numba_type_none(time_centroid)
    have_exposure = not is_numba_type_none(exposure)
    have_uvw = not is_numba_type_none(uvw)
    have_weight = not is_numba_type_none(weight)
    have_sigma = not is_numba_type_none(sigma)

    def impl(meta, ant1, ant2, flag_row=None,
             time_centroid=None, exposure=None, uvw=None,
             weight=None, sigma=None):

        out_rows = meta.time.shape[0]
        counts = np.zeros(out_rows, dtype=np.uint32)

        # These outputs are always present
        ant1_avg = np.empty(out_rows, ant1.dtype)
        ant2_avg = np.empty(out_rows, ant2.dtype)

        # Possibly present outputs for possibly present inputs
        uvw_avg = (
            None if not have_uvw else
            np.zeros((out_rows,) + uvw.shape[1:],
                     dtype=uvw.dtype))

        time_centroid_avg = (
            None if not have_time_centroid else
            np.zeros((out_rows,) + time_centroid.shape[1:],
                     dtype=time_centroid.dtype))

        exposure_avg = (
            None if not have_exposure else
            np.zeros((out_rows,) + exposure.shape[1:],
                     dtype=exposure.dtype))

        weight_avg = (
            None if not have_weight else
            np.zeros((out_rows,) + weight.shape[1:],
                     dtype=weight.dtype))

        sigma_avg = (
            None if not have_sigma else
            np.zeros((out_rows,) + sigma.shape[1:],
                     dtype=sigma.dtype))

        sigma_weight_sum = (
            None if not have_sigma else
            np.zeros((out_rows,) + sigma.shape[1:],
                     dtype=sigma.dtype))

        # Average each array, if present
        # The output is a flattened row-channel array
        # where the values for each row are repeated along the channel
        # Individual runs in this output are described by meta.offset
        # Thus, we only compute the sum in the first position
        for ri in range(meta.map.shape[0]):
            ro = meta.map[ri, 0]

            # Here we can simply assign because input_row baselines
            # should always match output row baselines
            ant1_avg[ro] = ant1[ri]
            ant2_avg[ro] = ant2[ri]

            # Input and output flags must match in order for the
            # current row to contribute to these columns
            if have_flag_row and flag_row[ri] != meta.flag_row[ro]:
                continue

            counts[ro] += 1

            if have_uvw:
                uvw_avg[ro, 0] += uvw[ri, 0]
                uvw_avg[ro, 1] += uvw[ri, 1]
                uvw_avg[ro, 2] += uvw[ri, 2]

            if have_time_centroid:
                time_centroid_avg[ro] += time_centroid[ri]

            if have_exposure:
                exposure_avg[ro] += exposure[ri]

            if have_weight:
                for co in range(weight.shape[1]):
                    weight_avg[ro, co] += weight[ri, co]

            if have_sigma:
                for co in range(sigma.shape[1]):
                    # Use weights if present else natural weights
                    wt = weight[ri, co] if have_weight else 1.0

                    # Aggregate
                    sigma_avg[ro, co] += sigma[ri, co]**2 * wt**2
                    sigma_weight_sum[ro, co] += wt

        # Normalise and copy
        for o in range(len(meta.offsets) - 1):
            bro = meta.offsets[o]
            nchan = meta.offsets[o + 1] - bro
            count = counts[bro]

            for c in range(1, nchan):
                ant1_avg[bro + c] = ant1_avg[bro]
                ant2_avg[bro + c] = ant2_avg[bro]

            if have_uvw:
                # Normalise channel 0 value
                if count > 0:
                    uvw_avg[bro, 0] /= count
                    uvw_avg[bro, 1] /= count
                    uvw_avg[bro, 2] /= count

                # Copy to other channels
                for c in range(1, nchan):
                    uvw_avg[bro + c, 0] += uvw_avg[bro, 0]
                    uvw_avg[bro + c, 1] += uvw_avg[bro, 1]
                    uvw_avg[bro + c, 2] += uvw_avg[bro, 2]

            if have_time_centroid:
                # Normalise channel 0 value
                if count > 0:
                    time_centroid_avg[bro] /= count

                # Copy to other channels
                for c in range(1, nchan):
                    time_centroid_avg[bro + c] = time_centroid_avg[bro]

            if have_exposure:
                # Copy to other channels
                for c in range(1, nchan):
                    exposure_avg[bro + c] = exposure_avg[bro]

            if have_weight:
                # Copy to other channels
                for c in range(1, nchan):
                    for co in range(weight.shape[1]):
                        weight_avg[bro + c, co] = weight_avg[bro, co]

            if have_sigma:
                # Normalise channel 0 values
                for co in range(sigma.shape[1]):
                    sswsum = sigma_weight_sum[bro, co]

                    if sswsum != 0.0:
                        ssva = sigma_avg[bro, co]
                        sigma_avg[bro, co] = np.sqrt(ssva / (sswsum**2))

                # Copy values to other channels
                for c in range(1, nchan):
                    for co in range(sigma.shape[1]):
                        sigma_avg[bro + c, co] = sigma_avg[bro, co]

        return RowAverageOutput(ant1_avg, ant2_avg,
                                time_centroid_avg,
                                exposure_avg, uvw_avg,
                                weight_avg, sigma_avg)

    return impl


_rowchan_output_fields = ["visibilities",
                          "flag",
                          "weight_spectrum",
                          "sigma_spectrum"]
RowChanAverageOutput = namedtuple("RowChanAverageOutput",
                                  _rowchan_output_fields)


class RowChannelAverageException(Exception):
    pass


@intrinsic
def average_visibilities(typingctx, vis, vis_avg, vis_weight_sum,
                         weight, ri, fi, ro, co):

    import numba.core.types as nbtypes

    have_array = isinstance(vis, nbtypes.Array)
    have_tuple = isinstance(vis, (nbtypes.Tuple, nbtypes.UniTuple))

    def avg_fn(vis, vis_avg, vis_ws, wt, ri, fi, ro, co):
        vis_avg[ro, co] += vis[ri, fi, co] * wt
        vis_ws[ro, co] += wt

    return_type = nbtypes.NoneType("none")

    sig = return_type(vis, vis_avg, vis_weight_sum,
                      weight, ri, fi, ro, co)

    def codegen(context, builder, signature, args):
        vis, vis_type = args[0], signature.args[0]
        vis_avg, vis_avg_type = args[1], signature.args[1]
        vis_weight_sum, vis_weight_sum_type = args[2], signature.args[2]
        weight, weight_type = args[3], signature.args[3]
        ri, ri_type = args[4], signature.args[4]
        fi, fi_type = args[5], signature.args[5]
        ro, ro_type = args[6], signature.args[6]
        co, co_type = args[7], signature.args[7]
        return_type = signature.return_type

        if have_array:
            avg_sig = return_type(vis_type,
                                  vis_avg_type,
                                  vis_weight_sum_type,
                                  weight_type,
                                  ri_type, fi_type,
                                  ro_type, co_type)
            avg_args = [vis, vis_avg, vis_weight_sum,
                        weight, ri, fi, ro, co]

            # Compile function and get handle to output
            context.compile_internal(builder, avg_fn,
                                     avg_sig, avg_args)
        elif have_tuple:
            for i in range(len(vis_type)):
                avg_sig = return_type(vis_type.types[i],
                                      vis_avg_type.types[i],
                                      vis_weight_sum_type.types[i],
                                      weight_type,
                                      ri_type, fi_type,
                                      ro_type, co_type)
                avg_args = [builder.extract_value(vis, i),
                            builder.extract_value(vis_avg, i),
                            builder.extract_value(vis_weight_sum, i),
                            weight, ri, fi, ro, co]

                # Compile function and get handle to output
                context.compile_internal(builder, avg_fn,
                                         avg_sig, avg_args)
        else:
            raise TypeError("Unhandled visibility array type")

    return sig, codegen


@intrinsic
def normalise_visibilities(typingctx, vis_avg, vis_weight_sum, ro, co):
    import numba.core.types as nbtypes

    have_array = isinstance(vis_avg, nbtypes.Array)
    have_tuple = isinstance(vis_avg, (nbtypes.Tuple, nbtypes.UniTuple))

    def normalise_fn(vis_avg, vis_ws, ro, co):
        weight_sum = vis_ws[ro, co]

        if weight_sum != 0.0:
            vis_avg[ro, co] /= weight_sum

    return_type = nbtypes.NoneType("none")
    sig = return_type(vis_avg, vis_weight_sum, ro, co)

    def codegen(context, builder, signature, args):
        vis_avg, vis_avg_type = args[0], signature.args[0]
        vis_weight_sum, vis_weight_sum_type = args[1], signature.args[1]
        ro, ro_type = args[2], signature.args[2]
        co, co_type = args[3], signature.args[3]
        return_type = signature.return_type

        if have_array:
            # Normalise single array
            norm_sig = return_type(vis_avg_type,
                                   vis_weight_sum_type,
                                   ro_type, co_type)
            norm_args = [vis_avg, vis_weight_sum, ro, co]

            context.compile_internal(builder, normalise_fn,
                                     norm_sig, norm_args)
        elif have_tuple:
            # Normalise each array in the tuple
            for i in range(len(vis_avg_type)):
                norm_sig = return_type(vis_avg_type.types[i],
                                       vis_weight_sum_type.types[i],
                                       ro_type, co_type)
                norm_args = [builder.extract_value(vis_avg, i),
                             builder.extract_value(vis_weight_sum, i),
                             ro, co]

                # Compile function and get handle to output
                context.compile_internal(builder, normalise_fn,
                                         norm_sig, norm_args)
        else:
            raise TypeError("Unhandled visibility array type")

    return sig, codegen


@generated_jit(nopython=True, nogil=True, cache=True)
def row_chan_average(meta, flag_row=None, weight=None,
                     visibilities=None,
                     flag=None,
                     weight_spectrum=None,
                     sigma_spectrum=None):

    have_vis = not is_numba_type_none(visibilities)
    have_flag = not is_numba_type_none(flag)
    have_flag_row = not is_numba_type_none(flag_row)
    have_flags = have_flag_row or have_flag

    have_weight = not is_numba_type_none(weight)
    have_weight_spectrum = not is_numba_type_none(weight_spectrum)
    have_sigma_spectrum = not is_numba_type_none(sigma_spectrum)

    def impl(meta, flag_row=None, weight=None,
             visibilities=None,
             flag=None,
             weight_spectrum=None,
             sigma_spectrum=None):

        out_rows = meta.time.shape[0]
        nchan, ncorrs = chan_corrs(visibilities, flag,
                                   weight_spectrum, sigma_spectrum,
                                   None, None,
                                   None, None)

        out_shape = (out_rows, ncorrs)

        if not have_flag:
            flag_avg = None
        else:
            flag_avg = np.zeros(out_shape, np.bool_)

        # If either flag_row or flag is present, we need to ensure that
        # effective averaging takes place.
        if have_flags:
            flags_match = np.zeros(meta.map.shape + (ncorrs,), dtype=np.bool_)
            flag_counts = np.zeros(out_shape, dtype=np.uint32)
        else:
            flags_match = None
            flag_counts = None

        counts = np.zeros(out_shape, dtype=np.uint32)

        # Determine output bin counts both unflagged and flagged
        for ri in range(meta.map.shape[0]):
            row_flagged = have_flag_row and flag_row[ri] != 0
            for fi in range(meta.map.shape[1]):
                ro = meta.map[ri, fi]

                for co in range(ncorrs):
                    flagged = (row_flagged or
                               (have_flag and flag[ri, fi, co] != 0))

                    if have_flags and flagged:
                        flag_counts[ro, co] += 1
                    else:
                        counts[ro, co] += 1

        # ------
        # Flags
        # ------

        # Determine whether input samples should contribute to an output bin
        # and, if flags are parent, whether the output bin is flagged

        # This follows from the definition of an effective average:
        #
        # * bad or flagged values should be excluded
        #   when calculating the average
        #
        # Note that if a bin is completely flagged we still compute an average,
        # to which all relevant input samples contribute.
        for ri in range(meta.map.shape[0]):
            row_flagged = have_flag_row and flag_row[ri] != 0
            for fi in range(meta.map.shape[1]):
                ro = meta.map[ri, fi]

                for co in range(ncorrs):
                    if counts[ro, co] > 0:
                        # Output bin should only contain unflagged samples
                        out_flag = False

                        if have_flag:
                            # Set output flags
                            flag_avg[ro, co] = False

                    elif have_flags and flag_counts[ro, co] > 0:
                        # Output bin is completely flagged
                        out_flag = True

                        if have_flag:
                            # Set output flags
                            flag_avg[ro, co] = True
                    else:
                        raise RowChannelAverageException("Zero-filled bin")

                    # We should only add a sample to an output bin
                    # if the input flag matches the output flag.
                    # This is because flagged samples don't contribute
                    # to a bin with some unflagged samples while
                    # unflagged samples never contribute to a
                    # completely flagged bin
                    if have_flags:
                        in_flag = (row_flagged or
                                   (have_flag and flag[ri, fi, co] != 0))
                        flags_match[ri, fi, co] = in_flag == out_flag

        # -------------
        # Visibilities
        # -------------
        if not have_vis:
            vis_avg = None
        else:
            vis_avg, vis_weight_sum = vis_output_arrays(
                visibilities, out_shape)

            # Aggregate
            for ri in range(meta.map.shape[0]):
                for fi in range(meta.map.shape[1]):
                    ro = meta.map[ri, fi]

                    for co in range(ncorrs):
                        if have_flags and not flags_match[ri, fi, co]:
                            continue

                        wt = (weight_spectrum[ri, fi, co]
                              if have_weight_spectrum else
                              weight[ri, co] if have_weight else 1.0)

                        average_visibilities(visibilities,
                                             vis_avg, vis_weight_sum,
                                             wt, ri, fi, ro, co)

            # Normalise
            for ro in range(out_rows):
                for co in range(ncorrs):
                    normalise_visibilities(vis_avg, vis_weight_sum, ro, co)

        # ----------------
        # Weight Spectrum
        # ----------------
        if not have_weight_spectrum:
            weight_spectrum_avg = None
        else:
            weight_spectrum_avg = np.zeros(out_shape, weight_spectrum.dtype)

            # Aggregate
            for ri in range(meta.map.shape[0]):
                for fi in range(meta.map.shape[1]):
                    ro = meta.map[ri, fi]

                    for co in range(ncorrs):
                        if have_flags and not flags_match[ri, fi, co]:
                            continue

                        weight_spectrum_avg[ro, co] += (
                            weight_spectrum[ri, fi, co])

        # ---------------
        # Sigma Spectrum
        # ---------------
        if not have_sigma_spectrum:
            sigma_spectrum_avg = None
        else:
            sigma_spectrum_avg = np.zeros(out_shape, sigma_spectrum.dtype)
            sigma_spectrum_weight_sum = np.zeros_like(sigma_spectrum_avg)

            # Aggregate
            for ri in range(meta.map.shape[0]):
                for fi in range(meta.map.shape[1]):
                    ro = meta.map[ri, fi]

                    for co in range(ncorrs):
                        if have_flags and not flags_match[ri, fi, co]:
                            continue

                        wt = (weight_spectrum[ri, fi, co]
                              if have_weight_spectrum else
                              weight[ri, co] if have_weight else 1.0)

                        ssv = sigma_spectrum[ri, fi, co]**2 * wt**2
                        sigma_spectrum_avg[ro, co] += ssv
                        sigma_spectrum_weight_sum[ro, co] += wt

            # Normalise
            for ro in range(out_rows):
                for co in range(ncorrs):
                    if sigma_spectrum_weight_sum[ro, co] != 0.0:
                        ssv = sigma_spectrum_avg[ro, co]
                        sswsum = sigma_spectrum_weight_sum[ro, co]
                        sigma_spectrum_avg[ro, co] = np.sqrt(ssv / sswsum**2)

        return RowChanAverageOutput(vis_avg, flag_avg,
                                    weight_spectrum_avg,
                                    sigma_spectrum_avg)

    return impl


_chan_output_fields = ["chan_freq", "chan_width", "effective_bw", "resolution"]
ChannelAverageOutput = namedtuple("ChannelAverageOutput", _chan_output_fields)


AverageOutput = namedtuple("AverageOutput",
                           list(RowMapOutput._fields) +
                           _row_output_fields +
                           # _chan_output_fields +
                           _rowchan_output_fields)


[docs]@generated_jit(nopython=True, nogil=True, cache=True) def bda(time, interval, antenna1, antenna2, time_centroid=None, exposure=None, flag_row=None, uvw=None, weight=None, sigma=None, chan_freq=None, chan_width=None, effective_bw=None, resolution=None, visibilities=None, flag=None, weight_spectrum=None, sigma_spectrum=None, max_uvw_dist=None, max_fov=3.0, decorrelation=0.98, time_bin_secs=None, min_nchan=1): def impl(time, interval, antenna1, antenna2, time_centroid=None, exposure=None, flag_row=None, uvw=None, weight=None, sigma=None, chan_freq=None, chan_width=None, effective_bw=None, resolution=None, visibilities=None, flag=None, weight_spectrum=None, sigma_spectrum=None, max_uvw_dist=None, max_fov=3.0, decorrelation=0.98, time_bin_secs=None, min_nchan=1): # Merge flag_row and flag arrays flag_row = merge_flags(flag_row, flag) meta = bda_mapper(time, interval, antenna1, antenna2, uvw, chan_width, chan_freq, max_uvw_dist, flag_row=flag_row, max_fov=max_fov, decorrelation=decorrelation, time_bin_secs=time_bin_secs, min_nchan=min_nchan) row_avg = row_average(meta, antenna1, antenna2, flag_row, # noqa: F841 time_centroid, exposure, uvw, weight=weight, sigma=sigma) row_chan_avg = row_chan_average(meta, # noqa: F841 flag_row=flag_row, visibilities=visibilities, flag=flag, weight_spectrum=weight_spectrum, sigma_spectrum=sigma_spectrum) # Have to explicitly write it out because numba tuples # are highly constrained types return AverageOutput(meta.map, meta.offsets, meta.decorr_chan_width, meta.time, meta.interval, meta.chan_width, meta.flag_row, row_avg.antenna1, row_avg.antenna2, row_avg.time_centroid, row_avg.exposure, row_avg.uvw, row_avg.weight, row_avg.sigma, # None, # chan_data.chan_freq, # None, # chan_data.chan_width, # None, # chan_data.effective_bw, # None, # chan_data.resolution, row_chan_avg.visibilities, row_chan_avg.flag, row_chan_avg.weight_spectrum, row_chan_avg.sigma_spectrum) return impl
BDA_DOCS = DocstringTemplate(""" Averages in time and channel, dependent on baseline length. Parameters ---------- time : $(array_type) Time values of shape :code:`(row,)`. interval : $(array_type) Interval values of shape :code:`(row,)`. antenna1 : $(array_type) First antenna indices of shape :code:`(row,)` antenna2 : $(array_type) Second antenna indices of shape :code:`(row,)` time_centroid : $(array_type), optional Time centroid values of shape :code:`(row,)` exposure : $(array_type), optional Exposure values of shape :code:`(row,)` flag_row : $(array_type), optional Flagged rows of shape :code:`(row,)`. uvw : $(array_type), optional UVW coordinates of shape :code:`(row, 3)`. weight : $(array_type), optional Weight values of shape :code:`(row, corr)`. sigma : $(array_type), optional Sigma values of shape :code:`(row, corr)`. chan_freq : $(array_type), optional Channel frequencies of shape :code:`(chan,)`. chan_width : $(array_type), optional Channel widths of shape :code:`(chan,)`. effective_bw : $(array_type), optional Effective channel bandwidth of shape :code:`(chan,)`. resolution : $(array_type), optional Effective channel resolution of shape :code:`(chan,)`. visibilities : $(array_type) or tuple of $(array_type), optional Visibility data of shape :code:`(row, chan, corr)`. Tuples of visibilities arrays may be supplied, in which case tuples will be output. flag : $(array_type), optional Flag data of shape :code:`(row, chan, corr)`. weight_spectrum : $(array_type), optional Weight spectrum of shape :code:`(row, chan, corr)`. sigma_spectrum : $(array_type), optional Sigma spectrum of shape :code:`(row, chan, corr)`. max_uvw_dist : float, optional Maximum UVW distance. Will be inferred from the UVW coordinates if not supplied. max_fov : float Maximum Field of View Radius. Defaults to 3 degrees. decorrelation : float Acceptable amount of decorrelation. This is a floating point value between 0.0 and 1.0. time_bin_secs : float, optional Maximum number of seconds worth of data that can be aggregated into a bin. Defaults to None in which case the value is only bounded by the decorrelation factor and the field of view. min_nchan : int, optional Minimum number of channels in an averaged sample. Useful in cases where imagers expect at least `min_nchan` channels. Defaults to 1. Notes ----- In all cases arrays starting with :code:`(row, chan)` and :code:`(row,)` dimensions are respectively averaged and expanded into a :code:`(rowchan,)` dimension, as the number of channels varies per output row. The output namedtuple contains an `offsets` array of shape :code:`(out_rows + 1,)` encoding the starting offsets of each output row, as well as a single entry at the end such that :code:`np.diff(offsets)` produces the number of channels for each output row. .. code-block:: python avg = bda(...) time = avg.time[avg.offsets[:-1]] out_chans = np.diff(avg.offsets) The implementation currently requires unique lexicographical combinations of (TIME, ANTENNA1, ANTENNA2). This can usually be achieved by suitably partitioning input data on indexing rows, DATA_DESC_ID and SCAN_NUMBER in particular. Returns ------- namedtuple A namedtuple whose entries correspond to the input arrays. Output arrays will be ``None`` if the inputs were ``None``. See the Notes for an explanation of the output formats. """) try: bda.__doc__ = BDA_DOCS.substitute(array_type=":class:`numpy.ndarray`") except AttributeError: pass