# -*- module: africanus.model.wsclean.file_model -*-

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from ast import literal_eval
import math
import re

from africanus.compatibility import string_types


hour_re = re.compile(r"(?P<sign>[-]*)"
                     r"(?P<hours>\d+):"
                     r"(?P<mins>\d+):"
                     r"(?P<secs>\d+\.\d+)")

deg_re = re.compile(r"(?P<sign>[-])*"
                    r"(?P<degs>\d+)\."
                    r"(?P<mins>\d+)\."
                    r"(?P<secs>\d+\.\d+)")


def _hour_converter(hour_str):
    m = hour_re.match(hour_str)

    if not m:
        raise ValueError("Error parsing '%s'" % hour_str)

    if m.group("sign") == '-':
        value = float(m.group("hours")) / 24.0
        value += float(m.group("mins")) / (24.0*60.0)
        value += float(m.group("secs")) / (24.0*60.0*60.0)
    else:
        value = float(m.group("hours")) / 24.0
        value -= float(m.group("mins")) / (24.0*60.0)
        value -= float(m.group("secs")) / (24.0*60.0*60.0)

    return 2.0 * math.pi * value


def _deg_converter(deg_str):
    m = deg_re.match(deg_str)

    if not m:
        raise ValueError("Error parsing '%s'" % deg_str)

    if m.group("sign") == '-':
        value = float(m.group("degs")) / 360.0
        value += float(m.group("mins")) / (360.0*60.0)
        value += float(m.group("secs")) / (360.0*60.0*60.0)
    else:
        value = float(m.group("degs")) / 360.0
        value -= float(m.group("mins")) / (360.0*60.0)
        value -= float(m.group("secs")) / (360.0*60.0*60.0)

    return 2.0 * math.pi * value


# Converter applied to each column of a wsclean component file, listed
# in the exact order the columns must appear in the "Format = ..." header.
# NOTE(review): 'LogarithmicSI' only recognises a lowercase "true" --
# presumably wsclean always writes lowercase booleans; confirm upstream.
_COLUMN_CONVERTERS = [
    ('Name', str),
    ('Type', str),
    ('Ra', _hour_converter),
    ('Dec', _deg_converter),
    ('I', float),
    # SpectralIndex is a python-style list literal, e.g. "[0.1, -0.2]"
    ('SpectralIndex', literal_eval),
    ('LogarithmicSI', lambda x: bool(x == "true")),
    ('ReferenceFrequency', float),
    ('MajorAxis', float),
    ('MinorAxis', float),
    ('Orientation', float),
]


# Split on commas, ignoring those within [] brackets so that
# list-valued columns (e.g. SpectralIndex) remain intact
_COMMA_SPLIT_RE = re.compile(r',\s*(?=[^\]]*(?:\[|$))')

# Parse column headers, handling possible defaults of the
# form "Name='default value'"
_COL_HEADER_RE = re.compile(r"^\s*?(?P<name>.*?)"
                            r"(\s*?=\s*?'(?P<default>.*?)'\s*?){0,1}$")


def _parse_col_descriptor(column_descriptor):
    """
    Splits a comma-separated column descriptor into names and defaults.

    Parameters
    ----------
    column_descriptor : str
        Comma-separated column headers, each optionally of the
        form ``Name='default'``.

    Returns
    -------
    tuple
        ``(column names, defaults)`` where each default is the
        quoted string from the header, or ``None`` if absent.

    Raises
    ------
    ValueError
        If a column header cannot be parsed.
    """
    columns = []
    defaults = []

    for component in (c.strip() for c in column_descriptor.split(",")):
        match = _COL_HEADER_RE.search(component)

        if match is None:
            raise ValueError("'%s' is not a valid column header" % component)

        columns.append(match.group('name'))
        defaults.append(match.group('default'))

    return columns, defaults


def _parse_header(header):
    format_str, col_desc = (c.strip() for c in header.split("=", 1))

    if format_str != "Format":
        raise ValueError("'%s' does not appear to be a wsclean header")

    return _parse_col_descriptor(col_desc)


def _parse_lines(fh, line_nr, column_names, defaults, converters):
    """
    Parses the data lines of a wsclean component file.

    Parameters
    ----------
    fh : iterable of str
        Iterator over the remaining (post-header) lines of the file.
    line_nr : int
        Starting line number, used in error messages.
    column_names : list of str
        Column names from the parsed header.
    defaults : list of str or None
        Per-column default strings from the header, or ``None``.
    converters : sequence of callable
        Per-column converter callables.

    Returns
    -------
    zip
        Iterator of ``(column name, list of column values)`` pairs.

    Raises
    ------
    ValueError
        If a line has the wrong number of components, or a component is
        empty and no default can be obtained for its column.
    """
    source_data = [[] for _ in column_names]

    for line_nr, line in enumerate(fh, line_nr):
        components = [c.strip() for c in re.split(_COMMA_SPLIT_RE, line)]

        if len(components) != len(column_names):
            raise ValueError("line %d '%s' should have %d components" %
                             (line_nr, line, len(column_names)))

        # Pair each component with its column metadata and storage list
        columns = zip(column_names, components, converters,
                      source_data, defaults)

        for name, comp, conv, data_list, default in columns:
            if comp:
                value = comp
            else:
                # Empty component: fall back to the header default, or
                # a default produced by calling the converter with no
                # arguments (e.g. float() == 0.0)
                if default is None:
                    try:
                        default = conv()
                    except Exception as e:
                        raise ValueError("No value supplied for column '%s' "
                                         "on line %d and no default was "
                                         "supplied either. Attempting to "
                                         "generate a default produced the "
                                         "following exception %s"
                                         % (name, line_nr, str(e)))

                value = default

            data_list.append(conv(value))

    return zip(column_names, source_data)


def load(filename):
    """
    Loads wsclean component model.

    .. code-block:: python

        sources = load("components.txt")
        sources = dict(sources)  # Convert to dictionary

        I = sources["I"]
        ref_freq = sources["ReferenceFrequency"]

    See the `WSClean Component List
    <https://sourceforge.net/p/wsclean/wiki/ComponentList/>`_
    for further details.

    Parameters
    ----------
    filename : str or iterable
        Filename of wsclean model file or iterable
        producing the lines of the file.

    See Also
    --------
    africanus.model.wsclean.spectra

    Returns
    -------
    list of (name, list of values) tuples
        list of column (name, value) tuples
    """
    if isinstance(filename, string_types):
        fh = open(filename, "r")
        fh = iter(fh)
        close_filename = True
    else:
        fh = iter(filename)
        close_filename = False

    try:
        # Search for a header until we find a non-empty string.
        # Text after a '#' is treated as a comment and stripped.
        header = ''
        line_nr = 1

        for headers in fh:
            header = headers.split("#", 1)[0].strip()

            if header:
                break

            line_nr += 1

        if not header:
            raise ValueError("'%s' does not contain a valid wsclean header"
                             % filename)

        column_names, defaults = _parse_header(header)
        conv_names, converters = zip(*_COLUMN_CONVERTERS)

        if not conv_names == tuple(column_names):
            # BUGFIX: report the file's columns first and the expected
            # set second; the original swapped these format arguments,
            # producing a misleading error message.
            raise ValueError("header columns '%s' do not match expected '%s'"
                             % (column_names, conv_names))

        return _parse_lines(fh, line_nr, column_names, defaults, converters)
    finally:
        # Only close handles this function opened itself
        if close_filename:
            fh.close()