Utilities#

Command Line#

parse_python_assigns(assign_str)

Parses a string, containing assign statements into a dictionary.

africanus.util.cmdline.parse_python_assigns(assign_str)[source]#

Parses a string, containing assign statements into a dictionary.

data = parse_python_assigns("beta=5.6; l=[2,3], s='hello, world'")

assert data == {
    'beta': 5.6,
    'l': [2, 3],
    's': 'hello, world'
}
Parameters:
assign_str: str

Assignment string. Should only contain assignment statements assigning python literals or builtin function calls, to variable names. Multiple assignment statements should be separated by semi-colons.

Returns:
dict

Dictionary { name: value } containing assignment results.

Requirements Handling#

requires_optional(*requirements)

Decorator returning either the original function, or a dummy function raising a MissingPackageException when called, depending on whether the supplied requirements are present.

africanus.util.requirements.requires_optional(*requirements)[source]#

Decorator returning either the original function, or a dummy function raising a MissingPackageException when called, depending on whether the supplied requirements are present.

If packages are missing and called within a test, the dummy function will call pytest.skip().

Used in the following way:

try:
    from scipy import interpolate
except ImportError as e:
    # https://stackoverflow.com/a/29268974/1611416, pep 3110 and 344
    scipy_import_error = e
else:
    scipy_import_error = None

@requires_optional('scipy', scipy_import_error)
def function(*args, **kwargs):
    return interpolate(...)
Parameters:
requirementsiterable of string, None or ImportError

Sequence of package names required by the decorated function. ImportError exceptions (or None, indicating their absence) may also be supplied and will be immediately re-raised within the decorator. This is useful for tracking down problems in user import logic.

Returns:
callable

Either the original function if all requirements are available or a dummy function that throws a MissingPackageException or skips a pytest.

Shapes#

aggregate_chunks(chunks, max_chunks)

Aggregate dask chunks together into chunks no larger than max_chunks.

corr_shape(ncorr, corr_shape)

Returns the shape of the correlations, given ncorr and the type of correlation shape requested

africanus.util.shapes.aggregate_chunks(chunks, max_chunks)[source]#

Aggregate dask chunks together into chunks no larger than max_chunks.

chunks, max_c = ((3,4,6,3,6,7),(1,1,1,1,1,1)), (10,3)
expected = ((7,9,6,7), (2,2,1,1))
assert aggregate_chunks(chunks, max_c) == expected
Parameters:
chunkssequence of tuples or tuple
max_chunkssequence of ints or int
Returns:
sequence of tuples or tuple
africanus.util.shapes.corr_shape(ncorr, corr_shape)[source]#

Returns the shape of the correlations, given ncorr and the type of correlation shape requested

Parameters:
ncorrinteger

Number of correlations

corr_shape{‘flat’, ‘matrix’}

Shape of output correlations

Returns:
tuple

Shape tuple describing the correlation dimensions

  • If flat returns (ncorr,)

  • If matrix returns

    • (1,) if ncorr == 1

    • (2,) if ncorr == 2

    • (2,2) if ncorr == 4

Beams#

beam_filenames(filename_schema, corr_types)

Returns a dictionary of beam filename pairs, keyed on correlation,from the cartesian product of correlations and real, imaginary pairs

beam_grids(header[, l_axis, m_axis])

Extracts the FITS indices and grids for the beam dimensions in the supplied FITS header.

africanus.util.beams.beam_filenames(filename_schema, corr_types)[source]#

Returns a dictionary of beam filename pairs, keyed on correlation,from the cartesian product of correlations and real, imaginary pairs

Given beam_$(corr)_$(reim).fits returns:

{
  'xx' : ['beam_xx_re.fits', 'beam_xx_im.fits'],
  'xy' : ['beam_xy_re.fits', 'beam_xy_im.fits'],
  ...
  'yy' : ['beam_yy_re.fits', 'beam_yy_im.fits'],
}

Given beam_$(CORR)_$(REIM).fits returns:

{
  'xx' : ['beam_XX_RE.fits', 'beam_XX_IM.fits'],
  'xy' : ['beam_XY_RE.fits', 'beam_XY_IM.fits'],
  ...
  'yy' : ['beam_YY_RE.fits', 'beam_YY_IM.fits']),
}
Parameters:
filename_schemastr

String containing the filename schema.

corr_typeslist of integers

list of integers defining the correlation type.

Returns:
dict

Dictionary of schema {correlation : (refile, imfile)} mapping correlations to real and imaginary filename pairs

africanus.util.beams.beam_grids(header, l_axis=None, m_axis=None)[source]#

Extracts the FITS indices and grids for the beam dimensions in the supplied FITS header. Specifically the axes specified by

  1. L or X CTYPE

  2. M or Y CTYPE

  3. FREQ CTYPE

If the first two axes have a negative sign, such as -L, the grid will be inverted.

Any grids corresponding to axes with a CUNIT type of DEG will be converted to radians.

Parameters:
headerHeader or dict

FITS header object.

l_axisstr

FITS axis interpreted as the L axis. L and X are sensible values here. -L will invert the coordinate system on that axis.

m_axisstr

FITS axis interpreted as the M axis. M and Y are sensible values here. -M will invert the coordinate system on that axis.

Returns:
tuple

Returns ((l_axis, l_grid), (m_axis, m_grid), (freq_axis, freq_grid)) where the axis is the FORTRAN indexed FITS axis (1-indexed) and grid contains the values at each pixel along the axis.

Code#

format_code(code)

Formats some code with line numbers

memoize_on_key(key_fn)

Memoize based on a key function supplied by the user.

africanus.util.code.format_code(code)[source]#

Formats some code with line numbers

Parameters:
codestr

Code

Returns:
str

Code prefixed with line numbers

class africanus.util.code.memoize_on_key(key_fn)[source]#

Memoize based on a key function supplied by the user. The key function should return a custom key for memoizing the decorated function, based on the arguments passed to it.

In the following example, the arguments required to generate the _generate_phase_delay_kernel function are the types of the lm, uvw and frequency arrays, as well as the number of correlations, ncorr.

The supplied key_fn produces a unique key based on these types and the number of correlations, which is used to cache the generated function.

def key_fn(lm, uvw, frequency, ncorrs=4):
    '''
    Produce a unique key for the arguments of
     _generate_phase_delay_kernel
    '''
    return (lm.dtype, uvw.dtype, frequency.dtype, ncorrs)

_code_template = jinja2.Template('''
#define ncorrs {{ncorrs}}

__global__ void phase_delay(
    const {{lm_type}} * lm,
    const {{uvw_type}} * uvw,
    const {{freq_type}} * frequency,
    {{out_type}} * out)
{
    ...
}
''')

_type_map = {
    np.float32: 'float',
    np.float64: 'double'
}

@memoize_on_key(key_fn)
def _generate_phase_delay_kernel(lm, uvw, frequency, ncorrs=4):
    ''' Generate the phase delay kernel '''
    out_dtype = np.result_type(lm.dtype, uvw.dtype, frequency.dtype)
    code = _code_template.render(lm_type=_type_map[lm.dtype],
                                 uvw_type=_type_map[uvw.dtype],
                                 freq_type=_type_map[frequency.dtype],
                                 ncorrs=ncorrs)
    return cp.RawKernel(code, "phase_delay")

Methods

__call__(fn)

Call self as a function.

dask#

EstimatingProgressBar([minimum, width, dt, out])

Progress Bar that displays elapsed time as well as an estimate of total time taken.

class africanus.util.dask_util.EstimatingProgressBar(minimum=0, width=42, dt=1.0, out=sys.stdout)[source]#

Progress Bar that displays elapsed time as well as an estimate of total time taken.

When starting a dask computation, the bar examines the graph and determines the number of chunks contained by a dask collection.

During computation the number of completed chunks and their the total time taken to complete them are tracked. The average derived from these numbers are used to estimate total compute time, relative to the current elapsed time.

The bar is not particularly accurate and will underestimate near the beginning of computation and seems to slightly overestimate during the buk of computation. However, it may be more accurate than the default dask task bar which tracks number of tasks completed by total tasks.

Parameters:
minimumint, optional

Minimum time threshold in seconds before displaying a progress bar. Default is 0 (always display)

widthint, optional

Width of the bar, default is 42 characters.

dtfloat, optional

Update resolution in seconds, default is 1.0 seconds.

CUDA#

grids(dims, blocks)

Determine the grid size, given space dimensions sizes and blocks

africanus.util.cuda.grids(dims, blocks)[source]#

Determine the grid size, given space dimensions sizes and blocks

Parameters:
dimstuple of ints

(x, y, z) tuple

Returns:
tuple

(x, y, z) grid size tuple

Patterns#

Multiton(*args, **kwargs)

General Multiton metaclass

LazyProxy(fn, *args, **kwargs)

Lazy instantiation of a proxied object.

LazyProxyMultiton(*args, **kwargs)

Combination of a LazyProxy with a Multiton

class africanus.util.patterns.Multiton(*args, **kwargs)[source]#

General Multiton metaclass

Implementation of the Multiton pattern, which always returns a unique object for a unique set of arguments provided to a class constructor. For example, in the following, only a single instance of A with argument 1 is ever created.

class A(metaclass=Multiton):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw

assert A(1) is A(1)
assert A(1, "bob") is not A(1)

This is useful for ensuring that only a single instance of a heavy-weight resource such as files, sockets, thread/process pools or database connections is created in a single process, for a unique set of arguments.

Notes

Instantiation of object instances is thread-safe.

class africanus.util.patterns.LazyProxy(fn, *args, **kwargs)[source]#

Lazy instantiation of a proxied object.

A LazyProxy proxies an object which is lazily instantiated on first use. It is primarily useful for embedding references to heavy-weight resources in a dask graph, so they can be pickled and sent to other workers without immediately instantiating those resources.

To this end, the proxy takes as arguments:

  1. a class or factory function that instantiates the desired resource.

  2. *args and **kwargs that should be supplied to the instantiator.

The function and arguments for creating a file are wrapped in a LazyProxy. It is only instantiated when f.write is called.#
f = LazyProxy(open, "test.txt", mode="r")
f.write("Hello World!")
f.close()

In addition to the class/factory function, it is possible to specifiy a Finaliser supplied to weakref.finalize that is called to cleanup the resource when the LazyProxy is garbage collected. In this case, the first argument should be a tuple of two elements: the factory and the finaliser.

# LazyProxy defined with factory function and finaliser function
def finalise_file(file):
    file.close()

f2 = LazyProxy((open, finalise_file), "test.txt", mode="r")

class WrappedFile:
    def __init__(self, *args, **kwargs):
        self.handle = open(*args, **kwargs)

    def close(self):
        self.handle.close()

# LazyProxy defined with class
f1 = LazyProxy((WrappedFile, WrappedFile.close), "test.txt", mode="r")

LazyProxy objects are designed to be embedded in dask.array.blockwise() calls. For example:

# Specify the start and length of each range
file_ranges = np.array([[0, 5], [5, 10], [15, 5] [20, 10]])
# Chunk each range individually
da_file_ranges = dask.array(file_ranges, chunks=(1, 2))
# Reference a binary file
file_proxy = LazyProxy(open, "data.dat", "rb")

def _read(file_proxy, file_range):
    # Seek to range start and read the length of data
    start, length = file_range
    file_proxy.seek(start)
    return np.asarray(file_proxy.read(length), np.uint8)

data = da.blockwise(_read, "x",
                    # Embed the file_proxy in the graph
                    file_proxy, None,
                    # Pass each file range to the _read
                    da_file_ranges, "xy",
                    # output chunks should have the length
                    # of each range
                    adjust_chunks={"x": tuple(file_ranges[:, 1])},
                    concatenate=True)

print(data.compute(processes=True))
Parameters:
fnclass or callable or tuple

A callable object that used to create the proxied object. In tuple form, this should consist of two callables. The first should create the proxied object and the second should be a finaliser that performs cleanup on the proxied object when the LazyProxy is garbage collected: it is passed directly to weakref.finalize.

*argstuple

Positional arguments passed to the callable object specified in fn that will create the proxied object. The contents of *args should be pickleable.

**kwargsdict

Keyword arguments passed to the callable object specified in fn that will create the proxied object. The contents of **kwargs should be pickleable.

Notes

  • Instantiation of the proxied object is thread-safe.

  • LazyProxy’s are configured to never instantiate within dask.array.blockwise() and dask.blockwise.blockwise() calls.

class africanus.util.patterns.LazyProxyMultiton(*args, **kwargs)[source]#

Combination of a LazyProxy with a Multiton

Ensures that only a single LazyProxy is ever created for the given constructor arguments.

class A:
    def __init__(self, value):
        self.value = value

assert LazyProxyMultiton("foo") is LazyProxyMultiton("foo")

See LazyProxy and Multiton for further details