from astropy.io import fits
from astropy import log
import numpy as np
import astropy.units as u
from astropy.nddata import support_nddata
from astropy.wcs import wcs
from astropy.table.table import Table
from smoothy.upi.data import Data
@support_nddata
def remove_superfluous(data, wcs=None, mask=None, unit=None,meta=None, restfrq=None):
if data.shape[0] == 1:
log.info("Removing superfluous dimension "+wcs.axis_type_names[wcs.naxis - 1]+" (kept in the metadata)")
cs=wcs.dropaxis(wcs.naxis - 1)
if mask is not None:
mk=mask[0]
dt=data[0]
else:
return(Data(data,wcs=wcs,unit=unit,mask=mask,meta=meta))
newimg=Data(dt,wcs=cs,unit=unit,mask=mk,meta=meta)
return(remove_superfluous(newimg))
[docs]def HDU_to_Data(hdu):
"""
Create an N-dimensional dataset from an HDU component.
Parameters
----------
hdu : HDU object
HDU to transform into an N-dimensional dataset.
Returns
-------
result: astropy.nddata.NDDataRef with data from the HDU object.
"""
hdu.verify("fix")
data = hdu.data
meta = hdu.header
mask = np.isnan(data)
# Hack to correct wrong uppercased units generated by CASA
try:
bscale = meta['BSCALE']
except KeyError:
bscale = 1.0
try:
bzero = meta['BZERO']
except KeyError:
bzero = 0.0
try:
bsu = meta['BUNIT']
bsu = bsu.lower()
bsu = bsu.replace("jy", "Jy")
bunit = u.Unit(bsu, format="fits")
except KeyError:
bunit = u.Unit("u.Jy/u.beam")
rem_list = []
for i in meta.items():
if i[0].startswith('PC00'):
rem_list.append(i[0])
for e in rem_list:
meta.remove(e)
mywcs = wcs.WCS(meta)
data = data * bscale + bzero
# Create astropy units
#if len(data.shape) == 4:
# Put data in physically-meaninful values, and remove stokes
# TODO: Stokes is removed by summing (is this correct? maybe is averaging?)
#log.info("4D data detected: assuming RA-DEC-FREQ-STOKES (like CASA-generated ones), and dropping STOKES")
# data = data.sum(axis=0) * bscale + bzero
# mask = np.logical_and.reduce(mask,axis=0)
# mywcs = mywcs.dropaxis(3)
#elif len(data.shape) == 3:
# log.info("3D data detected: assuming RA-DEC-FREQ")
#
#elif len(data.shape) == 2:
# log.info("2D data detected: assuming RA-DEC")
# data = data * bscale + bzero
#else:
# log.error("Only 3D data allowed (or 4D in case of polarization)")
# raise TypeError
return remove_superfluous(Data(data, uncertainty=None, mask=mask, wcs=mywcs, meta=meta, unit=bunit))
[docs]def HDU_to_Table(hdu):
"""
Create a data table from a HDU component.
Parameters
----------
hdu : HDU object
HDU to transform into a data table.
Returns
-------
result: astropy.table.Table with data from the HDU.
"""
log.warning("FITS Table ---> AstroPy Table not implemented Yet")
# return atable.ATable(data=hdu.data,meta=hdu.header)
[docs]def Table_to_HDU(tab):
"""
Create a HDU object from a data table.
Parameters
----------
tab : astropy.table.Table
Table to transform into a HDU object.
Returns
-------
result: HDU object with data from the data table.
"""
# if tab.data.masked:???
# dtmp = [col.filled(None) for col in six.itervalues(self.columns)]
hdu = fits.BinTableHDU.from_columns(np.array(tab))
if tab.meta is not None:
for k, v in tab.meta.items():
hdu.header[k] = v
return hdu
[docs]def Data_to_HDU(cube, primary=False):
"""
Create a HDU object from an N-dimensional dataset.
Parameters
----------
cube : numpy.ndarray or astropy.nddata.NDData or or astropy.nddata.NDDataRef
Astronomical data cube.
primary : bool
Whether to pick the primary or an image HDU.
Returns
-------
result: HDU object with data from the data cube.
"""
header = cube.wcs.to_header()
if primary == True:
hdu = fits.PrimaryHDU(cube.data, header=header)
else:
hdu = fits.ImageHDU(cube.data, header=header)
if cube.meta is not None:
for k, v in cube.meta.items():
hdu.header[k] = v
return hdu
[docs]def load_fits(filePath, primary = False):
"""
Loads a FITS file and converts it into an N-Dimensional Dataset.
Parameters
----------
filepath : path of the FITS file.
primary : bool
if True it gets only primmary data-cube.
Returns
-------
Primary NDData image or astropy table, and/or:
N-Dimensional Datasets and Astropy Tables lists
"""
if primary == True:
hdulist = fits.open(filePath, lazy_load_hdus=True)
log.info('Processing PrimaryHDU Object 0')
hduobject = hdulist[0]
if hduobject is None:
log.error('FITS PrimaryHDU is None')
raise ValueError('FITS PrimaryHDU is None')
NDData = HDU_to_Data(hduobject)
hdulist.close()
return NDData
else:
NDData_list = [None]
Table_list = []
hdulist = fits.open(filePath)
for counter, hdu in enumerate(hdulist):
if isinstance(hdu, fits.PrimaryHDU) or isinstance(hdu, fits.ImageHDU):
log.info("Processing HDU " + str(counter) + " (Image)")
try:
ndd = HDU_to_Data(hdu)
if isinstance(hdu, fits.PrimaryHDU):
NDData_list[0] = ndd
primary = ndd
else:
NDData_list[0] = None
except TypeError:
log.info(str(counter) + " (Image) wasn't an Image")
if isinstance(hdu, fits.BinTableHDU):
table = HDU_to_Table(hdu)
Table_list.append(table)
if NDData_list[0] is None:
if len(NDData_list) == 0:
primary = Table_list[0]
else:
del NDData_list[0]
primary = NDData_list[0]
return primary, NDData_list, Table_list