Source code for intake_astro.table

from collections import OrderedDict
import copy
from intake.source.base import DataSource, Schema
from . import __version__


[docs]class FITSTableSource(DataSource): """Read FITS tabular data into dataframes For one or more FITS files, which can be local or remote, with support for partitioning within files. Parameters ---------- url: str or list of str files to load. Can include protocol specifiers and/or glob characters ext: str or int Extension to load. Normally 0 or 1. chunksize: int or None For partitioning within files, use this many rows per partition. This is very inefficient for compressed files, and for remote files, will require at least touching each file to discover the number of rows, before even starting to read the data. Cannot be used with FITS tables with a "heap", i.e., containing variable- length arrays. storage_options: dict or None Additional keyword arguments to pass to the storage back-end. metadata: Arbitrary information to associate with this source. After reading the schema, the source will have attributes: ``header`` - the full FITS header of one of the files as a dict, ``dtype`` - a numpy-like list of field/dtype string pairs, ``shape`` - where the number of rows will only be known if using partitioning or for a single file input. """ name = 'fits_table' container = 'dataframe' version = __version__ partition_access = True def __init__(self, url, ext=0, chunksize=None, storage_options=None, metadata=None): super(FITSTableSource, self).__init__(metadata) self.url = url self.ext = ext self.chunks = chunksize self.storage_options = storage_options or {} self.df = None self.files = None def _get_schema(self): from dask.bytes import open_files import dask.dataframe as dd from dask.base import tokenize import dask if self.df is None: self.files = open_files(self.url, **self.storage_options) name = 'fits-table-' + tokenize(self.url, self.chunks, self.ext) dpart = dask.delayed(_get_fits_section) parts = [] dtype = None length = 0 for part in self.files: if self.chunks: header, dtype, shape = _get_fits_header(part, self.ext) l = shape[0] for start in range(0, l, self.chunks): section = (start, min(start + self.chunks, l)) parts.append(dpart(part, self.ext, section)) length += l else: if dtype is None: header, dtype, shape = _get_fits_header(part, self.ext) if len(self.files) == 1: # if not sectioning, we don't try to find the total # number of rows, so we only know this for exactly # one file length = shape[0] parts.append(dpart(part, self.ext, None)) self.header, self.dtype, self.shape = header, dtype, ( (length or None), shape[1]) self.df = dd.from_delayed(parts, prefix=name, meta=dtype) self._schema = Schema( dtype=self.dtype, shape=self.shape, extra_metadata=self.header, npartitions=self.df.npartitions ) return self._schema
[docs] def to_dask(self): self._get_schema() return self.df
[docs] def read_chunked(self): return self.to_dask()
[docs] def read_partition(self, i): self._get_schema() return self.df.get_partition(i).compute()
[docs] def read(self): self._get_schema() return self.df.compute()
def _close(self): self.df = None self.files = None
def _get_fits_section(fn, ext=0, section=None): import numpy as np import pandas as pd with copy.copy(fn) as f: if section is None: from astropy.table import Table t = Table.read(f, hdu=ext, format='fits') return t.to_pandas() else: start, end = section from astropy.io.fits import open from astropy.io.fits.hdu.table import _FormatP, _FormatQ # copied from hdu._get_tbdata() hdus = open(f, memmap=False, cache=False) hdu = hdus[ext] if (any(type(r) in (_FormatP, _FormatQ) for r in hdu.columns._recformats) and hdu._data_size is not None and hdu._data_size > hdu._theap): raise ValueError('Can only read sections from tables without ' 'heap; use section=None') data_offset = hdu._data_offset + hdu.columns.dtype.itemsize * start raw_data = hdu._get_raw_data((end - start), hdu.columns.dtype, data_offset) data = raw_data.view(np.rec.recarray) hdu._init_tbdata(data) data = data.view(hdu._data_type) data._coldefs = hdu.columns data._character_as_bytes = hdu._character_as_bytes # byteswapping by copy out = np.empty(len(data), dtype=_dtypes(hdu)) for col in out.dtype.fields: out[col][:] = data[col] df = pd.DataFrame(out) return df def _get_fits_header(fn, ext=0): with copy.copy(fn) as f: from astropy.io.fits import open hdu = open(f, memmap=False, cache=False)[ext] return dict(hdu.header.items()), _dtypes(hdu), ( hdu._nrows, len(hdu.columns)) def _dtypes(hdu): dt = hdu.columns.dtype.newbyteorder('=').descr dt2 = [] for i, (field, d) in enumerate(dt): # TODO: may have other special types, perhaps in special function if hdu.header['TFORM%i' % (i + 1)] == "L": dt2.append((field, '|b1')) elif hdu.columns[field].bzero == 32768: # convention for unsigned type dt2.append((field, d.replace('i', 'u'))) else: dt2.append((field, d)) return dt2