# Source code for africanus.model.wsclean.file_model

# -*- coding: utf-8 -*-


import math
import re
import warnings

import numpy as np

# Parses "H:M:S" right ascension strings, e.g. "08:13:36.7", with an
# optional single leading sign. "[+-]?" (rather than "[+-]*") rejects
# malformed multi-sign strings such as "+-08:13:36.7" so they raise a
# ValueError in _hour_converter instead of being silently accepted.
hour_re = re.compile(
    r"(?P<sign>[+-]?)" r"(?P<hours>\d+):" r"(?P<mins>\d+):" r"(?P<secs>\d+\.?\d*)"
)

# Parses "D.M.S" declination strings, e.g. "-30.49.22.0", with an
# optional single leading sign. The original pattern "(?P<sign>[+-])*"
# placed the quantifier OUTSIDE the group: repeated signs kept only the
# last capture and unsigned input left the group as None. "(?P<sign>[+-]?)"
# matches the hour_re convention (empty string when no sign) and rejects
# multi-sign strings; _deg_converter only tests == "-", so behavior for
# well-formed input is unchanged.
deg_re = re.compile(
    r"(?P<sign>[+-]?)" r"(?P<degs>\d+)\." r"(?P<mins>\d+)\." r"(?P<secs>\d+\.?\d*)"
)


def _hour_converter(hour_str):
    """Convert an "H:M:S" right ascension string to radians.

    Raises
    ------
    ValueError
        If ``hour_str`` does not match the expected format.
    """
    match = hour_re.match(hour_str)

    if not match:
        raise ValueError("Error parsing '%s'" % hour_str)

    # Fraction of a full day represented by the H:M:S components
    day_fraction = (
        float(match.group("hours")) / 24.0
        + float(match.group("mins")) / (24.0 * 60.0)
        + float(match.group("secs")) / (24.0 * 60.0 * 60.0)
    )

    if match.group("sign") == "-":
        day_fraction = -day_fraction

    # One full day corresponds to a full turn of 2*pi radians
    return 2.0 * math.pi * day_fraction


def _deg_converter(deg_str):
    """Convert a "D.M.S" declination string to radians.

    Raises
    ------
    ValueError
        If ``deg_str`` does not match the expected format.
    """
    match = deg_re.match(deg_str)

    if not match:
        raise ValueError(f"Error parsing '{deg_str}'")

    # Fraction of a full circle represented by the D.M.S components
    turn_fraction = (
        float(match.group("degs")) / 360.0
        + float(match.group("mins")) / (360.0 * 60.0)
        + float(match.group("secs")) / (360.0 * 60.0 * 60.0)
    )

    if match.group("sign") == "-":
        turn_fraction = -turn_fraction

    # One full circle corresponds to 2*pi radians
    return 2.0 * math.pi * turn_fraction


def arcsec2rad(arcseconds=0.0):
    """Convert arcseconds to radians.

    The 0.0 default allows this function to be called with no argument
    to generate a default value for a missing column entry.
    """
    degrees = float(arcseconds) / 3600.0
    return np.deg2rad(degrees)


def spi_converter(spi):
    """Parse a "[a, b, ...]" spectral index string into a list of floats.

    Empty and bracket-only strings produce an empty list.
    """
    contents = spi.strip("[] ")
    return [float(term) for term in contents.split(",")] if contents else []


# Maps wsclean column names to callables that convert the raw string
# value (or a header/generated default) into the stored Python value.
# Converters callable with zero arguments can synthesize a default
# when a field is empty and the header supplies none.
_COLUMN_CONVERTERS = {
    "Name": str,
    "Type": str,
    # Right ascension "H:M:S" -> radians
    "Ra": _hour_converter,
    # Declination "D.M.S" -> radians
    "Dec": _deg_converter,
    "I": float,
    # "[a, b, ...]" -> list of floats
    "SpectralIndex": spi_converter,
    # wsclean writes lower-case "true"/"false"
    "LogarithmicSI": lambda x: bool(x == "true"),
    "ReferenceFrequency": float,
    # Gaussian shape parameters: arcseconds -> radians
    "MajorAxis": arcsec2rad,
    "MinorAxis": arcsec2rad,
    # Degrees -> radians; the 0.0 default permits a zero-argument call
    "Orientation": lambda x=0.0: np.deg2rad(float(x)),
}


# Split on commas, ignoring within [] brackets
_COMMA_SPLIT_RE = re.compile(r",\s*(?=[^\]]*(?:\[|$))")

# Parse columm headers, handling possible defaults
_COL_HEADER_RE = re.compile(
    r"^\s*?(?P<name>.*?)" r"(\s*?=\s*?'(?P<default>.*?)'\s*?){0,1}$"
)


def _parse_col_descriptor(column_descriptor):
    components = [c.strip() for c in column_descriptor.split(",")]

    columns = []
    defaults = []

    for column in components:
        m = _COL_HEADER_RE.search(column)

        if m is None:
            raise ValueError(f"'{column}' is not a valid column header")

        name, default = m.group("name", "default")

        columns.append(name)
        defaults.append(default)

    return columns, defaults


def _parse_header(header):
    """Validate the leading "Format" token of a wsclean header line and
    parse its column descriptor via :func:`_parse_col_descriptor`.

    Raises
    ------
    ValueError
        If the line does not start with ``Format =``.
    """
    format_str, col_desc = [part.strip() for part in header.split("=", 1)]

    if format_str == "Format":
        return _parse_col_descriptor(col_desc)

    raise ValueError(f"'{format_str}' does not " f"appear to be a wsclean header")


def _parse_lines(fh, line_nr, column_names, defaults, converters):
    """Parse the data lines of a wsclean component file into columns.

    Parameters
    ----------
    fh : iterator of str
        Iterator over the remaining file lines (header already consumed).
    line_nr : int
        Line number at which data lines begin, used for error reporting.
    column_names : list of str
        Column names from the header.
    defaults : list of str or None
        Per-column default strings from the header, None where absent.
    converters : list of callable
        Per-column converters from string to stored value.

    Returns
    -------
    list of (str, list) tuples
        ``(column name, column values)`` tuples, one per column.

    Raises
    ------
    ValueError
        If a line has the wrong number of components, an empty field has
        no default and the converter cannot generate one, or a required
        column ("Name", "I", "SpectralIndex", "LogarithmicSI") is missing.
    """
    # One value list per column, filled in file order
    source_data = [[] for _ in range(len(column_names))]

    for line_nr, line in enumerate(fh, line_nr):
        # Split on commas outside [] so SpectralIndex lists stay intact
        components = [c.strip() for c in re.split(_COMMA_SPLIT_RE, line)]

        if len(components) != len(column_names):
            raise ValueError(
                f"line {line_nr} '{line}' should "
                f"have {len(column_names)} components"
            )

        # Iterate through each column's data
        it = zip(column_names, components, converters, source_data, defaults)

        for name, comp, conv, data_list, default in it:
            if not comp:
                # Empty field: use the header default if present, otherwise
                # ask the converter for one by calling it with no arguments
                if default is None:
                    try:
                        default = conv()
                    except Exception as e:
                        raise ValueError(
                            f"No value supplied for column '{name}' "
                            f"on line {line_nr} and no default was "
                            f"supplied either. Attempting to "
                            f"generate a default produced the "
                            f"following exception {e}"
                        )

                value = default
            else:
                value = comp

            # NOTE: a converter-generated default is passed through conv()
            # a second time here; the registered converters tolerate this
            data_list.append(conv(value))

    columns = dict(zip(*(column_names, source_data)))

    # Zero any spectral models with nan/inf values
    try:
        name_column = columns["Name"]
        flux_column = columns["I"]
        spi_column = columns["SpectralIndex"]
        log_spi_column = columns["LogarithmicSI"]
    except KeyError as e:
        raise ValueError(f"WSClean Model File missing " f"required column {str(e)}")

    it = zip(name_column, flux_column, spi_column, log_spi_column)

    # Zero flux and spi's in-place
    for i, (name, flux, spi, log_spi) in enumerate(it):
        good = True

        if not math.isfinite(flux):
            warnings.warn(
                f"Non-finite I {flux} encountered "
                f"for source {name}. This source model will "
                f"be zeroed."
            )
            good = False

        if not all(map(math.isfinite, spi)):
            warnings.warn(
                f"Non-finite SpectralIndex {spi} encountered "
                f"for source {name}. This source model will "
                f"be zeroed."
            )
            good = False

        if good:
            continue

        # Neutral flux for a logarithmic SI model is 1.0 (np.log(1.0) = 0.0);
        # for an ordinary model it is 0.0
        flux_column[i] = 1.0 if log_spi else 0.0

        for j in range(len(spi)):
            spi[j] = 0.0

    return list(columns.items())


def load(filename):
    """
    Loads wsclean component model.

    .. code-block:: python

        sources = load("components.txt")
        sources = dict(sources)  # Convert to dictionary

        I = sources["I"]
        ref_freq = sources["ReferenceFrequency"]

    See the `WSClean Component List
    <https://sourceforge.net/p/wsclean/wiki/ComponentList/>`_
    for further details.

    Parameters
    ----------
    filename : str or iterable
        Filename of wsclean model file or
        iterable producing the lines of the file.

    See Also
    --------
    africanus.model.wsclean.spectra

    Returns
    -------
    list of (name, list of values) tuples
        list of column (name, value) tuples
    """
    if isinstance(filename, str):
        fh = open(filename, "r")
        fh = iter(fh)
        close_filename = True
    else:
        fh = iter(filename)
        close_filename = False

    try:
        # Search for a header until we find a non-empty string,
        # tracking the line number for later error reporting
        header = ""
        line_nr = 1

        for headers in fh:
            # Discard comments and surrounding whitespace
            header = headers.split("#", 1)[0].strip()

            if header:
                break

            line_nr += 1

        if not header:
            # Interpolate the actual filename (the extracted source had
            # lost this interpolation in the message)
            raise ValueError(
                f"'{filename}' does not contain " f"a valid wsclean header"
            )

        column_names, defaults = _parse_header(header)

        try:
            converters = [_COLUMN_CONVERTERS[n] for n in column_names]
        except KeyError as e:
            raise ValueError(f"No converter registered for column {str(e)}")

        return _parse_lines(fh, line_nr, column_names, defaults, converters)
    finally:
        # Only close handles this function opened itself
        if close_filename:
            fh.close()