Utilities#
Command Line#
- africanus.util.cmdline.parse_python_assigns(assign_str)[source]#
Parses a string containing assignment statements into a dictionary.
```python
data = parse_python_assigns("beta=5.6; l=[2,3]; s='hello, world'")
assert data == {'beta': 5.6, 'l': [2, 3], 's': 'hello, world'}
```
- Parameters:
- assign_str: str
Assignment string. Should only contain assignment statements assigning Python literals or builtin function calls to variable names. Multiple assignment statements should be separated by semicolons.
- Returns:
- dict
Dictionary { name: value } containing assignment results.
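As a sketch of the technique, assignment strings of this form can be parsed with Python's `ast.literal_eval`. The `parse_assigns` function below is an illustrative stand-in, not the library's implementation (which also supports builtin function calls):

```python
import ast

def parse_assigns(assign_str):
    """Parse "a=1; b=[2,3]" into {'a': 1, 'b': [2, 3]}.

    Unlike the real function, this sketch handles literals only.
    """
    result = {}
    for statement in assign_str.split(";"):
        name, _, value = statement.partition("=")
        # literal_eval safely evaluates Python literal expressions only
        result[name.strip()] = ast.literal_eval(value.strip())
    return result

data = parse_assigns("beta=5.6; l=[2,3]; s='hello, world'")
assert data == {"beta": 5.6, "l": [2, 3], "s": "hello, world"}
```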
Requirements Handling#
- africanus.util.requirements.requires_optional(*requirements)[source]#
Decorator returning either the original function, or a dummy function raising a `MissingPackageException` when called, depending on whether the supplied `requirements` are present.

If packages are missing and called within a test, the dummy function will call `pytest.skip()`.

Used in the following way:

```python
try:
    from scipy import interpolate
except ImportError as e:
    # https://stackoverflow.com/a/29268974/1611416, pep 3110 and 344
    scipy_import_error = e
else:
    scipy_import_error = None

@requires_optional('scipy', scipy_import_error)
def function(*args, **kwargs):
    return interpolate(...)
```
- Parameters:
- requirements : iterable of string, None or ImportError
Sequence of package names required by the decorated function. ImportError exceptions (or None, indicating their absence) may also be supplied and will be immediately re-raised within the decorator. This is useful for tracking down problems in user import logic.
- Returns:
- callable
Either the original function if all `requirements` are available, or a dummy function that throws a `MissingPackageException` or skips a pytest.
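The decorator's behaviour can be sketched as follows. `requires_optional_sketch` and this `MissingPackageException` class are illustrative stand-ins, not the library's implementation, and the `pytest.skip()` integration is omitted:

```python
import functools

class MissingPackageException(Exception):
    pass

def requires_optional_sketch(*requirements):
    missing = []
    for req in requirements:
        if isinstance(req, ImportError):
            # Re-raise stored import errors immediately to surface
            # problems in user import logic
            raise req
        elif req is None:
            continue  # the associated import succeeded
        else:
            try:
                __import__(req)
            except ImportError:
                missing.append(req)

    def decorator(fn):
        if not missing:
            # All requirements present: return the original function
            return fn

        @functools.wraps(fn)
        def dummy(*args, **kwargs):
            raise MissingPackageException(f"{fn.__name__} requires {missing}")

        return dummy

    return decorator

@requires_optional_sketch("hopefully_not_a_real_package")
def needs_missing():
    return 42

try:
    needs_missing()
except MissingPackageException as e:
    print(e)  # needs_missing requires ['hopefully_not_a_real_package']
```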
Shapes#
- africanus.util.shapes.aggregate_chunks(chunks, max_chunks)[source]#
Aggregate dask `chunks` together into chunks no larger than `max_chunks`.

```python
chunks, max_c = ((3, 4, 6, 3, 6, 7), (1, 1, 1, 1, 1, 1)), (10, 3)
expected = ((7, 9, 6, 7), (2, 2, 1, 1))
assert aggregate_chunks(chunks, max_c) == expected
```
- Parameters:
- chunks : sequence of tuples or tuple
- max_chunks : sequence of ints or int
- Returns:
- sequence of tuples or tuple
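The aggregation can be sketched as a greedy merge of adjacent chunks that respects each dimension's maximum. `aggregate_chunks_sketch` is an illustrative stand-in, not the library's implementation:

```python
def aggregate_chunks_sketch(chunks, max_chunks):
    """Greedily merge adjacent chunks so that, in every dimension,
    each aggregated chunk stays within its maximum."""
    # Normalise single-dimension input to the multi-dimension form
    single = not isinstance(chunks[0], tuple)
    if single:
        chunks, max_chunks = (chunks,), (max_chunks,)

    agg = [[] for _ in chunks]   # aggregated chunks per dimension
    current = [0] * len(chunks)  # running totals per dimension

    for group in zip(*chunks):
        fits = all(c + g <= m for c, g, m in zip(current, group, max_chunks))
        if any(current) and not fits:
            # Flush the current aggregate and start a new one
            for d, c in enumerate(current):
                agg[d].append(c)
            current = [0] * len(chunks)
        current = [c + g for c, g in zip(current, group)]

    for d, c in enumerate(current):
        agg[d].append(c)

    result = tuple(tuple(a) for a in agg)
    return result[0] if single else result

chunks, max_c = ((3, 4, 6, 3, 6, 7), (1, 1, 1, 1, 1, 1)), (10, 3)
assert aggregate_chunks_sketch(chunks, max_c) == ((7, 9, 6, 7), (2, 2, 1, 1))
```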
- africanus.util.shapes.corr_shape(ncorr, corr_shape)[source]#
Returns the shape of the correlations, given `ncorr` and the type of correlation shape requested.

- Parameters:
- ncorr : integer
Number of correlations
- corr_shape : {'flat', 'matrix'}
Shape of output correlations
- Returns:
- tuple
Shape tuple describing the correlation dimensions
If `flat`, returns `(ncorr,)`.

If `matrix`, returns `(1,)` if `ncorr == 1`, `(2,)` if `ncorr == 2`, or `(2, 2)` if `ncorr == 4`.
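A minimal sketch of this mapping (`corr_shape_sketch` is an illustrative stand-in, not the library's implementation):

```python
def corr_shape_sketch(ncorr, corr_shape):
    """Shape of the correlation dimensions, as described above."""
    if corr_shape == "flat":
        return (ncorr,)
    elif corr_shape == "matrix":
        if ncorr == 1:
            return (1,)
        elif ncorr == 2:
            return (2,)
        elif ncorr == 4:
            return (2, 2)
        raise ValueError(f"ncorr {ncorr} not in (1, 2, 4)")
    raise ValueError(f"Invalid corr_shape {corr_shape}")

assert corr_shape_sketch(4, "flat") == (4,)
assert corr_shape_sketch(4, "matrix") == (2, 2)
```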
Beams#
- africanus.util.beams.beam_filenames(filename_schema, corr_types)[source]#
Returns a dictionary of beam filename pairs, keyed on correlation, from the cartesian product of correlations and (real, imaginary) pairs.
Given `beam_$(corr)_$(reim).fits`, returns:

```python
{
    'xx': ['beam_xx_re.fits', 'beam_xx_im.fits'],
    'xy': ['beam_xy_re.fits', 'beam_xy_im.fits'],
    ...
    'yy': ['beam_yy_re.fits', 'beam_yy_im.fits'],
}
```

Given `beam_$(CORR)_$(REIM).fits`, returns:

```python
{
    'xx': ['beam_XX_RE.fits', 'beam_XX_IM.fits'],
    'xy': ['beam_XY_RE.fits', 'beam_XY_IM.fits'],
    ...
    'yy': ['beam_YY_RE.fits', 'beam_YY_IM.fits'],
}
```
- Parameters:
- filename_schema : str
String containing the filename schema.
- corr_types : list of integers
List of integers defining the correlation type.
- Returns:
- dict
Dictionary `{correlation: (refile, imfile)}` mapping correlations to real and imaginary filename pairs.
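The schema expansion can be sketched with simple string substitution. `beam_filenames_sketch` is a hypothetical stand-in that takes correlation strings directly, whereas the real function maps integer correlation types to strings:

```python
def beam_filenames_sketch(filename_schema, correlations):
    """Expand $(corr)/$(reim) (or uppercase $(CORR)/$(REIM)) in the
    schema into (real, imaginary) filename pairs keyed on correlation."""
    result = {}
    for corr in correlations:
        pair = []
        for reim in ("re", "im"):
            name = filename_schema
            # Substitution case follows the schema's case
            name = name.replace("$(corr)", corr.lower())
            name = name.replace("$(CORR)", corr.upper())
            name = name.replace("$(reim)", reim.lower())
            name = name.replace("$(REIM)", reim.upper())
            pair.append(name)
        result[corr.lower()] = tuple(pair)
    return result

names = beam_filenames_sketch("beam_$(corr)_$(reim).fits", ["xx", "yy"])
assert names == {
    "xx": ("beam_xx_re.fits", "beam_xx_im.fits"),
    "yy": ("beam_yy_re.fits", "beam_yy_im.fits"),
}
```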
- africanus.util.beams.beam_grids(header, l_axis=None, m_axis=None)[source]#
Extracts the FITS indices and grids for the beam dimensions in the supplied FITS `header`. Specifically, the axes specified by:

- the `L` or `X` CTYPE
- the `M` or `Y` CTYPE
- the `FREQ` CTYPE

If the first two axes have a negative sign, such as `-L`, the grid will be inverted.

Any grids corresponding to axes with a CUNIT type of `DEG` will be converted to radians.

- Parameters:
- header : `Header` or dict
FITS header object.
- l_axis : str
FITS axis interpreted as the L axis. L and X are sensible values here. -L will invert the coordinate system on that axis.
- m_axis : str
FITS axis interpreted as the M axis. M and Y are sensible values here. -M will invert the coordinate system on that axis.
- Returns:
- tuple
Returns `((l_axis, l_grid), (m_axis, m_grid), (freq_axis, freq_grid))`, where each axis is the FORTRAN-indexed FITS axis (1-indexed) and each grid contains the values at every pixel along that axis.
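A grid for a single linear FITS axis can be sketched from the CRVAL/CRPIX/CDELT keywords (ignoring rotation). `axis_grid` is an illustrative stand-in, not the library's exact computation:

```python
import math

def axis_grid(naxis, crval, crpix, cdelt, cunit=""):
    """World coordinates at each pixel of one FITS axis (1-indexed),
    converting degrees to radians when CUNIT is DEG."""
    grid = [(i - crpix) * cdelt + crval for i in range(1, naxis + 1)]
    if cunit.strip().upper() == "DEG":
        grid = [math.radians(g) for g in grid]
    # A negative cdelt (e.g. a -L axis) produces a descending grid
    return grid

# 5-pixel axis centred on pixel 3, 1 deg increments around 0
grid = axis_grid(5, crval=0.0, crpix=3.0, cdelt=1.0, cunit="DEG")
assert grid == [math.radians(d) for d in (-2.0, -1.0, 0.0, 1.0, 2.0)]
```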
Code#
- africanus.util.code.format_code(code)[source]#
Formats some code with line numbers
- Parameters:
- code : str
Code
- Returns:
- str
Code prefixed with line numbers
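A sketch of such line-number formatting (`format_code_sketch` is a hypothetical stand-in, not the library's implementation):

```python
def format_code_sketch(code):
    """Prefix each line of code with a right-aligned 1-based line number."""
    lines = code.split("\n")
    width = len(str(len(lines)))  # pad so numbers align
    return "\n".join(f"{i:>{width}} {line}"
                     for i, line in enumerate(lines, 1))

print(format_code_sketch("a = 1\nb = 2"))
# 1 a = 1
# 2 b = 2
```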
- class africanus.util.code.memoize_on_key(key_fn)[source]#
Memoize based on a key function supplied by the user. The key function should return a custom key for memoizing the decorated function, based on the arguments passed to it.
In the following example, the arguments required to generate the _generate_phase_delay_kernel function are the types of the lm, uvw and frequency arrays, as well as the number of correlations, ncorr.
The supplied `key_fn` produces a unique key based on these types and the number of correlations, which is used to cache the generated function.

```python
def key_fn(lm, uvw, frequency, ncorrs=4):
    '''
    Produce a unique key for the arguments of
    _generate_phase_delay_kernel
    '''
    return (lm.dtype, uvw.dtype, frequency.dtype, ncorrs)

_code_template = jinja2.Template('''
#define ncorrs {{ncorrs}}

__global__ void phase_delay(
    const {{lm_type}} * lm,
    const {{uvw_type}} * uvw,
    const {{freq_type}} * frequency,
    {{out_type}} * out)
{
    ...
}
''')

_type_map = {
    np.float32: 'float',
    np.float64: 'double'
}

@memoize_on_key(key_fn)
def _generate_phase_delay_kernel(lm, uvw, frequency, ncorrs=4):
    ''' Generate the phase delay kernel '''
    out_dtype = np.result_type(lm.dtype, uvw.dtype, frequency.dtype)
    code = _code_template.render(lm_type=_type_map[lm.dtype],
                                 uvw_type=_type_map[uvw.dtype],
                                 freq_type=_type_map[frequency.dtype],
                                 ncorrs=ncorrs)
    return cp.RawKernel(code, "phase_delay")
```
Methods

`__call__(fn)` — Call self as a function.
dask#
- class africanus.util.dask_util.EstimatingProgressBar(minimum=0, width=42, dt=1.0, out=sys.stdout)[source]#
Progress Bar that displays elapsed time as well as an estimate of total time taken.
When starting a dask computation, the bar examines the graph and determines the number of chunks contained by a dask collection.
During computation, the number of completed chunks and the total time taken to complete them are tracked. The average derived from these numbers is used to estimate total compute time, relative to the current elapsed time.
The bar is not particularly accurate: it will underestimate near the beginning of computation and seems to slightly overestimate during the bulk of computation. However, it may be more accurate than the default dask task bar, which tracks the number of completed tasks out of the total number of tasks.
- Parameters:
- minimum : int, optional
Minimum time threshold in seconds before displaying a progress bar. Default is 0 (always display).
- width : int, optional
Width of the bar, default is 42 characters.
- dt : float, optional
Update resolution in seconds, default is 1.0 seconds.
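The estimation strategy described above amounts to scaling the mean time per completed chunk by the total chunk count; a sketch (the function name is hypothetical, not part of the library):

```python
def estimate_total_time(elapsed, completed_chunks, total_chunks):
    """Estimate total compute time from the mean time per completed chunk."""
    if completed_chunks == 0:
        return float("inf")  # no information yet
    return (elapsed / completed_chunks) * total_chunks

# 10s elapsed for 4 of 16 completed chunks -> ~40s estimated in total
assert estimate_total_time(10.0, 4, 16) == 40.0
```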
CUDA#
Determine the grid size, given space dimension sizes and blocks.
Patterns#
- class africanus.util.patterns.Multiton(*args, **kwargs)[source]#
General Multiton metaclass
Implementation of the Multiton pattern, which always returns a unique object for a unique set of arguments provided to a class constructor. For example, in the following, only a single instance of A with argument 1 is ever created.
```python
class A(metaclass=Multiton):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw

assert A(1) is A(1)
assert A(1, "bob") is not A(1)
```
This is useful for ensuring that only a single instance of a heavy-weight resource such as files, sockets, thread/process pools or database connections is created in a single process, for a unique set of arguments.
Notes
Instantiation of object instances is thread-safe.
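The pattern can be sketched as a metaclass that caches instances by constructor arguments. `MultitonSketch` is an illustrative stand-in, not the library's implementation:

```python
import threading

class MultitonSketch(type):
    """Metaclass returning the same instance for the same constructor args."""

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        cls._instances = {}
        cls._lock = threading.Lock()  # thread-safe instantiation

    def __call__(cls, *args, **kwargs):
        # Arguments must be hashable to serve as a cache key
        key = (args, tuple(sorted(kwargs.items())))
        with cls._lock:
            try:
                return cls._instances[key]
            except KeyError:
                instance = cls._instances[key] = super().__call__(*args, **kwargs)
                return instance

class A(metaclass=MultitonSketch):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw

assert A(1) is A(1)
assert A(1, "bob") is not A(1)
```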
- class africanus.util.patterns.LazyProxy(fn, *args, **kwargs)[source]#
Lazy instantiation of a proxied object.
A LazyProxy proxies an object which is lazily instantiated on first use. It is primarily useful for embedding references to heavy-weight resources in a dask graph, so they can be pickled and sent to other workers without immediately instantiating those resources.
To this end, the proxy takes as arguments:
- a class or factory function that instantiates the desired resource.
- `*args` and `**kwargs` that should be supplied to the instantiator.
```python
f = LazyProxy(open, "test.txt", mode="w")
f.write("Hello World!")
f.close()
```
In addition to the class/factory function, it is possible to specify a finaliser supplied to `weakref.finalize` that is called to clean up the resource when the LazyProxy is garbage collected. In this case, the first argument should be a tuple of two elements: the factory and the finaliser.

```python
# LazyProxy defined with factory function and finaliser function
def finalise_file(file):
    file.close()

f2 = LazyProxy((open, finalise_file), "test.txt", mode="r")

class WrappedFile:
    def __init__(self, *args, **kwargs):
        self.handle = open(*args, **kwargs)

    def close(self):
        self.handle.close()

# LazyProxy defined with class
f1 = LazyProxy((WrappedFile, WrappedFile.close), "test.txt", mode="r")
```
LazyProxy objects are designed to be embedded in `dask.array.blockwise()` calls. For example:

```python
# Specify the start and length of each range
file_ranges = np.array([[0, 5],
                        [5, 10],
                        [15, 5],
                        [20, 10]])

# Chunk each range individually
da_file_ranges = da.from_array(file_ranges, chunks=(1, 2))

# Reference a binary file
file_proxy = LazyProxy(open, "data.dat", "rb")

def _read(file_proxy, file_range):
    # Seek to the range start and read the length of data
    start, length = file_range
    file_proxy.seek(start)
    return np.frombuffer(file_proxy.read(length), np.uint8)

data = da.blockwise(_read, "x",
                    # Embed the file_proxy in the graph
                    file_proxy, None,
                    # Pass each file range to _read
                    da_file_ranges, "xy",
                    # output chunks should have the length
                    # of each range
                    adjust_chunks={"x": tuple(file_ranges[:, 1])},
                    concatenate=True)

print(data.compute(scheduler="processes"))
```
- Parameters:
- fn : class or callable or tuple
A callable object used to create the proxied object. In tuple form, this should consist of two callables. The first should create the proxied object and the second should be a finaliser that performs cleanup on the proxied object when the LazyProxy is garbage collected: it is passed directly to `weakref.finalize`.
- *args : tuple
Positional arguments passed to the callable object specified in `fn` that will create the proxied object. The contents of `*args` should be pickleable.
- **kwargs : dict
Keyword arguments passed to the callable object specified in `fn` that will create the proxied object. The contents of `**kwargs` should be pickleable.
Notes
Instantiation of the proxied object is thread-safe.
LazyProxy objects are configured to never instantiate within `dask.array.blockwise()` and `dask.blockwise.blockwise()` calls.
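The core lazy-instantiation mechanism can be sketched with `__getattr__` forwarding and `weakref.finalize`. `LazyProxySketch` is an illustrative stand-in; it omits the pickling support and dask integration described above:

```python
import weakref

class LazyProxySketch:
    """Forward attribute access to an object created only on first use.

    A real proxy would also forward special methods and attribute
    assignment; this sketch forwards plain attribute lookups only.
    """

    def __init__(self, fn, *args, **kwargs):
        # In tuple form, fn is (factory, finaliser)
        self._factory, self._finaliser = fn if isinstance(fn, tuple) else (fn, None)
        self._args = args
        self._kwargs = kwargs
        self._obj = None

    def __getattr__(self, name):
        # Called only for attributes not found on the proxy itself
        if self._obj is None:
            self._obj = self._factory(*self._args, **self._kwargs)
            if self._finaliser is not None:
                # Clean up the resource when the proxy is garbage collected
                weakref.finalize(self, self._finaliser, self._obj)
        return getattr(self._obj, name)

created = []

def make_resource():
    created.append(True)
    return {"status": "open"}

proxy = LazyProxySketch(make_resource)
assert created == []                  # nothing instantiated yet
assert proxy.get("status") == "open"  # first access triggers the factory
assert created == [True]
```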
- class africanus.util.patterns.LazyProxyMultiton(*args, **kwargs)[source]#
Combination of a `LazyProxy` with a `Multiton`.

Ensures that only a single `LazyProxy` is ever created for the given constructor arguments.

```python
class A:
    def __init__(self, value):
        self.value = value

assert LazyProxyMultiton("foo") is LazyProxyMultiton("foo")
```