Source code for africanus.calibration.phase_only.dask

# -*- coding: utf-8 -*-

from africanus.calibration.phase_only.phase_only import COMPUTE_JHJ_DOCS
from africanus.calibration.phase_only.phase_only import COMPUTE_JHR_DOCS
from africanus.calibration.utils import check_type
from africanus.calibration.phase_only import compute_jhj as np_compute_jhj
from africanus.calibration.phase_only import compute_jhr as np_compute_jhr
from africanus.util.requirements import requires_optional
from africanus.calibration.utils.utils import DIAG_DIAG

try:
    from dask.array.core import blockwise
except ImportError as e:
    dask_import_error = e
else:
    dask_import_error = None


[docs] @requires_optional("dask.array", dask_import_error) def compute_jhj( time_bin_indices, time_bin_counts, antenna1, antenna2, jones, model, flag ): mode = check_type(jones, model, vis_type="model") if mode != DIAG_DIAG: raise NotImplementedError("Only DIAG-DIAG case has been implemented") jones_shape = ("row", "ant", "chan", "dir", "corr") vis_shape = ("row", "chan", "corr") model_shape = ("row", "chan", "dir", "corr") return blockwise( np_compute_jhj, jones_shape, time_bin_indices, ("row",), time_bin_counts, ("row",), antenna1, ("row",), antenna2, ("row",), jones, jones_shape, model, model_shape, flag, vis_shape, adjust_chunks={"row": antenna1.chunks[0]}, new_axes={"corr2": 2}, # why? dtype=model.dtype, align_arrays=False, )
[docs] @requires_optional("dask.array", dask_import_error) def compute_jhr( time_bin_indices, time_bin_counts, antenna1, antenna2, jones, residual, model, flag ): mode = check_type(jones, residual) if mode != DIAG_DIAG: raise NotImplementedError("Only DIAG-DIAG case has been implemented") jones_shape = ("row", "ant", "chan", "dir", "corr") vis_shape = ("row", "chan", "corr") model_shape = ("row", "chan", "dir", "corr") return blockwise( np_compute_jhr, jones_shape, time_bin_indices, ("row",), time_bin_counts, ("row",), antenna1, ("row",), antenna2, ("row",), jones, jones_shape, residual, vis_shape, model, model_shape, flag, vis_shape, adjust_chunks={"row": antenna1.chunks[0]}, new_axes={"corr2": 2}, # why? dtype=model.dtype, align_arrays=False, )
compute_jhj.__doc__ = COMPUTE_JHJ_DOCS.substitute( array_type=":class:`dask.array.Array`" ) compute_jhr.__doc__ = COMPUTE_JHR_DOCS.substitute( array_type=":class:`dask.array.Array`" )