# -*- coding: utf-8 -*-
from __future__ import annotations
import copy
import numpy as np
import time
from importlib import import_module
from typing import Literal, Optional
from astropy.constants import c
from astropy.table import Table
from astropy import units as u
from numpy.typing import ArrayLike
from ppxf.ppxf import ppxf
from ppxf.ppxf_util import log_rebin
from scipy.interpolate import interp1d
from . import q3din, q3dmath, q3dout
from q3dfit.q3dutil import lmlabel, write_msg
from q3dfit.spectConvol import spectConvol
[docs]
def fitspec(wlambda: np.ndarray,
flux: np.ndarray,
err: np.ndarray,
dq: np.ndarray,
q3di: q3din.q3din,
zstar: Optional[float]=None,
listlines: Optional[Table]=None,
listlinesz: Optional[dict[str, np.ndarray]]=None,
ncomp: Optional[dict[str, int]]=None,
linevary: Optional[dict[str, dict[str, np.ndarray]]]=None,
maskwidths: Optional[Table | dict[str, np.ndarray]]=None,
peakinit: Optional[Table | dict[str, np.ndarray]]=None,
siginit_gas: Optional[dict[str, np.ndarray]]=None,
siginit_stars: Optional[float]=None,
siglim_gas: Optional[dict[str, np.ndarray]]=None,
specConv: Optional[spectConvol]=None,
fluxunit: Optional[str]=None,
waveunit: Optional[str]=None,
quiet: bool=True
#tweakcntfit=None,
) -> q3dout.q3dout:
"""
This function is the core routine to fit the continuum and emission
lines of a spectrum.
Parameters
----------
wlambda
Spectrum, observed-frame wavelengths.
flux
Spectrum, fluxes.
err
Spectrum, flux errors.
q3di
Instance of :py:class:`~q3dfit.q3din.q3din` initializing the fit.
zstar
Optional. Initial guess for stellar redshift. Use None if no continuum
fit is done.
siginit_stars
Optional. Initial guess for stellar line widths for fitting. It is created
by :py:func:`~q3dfit.q3din.init_contfit` or specified on input. Set to None
if no continuum fit is done.
listlines
Optional. Emission line labels and rest frame wavelengths, as part of an astropy
Table output by :py:func:`~q3dfit.linelist.linelist`. Use None if no emission-line
fit is done.
listlinesz
Optional. Initial guess for emission line observed frame wavelengths. Use
None if no emission-line fit is done.
ncomp
Optional. Number of components fit to each line. Use None if no emission-line
fit is done.
linevary
Optional. Dictionary specifying which parameters to vary for each line.
Nested dictionary with line names as keys, and dictionaries as values.
This is passed to :py:mod:`~q3dfit.lineinit.lineinit`, or another
function specified in :py:attr:`~q3dfit.q3din.q3din.fcnlineinit`.
These dictionaries have keys 'flx', 'cen', and 'sig', with boolean arrays
as values. Default is None.
maskwidths
Optional. Widths, in km/s, of emission-line regions to mask from continuum fit.
If set to None, routine will look for `maskwidths` attribute in
:py:class:`~q3dfit.q3din.q3din`. If this is None and both continuum and emission-
line fits are attempted, routine defaults to value
set in `maskwidths_def` attribute of :py:class:`~q3dfit.q3din.q3din`.
peakinit
Optional. Initial guess for peak emission-line flux densities. If set to
None, routine will look for `peakinit` attribute in
:py:class:`~q3dfit.q3din.q3din`. If this is None and line fit is done,
routine estimates from data.
siginit_gas
Optional. Initial guess for emission line widths for fitting. It is created
by :py:func:`~q3dfit.q3din.init_linefit` or specified on input. Set to None
if no emission-line fit is done.
siglim_gas
Optional. Sigma limits for line fitting. It is created by
:py:func:`~q3dfit.q3din.init_linefit` or specified on input. Set to None
if no emission-line fit is done.
specConv
Optional. Instance of :py:class:`~q3dfit.spectConvol.spectConvol` specifying
the instrumental spectral resolution convolution. If None, no convolution
is done.
fluxunit
Optional. Units of flux density defined in :py:class:`~q3dfit.readcube.Cube`.
Default is None.
waveunit
Optional. Units of wavelength defined in :py:class:`~q3dfit.readcube.Cube`.
Default is None.
quiet
Optional. If False, some progress messages are written to stdout.
Default is True.
Returns
-------
q3dout.q3dout
Instance of :py:class:`~q3dfit.q3dout.q3dout` containing the fit results.
"""
usetype=np.float64
flux = copy.deepcopy(flux)
err = copy.deepcopy(err)
# Some of this logic may be duplicative
if q3di.docontfit and \
q3di.startempfile is not None and \
zstar is not None and \
q3di.fcncontfit != 'questfit':
# Get stellar templates
startempfile = q3di.startempfile
if isinstance(startempfile, bytes):
startempfile = startempfile.decode('utf-8')
template = np.load(startempfile, allow_pickle=True).item()
# Redshift stellar templates
templatelambdaz = np.copy(template['lambda'])
if not q3di.keepstarz:
templatelambdaz *= 1. + zstar
# We also need to adjust 'sigma' if it exists
if 'sigma' in template:
template['sigma'] *= 1. + zstar
# Need option for when template is in vac and data in air ...
if q3di.vacuum and not q3di.startempvac:
templatelambdaz = airtovac(templatelambdaz, logfile=q3di.logfile, quiet=quiet)
if waveunit is not None:
if q3di.startempwaveunit != waveunit:
lambda_tmp = templatelambdaz * u.Unit(q3di.startempwaveunit)
templatelambdaz = lambda_tmp.to(u.Unit(waveunit)).value
# templatelambdaz *= q3di['waveunit']
if specConv is not None and 'sigma' in template:
# Convolve stellar template to match instrumental resolution
# Need to ensure templates and data have the same wavelength units
if waveunit is not None:
if q3di.startempwaveunit != waveunit:
sigma_tmp = template['sigma'] * u.Unit(q3di.startempwaveunit)
template['sigma'] = sigma_tmp.to(u.Unit(waveunit)).value
if template['flux'].ndim == 1:
# If template is 1D, convolve it directly
template['flux'] = specConv.spect_convolver(
templatelambdaz, template['flux'], dsig=template['sigma'])
else:
for i in range(template['flux'].shape[1]):
template['flux'][:,i] = specConv.spect_convolver(
templatelambdaz, template['flux'][:,i], dsig=template['sigma'])
else:
templatelambdaz = wlambda
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# # Pick out regions to fit
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
if q3di.fitrange is not None:
fitran_tmp = list(map(np.float64, q3di.fitrange))
else:
fitran_tmp = [wlambda[0], wlambda[len(wlambda)-1]]
# indices locating good data and data within fit range
# these index the full data range.
gd_indx_1 = set(np.where(flux != 0.)[0])
gd_indx_2 = set(np.where(err > 0.)[0])
gd_indx_3 = set(np.where(np.isfinite(flux))[0])
gd_indx_4 = set(np.where(np.isfinite(err))[0])
gd_indx_5 = set(np.where(dq == 0)[0])
gd_indx_6 = set(np.where(wlambda >= min(templatelambdaz))[0])
gd_indx_7 = set(np.where(wlambda <= max(templatelambdaz))[0])
gd_indx_8 = set(np.where(wlambda >= fitran_tmp[0])[0])
gd_indx_9 = set(np.where(wlambda <= fitran_tmp[1])[0])
gd_indx_full = gd_indx_1.intersection(gd_indx_2, gd_indx_3, gd_indx_4,
gd_indx_5, gd_indx_6, gd_indx_7,
gd_indx_8, gd_indx_9)
gd_indx_full = list(gd_indx_full)
# Check that gd_indx_full is not empty, and has more than one point
if len(gd_indx_full) > 1:
# limit actual fit range to good data
fitran = [np.min(wlambda[gd_indx_full]).astype(usetype),
np.max(wlambda[gd_indx_full]).astype(usetype)]
# indices locating data within actual fit range
fitran_indx1 = np.where(wlambda >= fitran[0])[0]
fitran_indx2 = np.where(wlambda <= fitran[1])[0]
fitran_indx = np.intersect1d(fitran_indx1, fitran_indx2)
# indices locating good regions within wlambda[fitran_indx]
gd_indx_full_rezero = gd_indx_full - fitran_indx[0]
max_gd_indx_full_rezero = max(fitran_indx) - fitran_indx[0]
igdfz1 = np.where(gd_indx_full_rezero >= 0)[0]
igdfz2 = np.where(gd_indx_full_rezero <= max_gd_indx_full_rezero)[0]
i_gd_indx_full_rezero = np.intersect1d(igdfz1, igdfz2)
# Final index for addressing ALL "good" pixels
# these address only the fitted data range; i.e., they address gdflux,
# etc.
gd_indx = gd_indx_full_rezero[i_gd_indx_full_rezero]
# Limit data to fit range
gdflux = flux[fitran_indx]
gdlambda = wlambda[fitran_indx]
gderr = err[fitran_indx]
gddq = dq[fitran_indx]
gdinvvar = 1./np.power(gderr, 2.) # inverse variance
# Log rebin galaxy spectrum for PPXF
gdflux_log, gdlambda_log, velscale = log_rebin(fitran, gdflux)
gderrsq_log, _, _ = log_rebin(fitran, np.power(gderr, 2.))
gderr_log = np.sqrt(gderrsq_log)
# gdinvvar_log = 1./np.power(gderr_log, 2.)
# Find where flux is <= 0 or error is <= 0 or infinite or dq != 0
# these index the fitted data range
# We ignore nans because we'll set bad points to nan later
zerinf_indx_1 = np.where(gdflux == 0.)[0]
zerinf_indx_2 = np.where(gderr <= 0.)[0]
zerinf_indx_3 = np.where(np.isinf(gdflux))[0]
zerinf_indx_4 = np.where(np.isinf(gderr))[0]
zerinf_indx_5 = np.where(gddq != 0)[0]
zerinf_indx = np.unique(np.hstack([zerinf_indx_1, zerinf_indx_2,
zerinf_indx_3, zerinf_indx_4,
zerinf_indx_5]))
# Have to treat nans and infs in log space separately
# "NumPy uses the IEEE Standard for Binary Floating-Point for Arithmetic
# (IEEE 754). This means that Not a Number is not equivalent to infinity."
zerinf_indx_1 = np.where(gdflux_log == 0.)[0]
zerinf_indx_2 = np.where(gderr_log <= 0.)[0]
zerinf_indx_3 = np.where(np.isinf(gdflux_log))[0]
zerinf_indx_4 = np.where(np.isinf(gderr_log))[0]
zerinf_indx_5 = np.where(np.isnan(gdflux_log))[0]
zerinf_indx_6 = np.where(np.isnan(gderr_log))[0]
# to-do: log rebin dq and apply here?
zerinf_indx_log = np.unique(np.hstack([zerinf_indx_1, zerinf_indx_2,
zerinf_indx_3, zerinf_indx_4,
zerinf_indx_5, zerinf_indx_6]))
# good indices for log arrays
ctfitran = len(gdflux_log)
gd_indx_log = np.arange(ctfitran)
ctzerinf_log = len(zerinf_indx_log)
if ctzerinf_log > 0:
gd_indx_log = np.setdiff1d(gd_indx_log, zerinf_indx_log)
# Set bad points to nan so lmfit will ignore
ctzerinf = len(zerinf_indx)
if ctzerinf > 0:
gdflux[zerinf_indx] = np.nan
gderr[zerinf_indx] = np.nan
gdinvvar[zerinf_indx] = np.nan
write_msg('{:s}{:0f}{:s}'.format('FITLOOP: Setting ', int(ctzerinf),
' points from zero/inf flux or neg/zero/inf error to np.nan'),
file=q3di.logfile, quiet=quiet)
if ctzerinf_log > 0:
# can't just use np.nan because ppxf will choke on it
gdflux_log[zerinf_indx_log] = 0.
gderr_log[zerinf_indx_log] = 100.*max(gderr_log)
# gdinvvar_log[zerinf_indx_log] = np.nan
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# Initialize fit
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
q3do = q3dout.q3dout(gdlambda, gdflux, gderr, fitrange=fitran,
gd_indx=gd_indx, fitran_indx=fitran_indx)
else:
gdflux = np.array((0.))
q3do = q3dout.q3dout(0., 0., 0., nogood=True)
if fluxunit is not None:
q3do.fluxunit = fluxunit
if waveunit is not None:
q3do.waveunit = waveunit
# timer
fit_time0 = time.time()
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# Fit continuum
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
if q3di.docontfit and not q3do.nogood:
q3do.init_contfit(zstar=zstar)
# Some defaults. These only apply in case of fitting with stellar model
# + additive polynomial.
# stel_mod = 0.
# poly_mod = 0.
# Mask emission lines
# Column names are line labels, rows are components
# Check on ncomp is needed in case that we started with a line fit
# but all components were masked out by checkcomp
if q3di.dolinefit and not q3di.nolinemask and ncomp is not None:
if maskwidths is None:
if q3di.maskwidths is not None:
maskwidths = q3di.maskwidths
else:
maskwidths = \
Table(np.full([q3di.maxncomp, listlines['name'].size],
q3di.maskwidths_def, dtype=usetype),
names=listlines['name'])
# This loop overwrites nans in the case that ncomp gets lowered
# by checkcomp; these nans cause masklin to choke
for line in listlines['name']:
for comp in range(ncomp[line], q3di.maxncomp):
maskwidths[line][comp] = 0.
listlinesz[line][comp] = 0.
# Convert maskwidths to a Table if it's input as a dict in q3din
if isinstance(maskwidths, dict):
maskwidths = Table(maskwidths)
# Mask emission lines in linear space
q3do.ct_indx = masklin(gdlambda, listlinesz, maskwidths,
nomaskran=q3di.nomaskran)
# Mask emission lines in log space
ct_indx_log = masklin(np.exp(gdlambda_log), listlinesz,
maskwidths, nomaskran=q3di.nomaskran)
else:
q3do.ct_indx = np.arange(len(gdlambda))
ct_indx_log = np.arange(len(gdlambda_log))
q3do.ct_indx = np.intersect1d(q3do.ct_indx, gd_indx)
ct_indx_log = np.intersect1d(ct_indx_log, gd_indx_log)
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# # Option 1: Input function that is not ppxf
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
if q3di.fcncontfit != 'ppxf':
# get fitting function
module = import_module('q3dfit.contfit')
fcncontfit = getattr(module, q3di.fcncontfit)
if q3di.argscontfit is not None:
argscontfit = q3di.argscontfit
else:
argscontfit = dict()
if q3di.fcncontfit == 'fitqsohost':
argscontfit['fitran'] = fitran
argscontfit['medflux'] = np.median(gdflux[q3do.ct_indx])
if q3di.fcncontfit == 'questfit':
argscontfit['fluxunit'] = fluxunit
#argscontfit['waveunit'] = waveunit
if 'refit' in argscontfit.keys():
argscontfit['zstar'] = (copy.copy(zstar)).astype(usetype)
if argscontfit['refit'] == 'ppxf':
argscontfit['template_wave'] = templatelambdaz
argscontfit['template_flux'] = template['flux']
argscontfit['index_log'] = ct_indx_log
argscontfit['flux_log'] = gdflux_log
argscontfit['err_log'] = gderr_log
argscontfit['siginit_stars'] = siginit_stars
argscontfit['waveunit'] = waveunit
if hasattr(q3di, 'av_star'):
argscontfit['av_star'] = q3di.av_star
if argscontfit['refit'] == 'questfit':
argscontfit['fluxunit'] = fluxunit
#argscontfit['waveunit'] = waveunit
if q3di.fcncontfit == 'linfit_plus_FeII':
argscontfit['specConv'] = specConv
q3do.cont_fit, q3do.ct_coeff, zstarout, zstarouterr = \
fcncontfit(gdlambda.astype(usetype),
gdflux.astype(usetype),
gdinvvar.astype(usetype),
q3do.ct_indx, logfile=q3di.logfile,
quiet=quiet, **argscontfit)
#
if zstarout is not None:
q3do.zstar = copy.copy(zstarout)
if zstarouterr is not None:
q3do.zstar_err = copy.copy(zstarouterr)
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# # Option 2: PPXF
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
elif q3di.startempfile is not None:
# Interpolate template to same grid as data
temp_log = q3dmath.interptemp(gdlambda_log, np.log(templatelambdaz),
template['flux'])
# Check polynomial degree
add_poly_degree = 4
mult_poly_degree = 0
if 'add_poly_degree' in q3di.argscontfit:
add_poly_degree = q3di.argscontfit['add_poly_degree']
if 'mult_poly_degree' in q3di.argscontfit:
mult_poly_degree = q3di.argscontfit['mult_poly_degree']
if 'bounds' in q3di.argscontfit:
bounds = q3di.argscontfit['bounds']
else:
# bounds = [[-1000., 1000.], [0., 1000.]]
bounds = None
# Rest wavelength if reddening is applied
# must be in Angstroms
redlam = np.exp(gdlambda_log)/(1. + q3do.zstar)
if waveunit is not None:
if waveunit == 'micron':
redlam *= 1.e4
# run ppxf
#import pdb; pdb.set_trace()
pp = ppxf(temp_log, gdflux_log, gderr_log, velscale,
[0, siginit_stars],
bounds=bounds,
goodpixels=ct_indx_log,
degree=add_poly_degree,
mdegree=mult_poly_degree,
reddening=q3di.av_star,
lam=redlam,
quiet=quiet)
# Errors in best-fit velocity and dispersion.
# From PPXF docs:
# These errors are meaningless unless Chi^2/DOF~1.
# However if one *assumes* that the fit is good ...
solerr = copy.copy(pp.error)
q3do.ct_rchisq = pp.chi2
solerr *= np.sqrt(pp.chi2)
# best-fit in log space
continuum_log = pp.bestfit
# Resample the best fit into linear space
cinterp = interp1d(gdlambda_log, continuum_log,
kind='cubic', fill_value="extrapolate")
q3do.cont_fit = cinterp(np.log(gdlambda))
if add_poly_degree > 0:
pinterp = interp1d(gdlambda_log, pp.apoly,
kind='cubic', fill_value="extrapolate")
cont_fit_poly = pinterp(np.log(gdlambda))
else:
cont_fit_poly = np.zeros_like(gdlambda)
if mult_poly_degree > 0:
mpinterp = interp1d(gdlambda_log, pp.mpoly,
kind='cubic', fill_value="extrapolate")
cont_fit_mpoly = mpinterp(np.log(gdlambda))
else:
cont_fit_mpoly = np.zeros_like(gdlambda)
ct_coeff = dict()
ct_coeff['polymod'] = cont_fit_poly
ct_coeff['mpolymod'] = cont_fit_mpoly
# this includes the multiplicative polynomial
ct_coeff['stelmod'] = q3do.cont_fit - cont_fit_poly
ct_coeff['polyweights'] = pp.polyweights
ct_coeff['stelweights'] = pp.weights
ct_coeff['av'] = pp.reddening
ct_coeff['sigma'] = pp.sol[1]
ct_coeff['sigma_err'] = solerr[1]
q3do.ct_coeff = ct_coeff
# Adjust stellar redshift based on fit
# From ppxf docs:
# IMPORTANT: The precise relation between the output pPXF velocity
# and redshift is Vel = c*np.log(1 + z).
# See Section 2.3 of Cappellari (2017) for a detailed explanation.
q3do.zstar += np.exp(pp.sol[0]/c.to('km/s').value)-1.
q3do.zstar_err = (np.exp(solerr[0]/c.to('km/s').value))-1.
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# # Option to tweak cont. fit with local polynomial fits
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# if q3di.tweakcntfit is not None:
# q3do.cont_fit_pretweak = q3do.cont_fit.copy()
# # Arrays holding emission-line-masked data
# ct_lambda = gdlambda[q3do.ct_indx]
# ct_flux = gdflux[q3do.ct_indx]
# ct_err = gderr[q3do.ct_indx]
# ct_cont = q3do.cont_fit[q3do.ct_indx]
# for i in range(len(tweakcntfit[0,:])):
# # Indices into full data
# tmp_ind1 = np.where(gdlambda >= tweakcntfit[i,0])[0]
# tmp_ind2 = np.where(gdlambda <= tweakcntfit[i,1])[0]
# tmp_ind = np.intersect1d(tmp_ind1,tmp_ind2)
# ct_ind = len(tmp_ind)
# # Indices into masked data
# tmp_ctind1 = np.where(ct_lambda >= tweakcntfit[i,0])[0]
# tmp_ctind2 = np.where(ct_lambda <= tweakcntfit[i,1])[0]
# tmp_ctind = np.intersect1d(tmp_ctind1,tmp_ctind2)
# ct_ctind = len(tmp_ctind)
# if ct_ind > 0 and ct_ctind > 0:
# parinfo = list(np.repeat({'value':0.},tweakcntfit[2,i]+1))
# # parinfo = replicate({value:0d},tweakcntfit[2,i]+1)
# pass # this is just a placeholder for now
# # tmp_pars = mpfitfun('poly',ct_lambda[tmp_ctind],$
# # ct_flux[tmp_ctind] - ct_cont[tmp_ctind],$
# # ct_err[tmp_ctind],parinfo=parinfo,/quiet)
# # continuum[tmp_ind] += poly(gdlambda[tmp_ind],tmp_pars)
# else:
# continuum_pretweak = continuum.copy()
if q3di.dividecont:
continuum = gdflux.copy() / q3do.cont_fit - 1
gdinvvar_nocnt = gdinvvar.copy() * np.power(q3do.cont_fit, 2.)
# gderr_nocnt = gderr / continuum
q3do.ct_method = 'divide'
else:
continuum = gdflux.copy() - q3do.cont_fit
gdinvvar_nocnt = gdinvvar.copy()
q3do.ct_method = 'subtract'
fit_time1 = time.time()
write_msg('{:s}{:0.1f}{:s}'.format('FITSPEC: Continuum fit took ',
fit_time1-fit_time0, ' s.'), file=q3di.logfile, quiet=quiet)
else:
continuum = gdflux.copy()
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# Fit emission lines
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
if q3di.dolinefit and not q3do.nogood and ncomp is not None:
q3do.init_linefit(listlines, q3di.lines, q3di.maxncomp,
line_dat=continuum.astype(usetype))
# Make sure line within fitrange
# To deal with case of truncated data where continuum can be fit but
# line not within good data range
line_in_good_range = False
for line in q3di.lines:
if line_in_good_range:
break
for icomp in range(0, ncomp[line]):
if listlinesz[line][icomp] >= min(gdlambda) and \
listlinesz[line][icomp] <= max(gdlambda):
line_in_good_range = True
break
if line_in_good_range:
# Initial guesses for emission line peak fluxes (above continuum)
peakinit_min = np.sqrt(np.median(q3do.line_dat[gd_indx]**2.))
if peakinit is None:
#if q3di.peakinit is not None and isinstance(q3di.peakinit, Table):
# peakinit = q3di.peakinit
#else:
# initialize peakinit
peakinit = Table(np.full([q3di.maxncomp, listlines['name'].size],
0., dtype=usetype), names=listlines['name'])
# peakinit = {line: np.zeros(q3di.maxncomp) for line in q3di.lines}
# apply some light filtering
# https://stackoverflow.com/questions/20618804/how-to-smooth-a-curve-in-the-right-way/20642478
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html
from scipy.signal import savgol_filter
# from scipy.ndimage import gaussian_filter
line_dat_sm = savgol_filter(q3do.line_dat, 11, 3)
# gaussian_filter(q3do.line_dat,1)
# fline = interp1d(gdlambda, line_dat_sm, kind='linear')
for line in q3di.lines:
# Check that line wavelength is in data range
# Use first component as a proxy for all components
if listlinesz[line][0] >= min(gdlambda) and \
listlinesz[line][0] <= max(gdlambda):
# peakinit[line] = fline(listlinesz[line][0:ncomp[line]])
# look for peak within dz = +/-0.001
for icomp in range(0, ncomp[line]):
# https://stackoverflow.com/questions/12141150/from-list-of-integers-get-number-closest-to-a-given-value
lamwin = \
[min(gdlambda, key=lambda x:
abs(x - listlinesz[line][icomp]*0.999)),
min(gdlambda, key=lambda x:
abs(x - listlinesz[line][icomp]*1.001))]
ilamwin = [np.where(gdlambda == lamwin[0])[0][0],
np.where(gdlambda == lamwin[1])[0][0]]
# e.g. in the MIR, the wavelength range spanned by
# (listlinesz[line][icomp]*0.999,
# listlinesz[line][icomp]*1.001) can be smaller
# than one spectral element
if ilamwin[1]-ilamwin[0] < 10:
dlam = \
gdlambda[int(0.5*(ilamwin[0] +
ilamwin[1]))]\
- gdlambda[int(0.5*(ilamwin[0] +
ilamwin[1]))-1]
lamwin = \
[min(gdlambda, key=lambda x:
abs(x - (listlinesz[line][icomp] -
5.*dlam))),
min(gdlambda, key=lambda x:
abs(x - (listlinesz[line][icomp] +
5.*dlam)))]
ilamwin = \
[np.where(gdlambda == lamwin[0])[0][0],
np.where(gdlambda == lamwin[1])[0][0]]
peakinit[line][icomp] = \
np.nanmax(line_dat_sm
[ilamwin[0]:ilamwin[1]])
# If the smoothed version gives all nans, try
# unsmoothed
if np.isnan(peakinit[line][icomp]) and \
q3do.line_dat is not None:
peakinit[line][icomp] = \
np.nanmax(q3do.line_dat[ilamwin[0]:ilamwin[1]])
# if it's still all nans, just set to something low
if np.isnan(peakinit[line][icomp]):
peakinit[line][icomp] = peakinit_min
# re-cast if needed
peakinit[line] = peakinit[line].astype(usetype)
for line in q3di.lines:
# If initial guess is too low, set to something minimum to prevent
# fitter from choking (since we limit peak to be >= 0)
peakinit[line] = np.where(peakinit[line] < peakinit_min,
peakinit_min, peakinit[line])
# Fill out parameter structure with initial guesses and constraints
impModule = import_module('q3dfit.' + q3di.fcnlineinit)
run_fcnlineinit = getattr(impModule, q3di.fcnlineinit)
if q3di.argslineinit is not None:
argslineinit = q3di.argslineinit
else:
argslineinit = dict()
if q3di.fcnlineinit == 'lineinit':
argslineinit['waves'] = gdlambda
# Note that if we do a line fit, siginit_gas, siglim_gas, ncomp
# are set up by fitloop
emlmod, q3do.parinit = \
run_fcnlineinit(listlines, listlinesz, q3di.linetie,
peakinit, siginit_gas, siglim_gas, ncomp,
linevary=linevary, specConv=specConv,
**argslineinit)
# Actual fit
# Collect keywords to pass to the minimizer routine via lmfit
fit_kws = {}
# Maximum # evals cannot be specified as a keyword to the minimzer,
# as it's a parameter of the fit method. Default for 'least_squares'
# with 'trf' method (which is what lmfit assumes)
# is 100*npar, but lmfit changes this to 2000*(npar+1)
# https://github.com/lmfit/lmfit-py/blob/b930ddef320d93f984181db19fec8e9c9a41be8f/lmfit/minimizer.py#L1526
# We'll change default to 200*(npar+1).
# Note that lmfit method 'least_squares’ with default least_squares
# method 'lm' counts function calls in
# Jacobian estimation if numerical Jacobian is used (again the default)
max_nfev = 200*(len(q3do.parinit)+1)
iter_cb = None
method = 'least_squares'
# Add more using 'argslinefit' dict in init file
if q3di.argslinefit is not None:
for key, val in q3di.argslinefit.items():
# max_nfev goes in as parameter to fit method instead
if key == 'max_nfev':
max_nfev = val
# iter_cb goes in as parameter to fit method instead
elif key == 'iter_cb':
iter_cb = globals()[val]
elif key == 'method':
method = val
else:
fit_kws[key] = val
if method == 'least_squares':
# verbosity for scipy.optimize.least_squares
if quiet:
lmverbose = 0
else:
lmverbose = 2
fit_kws['verbose'] = lmverbose
if method == 'leastsq':
# to get mesg output
fit_kws['full_output'] = True
# increase number of max iterations; this is the default for this algorithm
# https://github.com/lmfit/lmfit-py/blob/7710da6d7e878ffee0dc90a85286f1ec619fc20f/lmfit/minimizer.py#L1624
if 'max_nfev' not in fit_kws:
max_nfev = 2000*(len(q3do.parinit)+1)
lmout = emlmod.fit(q3do.line_dat, q3do.parinit, x=gdlambda,
method=method, weights=np.sqrt(gdinvvar_nocnt),
nan_policy='omit', max_nfev=max_nfev,
fit_kws=fit_kws, iter_cb=iter_cb)
q3do.line_fit = emlmod.eval(lmout.params, x=gdlambda)
write_msg(lmout.fit_report(show_correl=False), file=q3di.logfile, quiet=quiet)
q3do.param = lmout.best_values
q3do.covar = lmout.covar
q3do.dof = lmout.nfree
q3do.redchisq = lmout.redchi
q3do.nfev = lmout.nfev
q3do.errorbars = lmout.errorbars
'''
error messages corresponding to LMFIT, plt
documentation was not very helpful with the error messages...
This can happen if, e.g., max_nfev is reached. Status message
is in this case not set, so we'll set it by hand.
The reason for algorithm termination (in least_squaares):
-1 : improper input parameters status returned from MINPACK.
0 : the maximum number of function evaluations is exceeded.
1 : gtol termination condition is satisfied.
2 : ftol termination condition is satisfied.
3 : xtol termination condition is satisfied.
4 : Both ftol and xtol termination conditions are satisfied.
'''
# q3do.fitstatus = 1 # default for good fit
if method == 'least_squares':
# https://lmfit.github.io/lmfit-py/model.html#lmfit.model.success
if not lmout.success:
write_msg('lmfit: '+lmout.message, file=q3di.logfile, quiet=quiet)
if hasattr(lmout, 'status'):
q3do.fitstatus = lmout.status
elif lmout.nfev >= max_nfev:
q3do.fitstatus = 0
if method == 'leastsq':
# Return values from scipy.optimize.leastsq for fit status
# https://github.com/scipy/scipy/blob/44e4ebaac992fde33f04638b99629d23973cb9b2/scipy/optimize/_minpack_py.py#L446
# success = 1-4, failure=4-8
# Possible lmfit error messages here:
# Presently, fit is aborting with messaage "Fit aborted" if max_nfev is reached
# setting ier=-1.
# I don't understand why this is happening, as I can't find the code that
# actually sets result.aborted to True in the lmfit code.
# See here for where ier is set in this case:
# https://github.com/lmfit/lmfit-py/blob/7710da6d7e878ffee0dc90a85286f1ec619fc20f/lmfit/minimizer.py#L1653
if not lmout.success:
write_msg('lmfit: '+lmout.message, file=q3di.logfile, quiet=quiet)
write_msg('lmfit: '+lmout.lmdif_message, file=q3di.logfile, quiet=quiet)
q3do.fitstatus = lmout.ier
# Errors from covariance matrix
q3do.perror = dict()
for p in lmout.params:
q3do.perror[p] = lmout.params[p].stderr
# If no error is returned, set to 0.
if not q3do.errorbars:
q3do.perror[p] = 0.
# Get flux peak errors from error spectrum and std dev of residual
q3do.perror_errspec = copy.deepcopy(q3do.perror)
q3do.perror_resid = copy.deepcopy(q3do.perror)
for line in listlines['name']:
for i in range(0, ncomp[line]):
lmline = lmlabel(line)
fluxlab = f'{lmline.lmlabel}_{i}_flx'
if q3do.param[fluxlab] > 0:
peakwave = q3do.param[f'{lmline.lmlabel}_{i}_cwv']
ipeakwave = (np.abs(gdlambda - peakwave)).argmin()
# from error spec
ipklo = ipeakwave - round(q3di.perror_errspecwin/2)
ipkhi = ipeakwave + round(q3di.perror_errspecwin/2)
if ipklo < 0:
ipklo = 0
if ipkhi > len(gdlambda)-1:
ipkhi = len(gdlambda)-1
q3do.perror_errspec[fluxlab] = \
np.median(gderr[ipklo:ipkhi+1])
# from residual
ipklo = ipeakwave - round(q3di.perror_residwin/2)
ipkhi = ipeakwave + round(q3di.perror_residwin/2)
if ipklo < 0:
ipklo = 0
if ipkhi > len(gdlambda)-1:
ipkhi = len(gdlambda)-1
q3do.perror_resid[fluxlab] = \
np.std((q3do.line_dat - q3do.line_fit)
[ipklo:ipkhi+1])
# Deal with flux pegging at boundary and no error
# from lmfit. Check for Nones, nans, and 0s
if q3do.perror[fluxlab] is None:
q3do.perror[fluxlab] = q3do.perror_errspec[fluxlab]
elif np.isnan(q3do.perror[fluxlab]) or \
q3do.perror[fluxlab] == 0.:
q3do.perror[fluxlab] = q3do.perror_errspec[fluxlab]
if q3di.perror_useresid and \
q3do.perror_resid[fluxlab] > q3do.perror[fluxlab]:
q3do.perror[fluxlab] = q3do.perror_resid[fluxlab]
q3do.cont_dat = gdflux.copy() - q3do.line_fit
fit_time2 = time.time()
write_msg('{:s}{:0.1f}{:s}'.format('FITSPEC: Line fit took ',
fit_time2-fit_time1, ' s.'), file=q3di.logfile, quiet=quiet)
else:
q3do.dolinefit = False
q3do.cont_dat = gdflux.copy()
else:
#q3do.fitstatus = 1
q3do.cont_dat = gdflux.copy()
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# Finish
# ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
return(q3do)
# For diagnosing problems, print value of each parameter every iteration
# To use, set 'iter_cb': 'per_iteration' as a keyword/value pair in 'argslinefit'
# https://lmfit.github.io/lmfit-py/examples/documentation/model_with_iter_callback.html
[docs]
def per_iteration(pars, iteration, resid, *args, **kws):
print(" ITER ", iteration, [f"{p.name} = {p.value:.5f}"
for p in pars.values()])
[docs]
def airtovac(wv: np.ndarray,
waveunit: Literal['micron','Angstrom']='Angstrom',
logfile: Optional[str]=None,
quiet: bool=True) -> np.ndarray:
"""
Takes an array of wavelengths in air and converts them to vacuum
using eq. 3 from Morton et al. 1991.
Parameters
----------
wv
Input wavelengths.
waveunit
Optional. Wavelength unit. Default is Angstrom.
Returns
-------
ndarray
An array of the same dimensions as the input and in the same units
as the input
References
----------
Morton 1991, ApJSS, 77, 119
Examples
--------
>>> wv=np.arange(3000,7000,1)
>>> vac_wv=airtovac(wv)
array([3000.87467224, 3001.87492143, 3002.87517064, ..., 6998.92971915, 6999.92998844, 7000.93025774])
"""
x=wv
# get x to be in Angstroms for calculations if it isn't already
if ((waveunit!='Angstrom') & (waveunit!='micron')):
write_msg(f'Wave unit {waveunit} not recognized, assuming Angstroms.', logfile, quiet)
if (waveunit=='micron'):
x=wv*1.e4
tmp=1.e4/x
# vacuum wavelengths are indeed slightly longer
y=x*(1.+6.4328e-5+2.94981e-2/(146.-tmp**2)+2.5540e-4/(41.-tmp**2))
# get y to be in the same units as the input:
if (waveunit=='micron'):
y=y*1.e-4
return(y)
[docs]
def masklin(llambda: np.ndarray,
linelambda: dict[str, np.ndarray],
halfwidth: Table,
nomaskran: Optional[ArrayLike]=None) -> np.ndarray:
"""
Masks emission lines from the spectrum for continuum fitting
Parameters
----------
llambda
Wavelengths of the spectrum.
linelambda
Dictionary with line names as keys and arrays of central wavelengths as values,
one value per component.
halfwidth
Dictionary with line names as keys and arrays of half widths as values, one
value per component; or a Table with the same structure.
nomaskran
Optional. Set of lower and upper wavelength limits of regions not to mask. If
None, no regions are excluded.
Returns
-------
numpy.ndarray
Array of llambda-array indices indicating non-masked wavelengths.
Examples
--------
1. from masklin import masklin
Generate linelist:
from linelist import linelist
mylist=['Halpha','Hbeta','Hgamma']
u=linelist(inlines=mylist)
Generate wavelength array:
import numpy as np
llambda=np.arange(3000.,8000.,1.)
Generate half widths for masking:
from astropy.table import Table
# https://docs.astropy.org/en/stable/table/construct_table.html
halfwidth=Table([u['name']])
# populate another column
# https://docs.astropy.org/en/stable/table/modify_table.html
halfwidth['halfwidths']=6000.0
# I have chosen a huge masking width for better visual eff
#now use the masking function
ind=masklin(llambda, u, halfwidth, 60.)
# plot the spectrum without and with the masking
from matplotlib import pyplot as plt
plt.interactive(True)
fig=plt.figure(1, figsize=(8,8))
plt.clf()
spec=1.+(llambda-3000.0)*1e-3
plt.plot(llambda, spec, color='grey',alpha=0.3)
plt.scatter(llambda[ind],spec[ind]+0.05,color='red',alpha=0.1,s=2)
plt.show()
# now let's define a no-masking array
dontmask=np.array([[3000,4000,5000],[4000,5000,6000]])
ind1=masklin(llambda, u, halfwidth, 60., nomaskran=dontmask)
plt.scatter(llambda[ind1],spec[ind1]-0.05,color='blue',alpha=0.03,s=2)
plt.show()
"""
# we will return the ones that are not masked
# start by retaining all elements -- mark them all True
retain = np.array(np.ones(len(llambda)), dtype=bool)
# line is the index in the linelambda array and
# cwv is the central wavelength
# let's flag the indices that are masked
for line, cwv in linelambda.items():
for i in range(halfwidth.columns[line].size):
temp1 = \
np.array((llambda <= cwv[i]*(1. - halfwidth.columns[line][i] /
c.to('km/s').value)), dtype=bool)
temp2 = \
np.array((llambda >= cwv[i]*(1. + halfwidth.columns[line][i] /
c.to('km/s').value)), dtype=bool)
retain = (retain & (temp1 | temp2))
# if the user has defined the regions not to be masked:
if nomaskran is not None:
# set all to False
nomask = np.array(np.zeros(len(llambda)), dtype=bool)
for j, llim in enumerate(nomaskran[0]):
# set to True between lower and upper limit for each
temp1 = np.array((llambda >= llim), dtype=bool)
temp2 = np.array((llambda <= nomaskran[1][j]), dtype=bool)
nomask = (nomask | (temp1 & temp2))
# combine retain and nomask
retain = (retain | nomask)
# return the indices to be retained
# https://stackoverflow.com/questions/21448225/getting-indices-of-true-values-in-a-boolean-list
indgd = np.where(retain)[0]
return indgd