Source code for q3dfit.fitloop

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from q3dfit.exceptions import InitializationError
from q3dfit.fitspec import fitspec

import importlib
from typing import Optional

import os

import numpy as np
from astropy.table import Table

from q3dfit.q3din import q3din
from q3dfit.q3dutil import write_msg
from q3dfit.readcube import Cube
from q3dfit.spectConvol import spectConvol


[docs] def fitloop(ispax: int, colarr: np.ndarray, rowarr: np.ndarray, cube: Cube, q3di: q3din, listlines: Table, specConv: Optional[spectConvol], onefit: bool=False, quiet: bool=True): """ Perform fitting loops over spaxels. This function calls :py:func:`~q3dfit.fitspec.fitspec` to fit the data. Parameters ---------- ispax Value of index over which to loop. colarr Array of column #s of spaxels to be fit (0-offset). rowarr Array of row #s of spaxels to be fit (0-offset). cube :py:class:`~q3dfit.readcube.Cube` object containing data to be fit. q3di :py:class:`~q3dfit.q3din.q3din` object containing fitting parameters. listlines Emission line labels and rest frame wavelengths, as part of an astropy Table output by :py:func:`~q3dfit.linelist.linelist`. specConv Instance of :py:class:`~q3dfit.spectConvol.spectConvol` specifying the instrumental spectral resolution convolution. If None, no convolution is performed. onefit If set, only one fit is performed. Default is False. quiet Optional. Suppress progress messages. Default is False. """ i = colarr[ispax] j = rowarr[ispax] # print(i,j) if cube.dat.ndim == 1: write_msg('[spec]=[1] out of [1]', file=q3di.logfile, quiet=quiet) flux = cube.dat err = cube.err dq = cube.dq elif cube.dat.ndim == 2: write_msg(f'[spec]=[{i+1}] out of [{cube.ncols}]', file=q3di.logfile, quiet=quiet) flux = cube.dat[:, i] err = cube.err[:, i] dq = cube.dq[:, i] else: write_msg(f'[col,row]=[{i+1},{j+1}] out of [{cube.ncols},{cube.nrows}]', file=q3di.logfile, quiet=quiet) flux = cube.dat[i, j, :] err = cube.err[i, j, :] dq = cube.dq[i, j, :] errmax = max(err) ''' if q3di.vormap is not None: tmpi = cube.vorcoords[i, 0] tmpj = cube.vorcoords[i, 1] i = tmpi j = tmpj print(f'Reference coordinate: [col, row]=[{i+1}, {j+1}]', file=logfile) if not quiet: print(f'Reference coordinate: [col, row]=[{i+1}, {j+1}]') ''' outlab = os.path.join(q3di.outdir, q3di.label) if cube.dat.ndim > 1: outlab += '_{:04d}'.format(i+1) if cube.dat.ndim > 2: outlab += '_{:04d}'.format(j+1) # Apply DQ plane # The oned keyword was having trouble because np.nonzero doesn't work for # zero arrays. I think this fixes the issue if dq.ndim > 0: indx_bad = np.nonzero(dq > 0) else: indx_bad = (dq != 0) if indx_bad: indx_bad = 0 else: indx_bad = dq flux[indx_bad] = 0. err[indx_bad] = errmax*100. # Indexing to wavelength array for regions to ignore in fitting indx_cut = [] if q3di.cutrange is not None: if not isinstance(q3di.cutrange, np.ndarray): q3di.cutrange = np.array(q3di.cutrange) if q3di.cutrange.ndim == 1: ncut = 1 else: ncut = q3di.cutrange.shape[0] try: for icut in range(ncut): indx_cut.append( np.intersect1d( (cube.wave >= q3di.cutrange.ravel()[0+icut*2]).nonzero(), (cube.wave <= q3di.cutrange.ravel()[1+icut*2]).nonzero())) except: raise InitializationError('CUTRANGE not properly specified') # Check that the flux is not filled with 0s, infs, or nans somedata = ((flux != 0.).any() and (flux != np.inf).any() and (flux != np.nan).any()) if somedata: # Set up N_comp and fix/free (linevary) dictionaries for this spaxel # These are set up for the first fit and don't need to change # after checkcomp() is called if q3di.dolinefit: ncomp = dict() linevary = dict() # Extract # of components and line fix/free specific to this spaxel # and write as dict. Do this outside of while loop because ncomp # may change with another iteration. Not sure how linevary should # change with iteration yet [TODO] # For # comp, each dict key (line) will have one value. # For linevary, each line will have a dict of arrays: # dict has keys {'flx', 'cwv', 'sig'} with a value for each comp for line in q3di.lines: ncomp[line] = q3di.ncomp[line][i, j] linevary[line] = dict() linevary[line]['flx'] = q3di.linevary[line]['flx'][i, j, :] linevary[line]['cwv'] = q3di.linevary[line]['cwv'][i, j, :] linevary[line]['sig'] = q3di.linevary[line]['sig'][i, j, :] else: ncomp = None linevary = None # First fit dofit = True abortfit = False while dofit: # Default values # These are parameters that are reset after checkcomp is called. # These Nones apply if no line fit is done. listlinesz = None siglim_gas = None siginit_gas = None #tweakcntfit = None # This is if no continuum fit is done zstar = None siginit_stars = None # Are we doing a line fit? if q3di.dolinefit: # initialize for this spaxel: # gas sigma initial guess array # gas sigma limit array # redshifted starting wavelengths for lines # peak flux initial guess array siginit_gas = dict() siglim_gas = dict() listlinesz = dict() for k in q3di.lines: siginit_gas[k] = q3di.siginit_gas[k][i, j, ] siglim_gas[k] = q3di.siglim_gas[k][i, j, ] listlinesz[k] = \ np.array(listlines['lines'][(listlines['name'] == k)], dtype=np.float64) * (1. + q3di.zinit_gas[k][i, j, ]) if q3di.docontfit: # initialize for this spaxel: # stellar redshift initial guess # stellar sigma initial guess if q3di.zinit_stars is not None: zstar = q3di.zinit_stars[i, j] siginit_stars = q3di.siginit_stars[i, j] # option to tweak continuum fit #if q3di.tweakcntfit is not None: # tweakcntfit = q3di.tweakcntfit[i, j, :, :] # regions to ignore in fitting: set to max(err) for indx in indx_cut: if indx.size != 0: dq[indx] = 1 err[indx] = errmax*100. write_msg('FITLOOP: First call to FITSPEC', file=q3di.logfile, quiet=quiet) q3do_init = fitspec(cube.wave, flux, err, dq, q3di, zstar=zstar, siginit_stars=siginit_stars, listlines=listlines, listlinesz=listlinesz, ncomp=ncomp, linevary=linevary, siginit_gas=siginit_gas, siglim_gas=siglim_gas, specConv=specConv, quiet=quiet, #weakcntfit=tweakcntfit, fluxunit=cube.fluxunit_out, waveunit=cube.waveunit_out) # Abort if no good data if q3do_init.nogood: abortfit = True write_msg('FITLOOP: Aborting fit; no good data to fit.', file=q3di.logfile, quiet=quiet) else: if q3di.dolinefit and ncomp is not None: write_msg('FIT STATUS: '+str(q3do_init.fitstatus), file=q3di.logfile, quiet=quiet) # Second fit if not onefit and not abortfit: # default values for no line fit maskwidths = None peakinit = None siginit_gas = None if q3do_init.dolinefit: # get the line fit parameters from the first fit q3do_init.sepfitpars() # initialize for this spaxel: # initial guess for redshifted wavelengths for lines # maskwidths for lines # initial guess for peak fluxes # initial guess for gas sigmas listlinesz = q3do_init.line_fitpars['wave'] maskwidths = q3do_init.line_fitpars['sigma_obs'] # Multiply sigmas from first fit by MASKSIG_SECONDFIT # to get half-widths for masking for col in maskwidths.columns: maskwidths[col] *= q3di.masksig_secondfit peakinit = q3do_init.line_fitpars['fluxpk_obs'] # # If initial guess is too low, set to something minimum to prevent # # fitter from choking (since we limit peak to be >= 0) # peakinit_min = \ # np.sqrt(np.median(q3do_init.line_dat[q3do_init.gd_indx]**2.)) # for line in peakinit.columns: # peakinit[line] = \ # np.where(peakinit[line] < peakinit_min, # peakinit_min, peakinit[line]) siginit_gas = q3do_init.line_fitpars['sigma'] write_msg('FITLOOP: Second call to FITSPEC', file=q3di.logfile, quiet=quiet) if q3di.fcncontfit == 'questfit' and \ hasattr(q3di,'argscontfit'): q3di.argscontfit['rows'] = j+1 q3di.argscontfit['cols'] = i+1 q3do = fitspec(cube.wave, flux, err, dq, q3di, zstar=q3do_init.zstar, siginit_stars=siginit_stars, listlines=listlines, listlinesz=listlinesz, ncomp=ncomp, linevary=linevary, maskwidths=maskwidths, peakinit=peakinit, siginit_gas=siginit_gas, siglim_gas=siglim_gas, #tweakcntfit=tweakcntfit, specConv=specConv, quiet=quiet, fluxunit=cube.fluxunit_out, waveunit=cube.waveunit_out) if q3do_init.dolinefit and ncomp is not None: write_msg('FIT STATUS: '+str(q3do.fitstatus), file=q3di.logfile, quiet=quiet) elif onefit and not abortfit: if q3do_init.dolinefit: q3do_init.sepfitpars() q3do = q3do_init else: q3do = q3do_init # Check components if q3di.dolinefit \ and not onefit \ and not abortfit: q3do.sepfitpars() if q3di.checkcomp and ncomp: ccModule = \ importlib.import_module('q3dfit.' + q3di.fcncheckcomp) fcncheckcomp = getattr(ccModule, q3di.fcncheckcomp) # Note that this modifies the value of ncomp if necessary newncomp = fcncheckcomp(q3do.line_fitpars, q3di.linetie, ncomp, siglim_gas, **q3di.argscheckcomp) if len(newncomp) > 0: for line, nc in newncomp.items(): write_msg(f'FITLOOP: Repeating the fit of {line} with ' + f'{nc} components.', file=q3di.logfile, quiet=quiet) # Check that # components being fit to at least one line # is > 0 nonzerocomp = np.where(np.array(list(ncomp.values())) != 0)[0] if len(nonzerocomp) == 0: write_msg('FITLOOP: No lines with non-zero components. ' + 'Repeating fit with coninuum only.', file=q3di.logfile, quiet=quiet) ncomp = None else: dofit = False else: dofit = False else: dofit = False if not abortfit: # save q3do q3do.col = i+1 q3do.row = j+1 # update q3do with cube properties q3do.fluxnorm = cube.fluxnorm q3do.pixarea_sqas = cube.pixarea_sqas # np.save(outlab, q3do, allow_pickle=True)