'''
Continuum fitting routines and subroutines.
'''
from __future__ import annotations
__all__ = ['fitpoly', 'fitqsohost', 'questfit', 'linfit_plus_FeII',
'Fe_template_vw01', 'readcf']
from typing import Optional, Any, Literal
from numpy.typing import ArrayLike
import copy
import importlib.resources as pkg_resources
import lmfit
import numpy as np
import ppxf.ppxf_util as util
import os
from astropy import units as u
from astropy.constants import c # type: ignore
from astropy.modeling import models, fitting
# from lmfit.models import ExpressionModel
from lmfit.models import LinearModel
from ppxf.ppxf import ppxf
from scipy.interpolate import interp1d
from . import q3dmath, q3dutil, qsohostfcn, questfitfcn, spectConvol
from q3dfit.data import questfit_templates
from q3dfit.data.questfit_templates import silicatemodels
from q3dfit.exceptions import InitializationError
[docs]
def fitpoly(wave: np.ndarray,
flux: np.ndarray,
weight: np.ndarray,
index: np.ndarray,
logfile: Optional[str]=None,
quiet: bool=True,
fitord: int=3,
refit: Optional[dict[str, ArrayLike]] = None) \
-> tuple[np.ndarray, dict[str, ArrayLike], None, None]:
'''
Fit the continuum as a polynomial.
Parameters
----------
wave
Wavelengths
flux
Fluxes
weight
Weights
index
Points used in the fit.
logfile
Optional. Name of the log file. Default is None.
quiet
Optional. Suppress output to stdout? Default is True.
fitord
Optional. Order of the polynomial. Default is 3.
refit
Optional. Refit with a different polynomial order? If so, set to dictionary
with keys 'ord' and 'ran' for the order and range of the
refit, respectively. Can specify multiple ranges and orders as
array_like. Default is None.
Returns
-------
np.ndarray
best fit continuum model
dict[str, ArrayLike]
Dictionary with single key-value pair, 'poly' and the polynomial coefficients.
None
Unused
None
Unused
'''
iwave = np.array(wave[index])
iflux = np.array(flux[index])
#the fitter I used puts weights in 1/sigma so I took the square root to make the data correct
w = np.array(weight[index])
iweight = np.array(np.sqrt(w))
iwave = iwave.reshape(iwave.size)
iflux = iflux.reshape(iwave.size)
iweight = iweight.reshape(iwave.size)
if len(iwave) <= fitord:
fitord = len(iwave)-1
if fitord==0:
deg1=len(iwave)-1
deg2=fitord
else:
deg1=fitord
deg2=fitord
#making astropy fitter
fitter = fitting.LevMarLSQFitter()
#making polynomial model
polymod1= models.Polynomial1D(deg1)
polymod2= models.Polynomial1D(deg2)
#creating fluxfit
fluxfit = fitter(polymod1, iwave, iflux, weights=iweight)
# polynomial coefficients
fluxfitparam = fluxfit.parameters
#flip for numpy.poly1d
ct_coeff = {'poly': np.flip(fluxfitparam)}
# this is a class:
# https://numpy.org/doc/stable/reference/generated/numpy.poly1d.html
ct_poly = np.poly1d(ct_coeff['poly'], variable='lambda')
continuum = ct_poly(wave)
icontinuum = continuum[index]
if refit is not None:
for i in range (0, np.size(refit['ord']) - 1):
tmp_ind=np.where(wave >= refit['ran'][0,i] and
wave <= refit['ran'][1,i])
tmp_iind=np.where(iwave >= refit['ran'][0,i] and
iwave <= refit['ran'][1,i])
# parinfo=np.full(refit['ord'][i]+1, {'value':0.0})
#degree of polynomial fit defaults to len(x-variable)-1
if deg2==0:
deg2=len(iwave[tmp_iind])-1
#creating tmp_pars
tmp_pars=fitter(polymod2, iwave[tmp_iind],
(iflux[tmp_iind]-icontinuum[tmp_iind]),
z=None, weights=iweight[tmp_iind])
tmp_parsptmp=tmp_pars.parameters
tmp_parsparam=np.flip(tmp_parsptmp)
# refitted continuum object
ct_poly = np.poly1d(tmp_parsparam, variable='lambda')
# refitted continuum over wavelength range for refitting,
# indexed to full wavelength array
cont_refit = ct_poly(wave[tmp_ind])
# now add in the refitted continuum to the full continuum
continuum += cont_refit
return continuum, ct_coeff, None, None
[docs]
def fitqsohost(wave: np.ndarray,
flux: np.ndarray,
weight: np.ndarray,
index: np.ndarray,
logfile: Optional[str]=None,
quiet: bool=True,
refit: Optional[str]=None,
template_wave: Optional[np.ndarray]=None,
template_flux: Optional[np.ndarray]=None,
zstar: Optional[float]=None,
flux_log: Optional[np.ndarray]=None,
err_log: Optional[np.ndarray]=None,
index_log: Optional[np.ndarray]=None,
siginit_stars: float=50.,
add_poly_degree: int=30,
mult_poly_degree: int=0,
av_star: Optional[float]=None,
fitran: Optional[ArrayLike]=None,
qsoord: Optional[int]=None,
hostord: Optional[int]=None,
blrpar: Optional[ArrayLike]=None,
qsoonly: bool=False,
hostonly: bool=False,
blronly: bool=False,
fluxunit: Optional[str]=None,
waveunit: Optional[Literal['micron','Angstrom']]=None,
*, qsoxdr: str,
**kwargs) -> tuple[np.ndarray, dict[str, Any],
Optional[float], Optional[float]]:
'''
Fit the continuum using a quasar template and a stellar template. Calls
:py:func:`~q3dfit.qsohostfcn.qsohostfcn` to jointly fit the quasar and host
galaxy components. Calls :py:func:`~ppxf.ppxf` or :py:func:`~q3dfit.contfit.questfit`
to refit the residual after subtracting the quasar component.
Additional keywords can be passed to :py:meth:`~lmfit.model.Model.fit` by passing
them as keyword arguments to this function. Arguments to the :py:mod:`~scipy.optimize`
functions can be passed by setting the keyword argument `argslmfit` to a dictionary of
parameter names and values as keys and values. Arguments to
:py:func:`~q3dfit.contfit.questfit` for the `refit` can be passed by setting the
keyword argument `args_questfit` to a dictionary of parameter names and values as
keys and values.
Parameters
-----
wave
Wavelengths
flux
Fluxes
weight
Weights
index
Points used in the fit.
logfile
Optional. Name of the log file. Default is None.
quiet
Optional. Suppress output to stdout? Default is True.
fitran
Optional. Wavelength range to fit. Default is None, to fit all wavelengths.
qsoord
Optional. Order of the polynomial added to the quasar multiplier.
Default is None.
hostord
Optional. Order of the polynomial model for the host galaxy.
Default is None.
blrpar
Optional. Parameters of the broad line region scattering model. If set,
fit this part of the model. Set to array_like Default is None.
qsoonly
Optional. Fit only the quasar template. Default is False.
hostonly
Optional. Fit only the host galaxy. Default is False.
blronly
Optional. Fit only the broad line region scattering model. Default is False.
refit
Optional. Refit the continuum residual after subtracting the quasar.
Options are 'ppxf' or 'questfit'. Default is None, which means no refit.
template_wave
Optional, but must be set if refit='ppxf'. Wavelength array of the stellar
template used as model for stellar continuum.
template_flux
Optional, but must be set if refit='ppxf'. Flux array of the stellar
template used as model for stellar continuum.
zstar
Optional, but must be set if refit='ppxf'. Input guess for redshift of the
stellar continuum. Default is None.
flux_log
Optional, but must be set if refit='ppxf'. Flux values for the log-rebinned
pixels.
err_log
Optional, but must be set if refit='ppxf'. Error values for the log-rebinned
pixels.
index_log
Optional, but must be set if refit='ppxf'. Pixels used in the fit for the
pPXF fit, as log-rebinned.
siginit_stars
Optional. Initial guess for the stellar velocity dispersion in km/s, used
in the pPXF fit. Default is 50. km/s.
add_poly_degree
Optional. Degree of the additive polynomial for the pPXF fit. Default is
30.
mult_poly_degree
Optional. Degree of the multiplicative polynomial for the pPXF fit. Default is
0.
av_star
Optional. Initial guess for stellar reddening, used in the pPXF fit.
Default is None, which means no reddening is applied.
qsoxdr
Path and filename for the quasar template.
fluxunit
Optional. Units of the flux, as defined by :py:class:`~q3dfit.readcube.Cube`.
Default is None. Passed to :py:func:`~q3dfit.contfit.questfit` if refit='questfit'.
waveunit
Optional. Units of the wavelength, as defined by :py:class:`~q3dfit.readcube.Cube`.
Default is None.
Returns
-------
numpy.ndarray
Best fit continuum model.
dict[str, Any]
Best fit parameters. It contains a the key 'qso_host', which contains the
parameters as a :py:class:`~lmfit.Parameters` object. If `refit` is also set
to True in :py:func:`~q3dfit.q3din.q3din`, then it also contains the key
'ppxf', which contains the best fit parameters from the pPXF fit as
another dictionary with keys 'stelmod', 'polymod', 'stelweights',
'polyweights', 'sigma', 'sigma_err', and 'av'. If `refit` is set to
'questfit', it will contain the best fit parameters with keys set
by the :py:func:`~q3dfit.contfit.questfit` function.
Optional[float]
Best fit stellar redshift if refit='ppxf'. Otherwise, the input redshift
or None if not set.
Optional[float]
Best fit stellar redshift error if refit='ppxf'. Otherwise, None.
'''
# Load quasar template
try:
qsotemplate = np.load(qsoxdr, allow_pickle=True).item()
qsowave = qsotemplate['wave']
qsoflux_full = qsotemplate['flux']
except:
raise InitializationError('Cannot locate quasar template (qsoxdr) specified ')
# Interpolate quasar template to the data grid. By not doing this
# we are assuming that the quasar template is already on the same
# grid as the data.
# qsoflux = interptemp(wave, qsowave, qsoflux_full)
# Make sure we are fitting only the range of the quasar template
# that matches the range of the complete fit, as specfified by fitspec
if fitran is not None:
iqsoflux = np.where((qsowave >= fitran[0]) & (qsowave <= fitran[1]))
qsoflux = qsoflux_full[iqsoflux]
else:
qsoflux = qsoflux_full
index = np.array(index, dtype=np.int64)
# err = 1/weight**0.5
iwave = wave[index]
iflux = flux[index]
iweight = weight[index]
# ierr = err[index]
# Instantiate the quasar + featureless host continuum model
_, ymod, params = \
qsohostfcn.qsohostfcn(wave, params_fit=None, qsoonly=qsoonly,
qsoord=qsoord, hostonly=hostonly, hostord=hostord,
blronly=blronly, blrpar=blrpar, qsoflux=qsoflux,
medflux=np.median(iflux))
# Set the fitting method
fit_kws = {}
# Maximum # evals cannot be specified as a keyword to the minimzer,
# as it's a parameter of the fit method. Default for 'least_squares'
# with 'trf' method (which is what lmfit assumes)
# is 100*npar, but lmfit changes this to 2000*(npar+1)
# https://github.com/lmfit/lmfit-py/blob/b930ddef320d93f984181db19fec8e9c9a41be8f/lmfit/minimizer.py#L1526
# We'll change default to 200*(npar+1).
# Note that lmfit method 'least_squares’ with default least_squares
# method 'lm' counts function calls in
# Jacobian estimation if numerical Jacobian is used (again the default)
max_nfev = 200*(len(params)+1)
iter_cb = None
method = 'least_squares'
# Add additional parameter settings for scipy.optimize.least_squares
if 'argslmfit' in kwargs:
for key, val in kwargs['argslmfit'].items():
# have to treat 'method' separately, as it is a parameter
# of the fit function, not a parameter of the fit
if key == 'method':
method = val
# max_nfev goes in as parameter to fit method instead
elif key == 'max_nfev':
max_nfev = val
# iter_cb goes in as parameter to fit method instead
elif key == 'iter_cb':
iter_cb = globals()[val]
else:
fit_kws[key] = val
# Could also set the method here, but better
# to set it in argslmfit
if 'method' in kwargs:
method = kwargs['method']
if method == 'least_squares':
if quiet:
lmverbose = 0 # verbosity for scipy.optimize.least_squares
else:
lmverbose = 2
fit_kws = {'verbose': lmverbose}
if method == 'leastsq':
# to get mesg output
fit_kws['full_output'] = True
# increase number of max iterations; this is the default for this algorithm
# https://github.com/lmfit/lmfit-py/blob/7710da6d7e878ffee0dc90a85286f1ec619fc20f/lmfit/minimizer.py#L1624
if 'max_nfev' not in fit_kws:
max_nfev = 2000*(len(params)+1)
result = ymod.fit(iflux, params, weights=np.sqrt(iweight),
qsotemplate=qsoflux[index],
wave=iwave, x=iwave, method=method,
nan_policy='raise', max_nfev=max_nfev,
fit_kws=fit_kws)
q3dutil.write_msg(result.fit_report(), file=logfile, quiet=quiet)
# comps = result.eval_components(wave=wave, qso_model=qsoflux, x=wave)
continuum = result.eval(wave=wave, qsotemplate=qsoflux, x=wave)
ct_coeff = {'qso_host': result.params}
zstar_err = None # default value if not refitting
# Refit the residual after subtracting the quasar component
if refit == 'ppxf' and \
flux_log is not None and \
err_log is not None and \
index_log is not None and \
template_wave is not None and \
template_flux is not None:
# log rebin residual
# lamRange1 = np.array([wave.min(), wave.max()])/(1+zstar)
cont_log, lambda_log, velscale = util.log_rebin(fitran, continuum)
resid_log = flux_log - cont_log
# nan_indx = np.where(np.isnan(resid_log))[0]
# if len(nan_indx) > 0:
# resid_log[nan_indx] = 0
# Interpolate template to same grid as data
temp_log = q3dmath.interptemp(lambda_log, np.log(template_wave.T[0]),
template_flux)
# Rest wavelength if reddening is applied
# must be in Angstroms
redlam = np.exp(lambda_log)/(1. + zstar)
if waveunit is not None:
if waveunit == 'micron':
redlam *= 1.e4
# vel = c*np.log(1 + zstar) # eq.(8) of Cappellari (2017)
# t = clock()
start = [0, siginit_stars] # (km/s), starting guess for [V, sigma]
pp = ppxf(temp_log, resid_log, err_log, velscale, start,
goodpixels=index_log,
reddening=av_star,
lam=redlam,
degree=add_poly_degree,
quiet=quiet)
# Errors in best-fit velocity and dispersion.
# From PPXF docs:
# These errors are meaningless unless Chi^2/DOF~1.
# However if one *assumes* that the fit is good ...
solerr = copy.copy(pp.error)
ct_rchisq = pp.chi2
solerr *= np.sqrt(pp.chi2)
# Resample the best fit into linear space
cinterp = interp1d(lambda_log, pp.bestfit,
kind='cubic', fill_value="extrapolate")
resid_mod = cinterp(np.log(wave))
if add_poly_degree > 0:
pinterp = interp1d(lambda_log, pp.apoly,
kind='cubic', fill_value="extrapolate")
cont_fit_poly = pinterp(np.log(wave))
else:
cont_fit_poly = np.zeros_like(wave)
if mult_poly_degree > 0:
mpinterp = interp1d(lambda_log, pp.mpoly,
kind='cubic', fill_value="extrapolate")
cont_fit_mpoly = mpinterp(np.log(wave))
else:
cont_fit_mpoly = np.zeros_like(np.log(wave))
ct_coeff_ppxf = dict()
ct_coeff_ppxf['polymod'] = cont_fit_poly
ct_coeff_ppxf['mpolymod'] = cont_fit_mpoly
ct_coeff_ppxf['stelmod'] = resid_mod - cont_fit_poly
ct_coeff_ppxf['polyweights'] = pp.polyweights
ct_coeff_ppxf['stelweights'] = pp.weights
ct_coeff_ppxf['av'] = pp.reddening
ct_coeff_ppxf['sigma'] = pp.sol[1]
ct_coeff_ppxf['sigma_err'] = solerr[1]
ct_coeff['ppxf'] = ct_coeff_ppxf
# From ppxf docs:
# IMPORTANT: The precise relation between the output pPXF velocity
# and redshift is Vel = c*np.log(1 + z).
# See Section 2.3 of Cappellari (2017) for a detailed explanation.
zstar += np.exp(pp.sol[0]/c.to('km/s').value)-1.
zstar_err = np.exp(solerr[0]/c.to('km/s').value)-1.
# host can't be negative
#ineg = np.where(continuum < 0)
#continuum[continuum < 0] = 0
continuum += resid_mod
elif refit == 'questfit':
resid = flux - continuum
argscontfit_use = kwargs['args_questfit']
cont_resid, ct_coeff, zstar, zstar_err = \
questfit(wave, resid, weight, index,
logfile=logfile, quiet=quiet, z=zstar,
fluxunit=fluxunit, **argscontfit_use)
from . import plot
from matplotlib import pyplot as plt
initdatdict = argscontfit_use.copy()
initdatdict['label'] = 'miritest'
initdatdict['plotMIR'] = True
plot.plotquest(wave, resid, cont_resid, ct_coeff, initdatdict)
plt.show()
continuum += cont_resid
ct_coeff['qso_host'] = result.params
return continuum, ct_coeff, zstar, zstar_err
[docs]
def questfit(wave: np.ndarray,
flux: np.ndarray,
weight: np.ndarray,
index: np.ndarray,
logfile: Optional[str]=None,
quiet: bool=True,
z: Optional[float]=None,
fluxunit: Optional[str]=None,
tempdir: Optional[str]=None,
*, config_file: str,
**kwargs) -> tuple[np.ndarray, dict[str, Any],
Optional[float], Optional[float]]:
'''
Fit the MIR continuum. Calls :py:func:`~q3dfit.contfit.questfitfcn` to fit the
continuum using the templates specified in the configuration file.
Parameters
-----
wave
Wavelengths
flux
Fluxes
weight
Weights
index
Points used in the fit.
quiet
Optional. Suppress output? Default is True.
z
Optional. Redshift of the source, for redshifting the templates.
Default is None.
fluxunit
Optional. Units of the flux, as defined by :py:class:`~q3dfit.readcube.Cube`.
Default is None. The templates are assumed to be in f_nu. If this
unit contains 'micron' or 'Angstrom', the flux is assumed to be in f_lambda.
The templates are then converted to f_lambda. If None, the templates are not
converted.
tempdir
Optional. Directory containing the templates, to search if default
directory does not contain template. Default is None.
config_file
Configuration file for the fit.
Returns
-------
numpy.ndarray
Best fit continuum model.
dict[str, Any]
Best fit parameters. Key 'MIRparams' is a :py:class:`~lmfit.Parameters` object
containing the best fit parameters. Key 'comp_best_fit' is a dictionary
containing with key/value pairs of the component names and their values
evaluated at the best fit parameters.
Optional[float]
Input redshift, if set. Otherwise, None.
Optional[float]
None, as the redshift error is not calculated in this function.
'''
# models dictionary holds extinction, absorption models
emcomps = dict()
# template dictionary holds templates, blackbodies, powerlaws
allcomps = dict()
# Read input configuration file
config_file = readcf(config_file)
# set up global extinction. Loop through all lines, looking for global tag
global_extinction = False
for key in config_file:
if len(config_file[key]) > 3:
if 'global' in config_file[key][3]:
global_extinction = True
# Set name of global extinction / ice models
if global_extinction:
for key in config_file:
if 'extinction' in config_file[key]:
global_ext_model = key
if 'absorption' in config_file[key]:
global_ice_model = key
# counter for PAH/silicate/quasar templates
n_temp = 0
# populating the components dictionaries and setting up lmfit models
for i in config_file.keys():
# loc_models = q3dfit.__path__[0]+'/data/questfit_templates/'
if 'blackbody' in i:
#starting with the blackbodies
model_parameters = config_file[i]
name_model = 'blackbody'+str(int(float(model_parameters[7])))
extinction_model = config_file[i][3]
ice_model = config_file[i][9]
# instantiate model class and set parameters
model_temp_BB, param_temp_BB = \
questfitfcn.\
set_up_fit_blackbody_model([np.float64(model_parameters[1]),
np.float64(model_parameters[7])],
[np.float64(model_parameters[2]),
np.float64(model_parameters[8])],
name_model[:])
# instantiate individual extinction model
if global_extinction is False and \
config_file[i][3] != '_' and \
config_file[i][3] != '-':
model_temp_extinction, param_temp_extinction = \
questfitfcn.\
set_up_fit_extinction([np.float64(model_parameters[4])],
[np.float64(model_parameters[5])],
name_model+'_ext',
extinction_model,
model_parameters[6])
# multiply the BB model by extinction and add parameters
# temporary holders just for this component
model_temp = model_temp_BB*model_temp_extinction
param_temp = param_temp_BB + param_temp_extinction
# add extinction model to dictionary
allcomps[extinction_model] = \
config_file[extinction_model][0]
else:
# temporary holders just for this component, without extinction
model_temp = model_temp_BB
param_temp = param_temp_BB
# checking if we need to add ice absorption
if 'ice' in i and global_extinction is False:
model_temp_ice, param_temp_ice = \
questfitfcn.\
set_up_absorption([np.float64(model_parameters[10])],
[np.float64(model_parameters[11])],
name_model+'_abs',
model_parameters[9])
model_temp = model_temp*model_temp_ice
param_temp += param_temp_ice
allcomps[model_parameters[9]] = \
config_file[model_parameters[9]][0]
# Check if this is the first component; if not, set up full
# model/pars
if 'model' not in vars():
model, param = model_temp, param_temp
# otherwise, add them in
else:
model += model_temp
param += param_temp
# powerlaw model
if 'powerlaw' in i:
model_parameters = config_file[i]
name_model = 'powerlaw'+str(int(float(model_parameters[7])))
extinction_model = config_file[i][3]
ice_model = config_file[i][9]
#powerlaw model
# exponent to that specified in config. file.
model_temp_powerlaw, param_temp_powerlaw = \
questfitfcn.\
set_up_fit_powerlaw_model(
[np.float64(1.), np.float64(model_parameters[7])],
[np.float64(model_parameters[2]),
np.float64(model_parameters[8])],
name_model[:])
if global_extinction is False and \
config_file[i][3] != '_' and \
config_file[i][3] != '-':
model_temp_extinction, param_temp_extinction = \
questfitfcn.\
set_up_fit_extinction([np.float64(model_parameters[4])],
[np.float64(model_parameters[5])],
'powerlaw' +
str(int(float(model_parameters[7])))
+ '_ext', extinction_model,
model_parameters[6])
model_temp = model_temp_powerlaw*model_temp_extinction
param_temp = param_temp_powerlaw + param_temp_extinction
allcomps[extinction_model] = \
config_file[extinction_model][0]
else:
model_temp = model_temp_powerlaw
param_temp = param_temp_powerlaw
if 'ice' in i and global_extinction is False:
model_temp_ice, param_temp_ice = \
questfitfcn.\
set_up_absorption([np.float64(model_parameters[10])],
[np.float64(model_parameters[11])],
name_model+'_abs', model_parameters[9])
model_temp = model_temp*model_temp_ice
param_temp += param_temp_ice
allcomps[model_parameters[9]] = \
config_file[model_parameters[9]][0]
if 'model' not in vars():
model, param = model_temp, param_temp
else:
model += model_temp
param += param_temp
if 'template' in i: #template model
model_parameters = config_file[i]
name_model = 'template_'+str(n_temp)#i
extinction_model = config_file[i][3]
ice_model = config_file[i][9]
# if it's not a polynomial model for PSF fitting
#if not 'poly' in i:
model_temp_template, param_temp_template = \
questfitfcn.\
set_up_fit_model_scale([np.float64(model_parameters[1])],
[np.float64(model_parameters[2])],
name_model, name_model) #,
#maxamp=1.05*max(flux[index]) )
# # ... and if it is!
# Not presently implmented
# else:
# # constrain amplitude
# minamp = np.float64(model_parameters[1]) / 1.25
# maxamp = np.float64(model_parameters[1]) * 1.25
# model_temp_template, param_temp_template = \
# questfitfcn.set_up_fit_model_scale_withpoly(
# [np.float64(model_parameters[1])],
# [np.float64(model_parameters[2])],
# name_model, name_model, minamp=minamp, maxamp=maxamp)
if global_extinction is False and \
config_file[i][3] != '_' and \
config_file[i][3] != '-':
model_temp_extinction, param_temp_extinction = \
questfitfcn.\
set_up_fit_extinction([np.float64(model_parameters[4])],
[np.float64(model_parameters[5])],
name_model + '_ext',
extinction_model,
model_parameters[6])
model_temp = model_temp_template*model_temp_extinction
param_temp = param_temp_template + param_temp_extinction
allcomps[extinction_model] = \
config_file[extinction_model][0]
else:
model_temp = model_temp_template
param_temp = param_temp_template
if 'ice' in i and global_extinction is False:
model_temp_ice, param_temp_ice = \
questfitfcn.set_up_absorption([float(model_parameters[10])],
[float(model_parameters[11])],
name_model+'_abs',
model_parameters[9])
model_temp = model_temp*model_temp_ice
param_temp += param_temp_ice
allcomps[model_parameters[9]] = \
config_file[model_parameters[9]]
if 'model' not in vars():
model,param = model_temp,param_temp
else:
model += model_temp
param += param_temp
# add numerical template to emcomps
emcomps[name_model] = config_file[i][0]
# increment counter for PAH/silicate/quasar templates
n_temp+=1
#if qsoflux is not None:
#model_qso, param_qso = questfitfcn.set_up_fit_model_scale
# Check to see if we are using global extinction, where the total
# model flux is extincted by the same ice and dust model.
if global_extinction:
model_global_ext, param_global_ext = \
questfitfcn.set_up_fit_extinction([np.float64(0.)],
[np.float64(1.)], 'global_ext',
global_ext_model, 'S')
model = model*model_global_ext
param += param_global_ext
allcomps[global_ext_model] = config_file[global_ext_model][0]
model_global_ice, param_global_ice = \
questfitfcn.set_up_absorption([np.float64(0.)], [np.float64(1.)],
'global_ice', global_ice_model)
model = model*model_global_ice
param += param_global_ice
allcomps[global_ice_model] = \
config_file[global_ice_model][0]
# loop over abs. dictionary, load in the numerical models and resample.
for i in allcomps.keys():
#if qsoflux is not None:
with pkg_resources.path(questfit_templates, allcomps[i]) as p:
if os.path.exists(p):
temp_model = np.load(p, allow_pickle=True)
else:
if tempdir is not None:
temp_model = np.load(tempdir+allcomps[i])
else:
raise InitializationError(f'Cannot locate template in default '+
'directory and no tempdir specified')
# Check to see if we are using global extinction, where the total
temp_wave=temp_model['WAVE']
if z is not None:
temp_wave = np.float64(temp_wave*(1.+z))
temp_value=temp_model['FLUX']
temp_value_rebin = \
q3dmath.interp_lis(wave, temp_wave, temp_value)
allcomps[i] = temp_value_rebin
# conversion from f_nu to f_lambda is f_lambda = f_nu x c/lambda^2
# [1 Jy = 1e-10 erg/s/cm^2/um [/sr] ???]
# about 1.2 for wave=5 [micron]
#c_scale = constants.c * u.Unit('m').to('micron') / wave**2 * \
# 1e-23 * 1e10
# the exact value doesn't matter, because we normalize the templates
c_scale = 1. / wave**2
# loop over emission template dictionary, load them in and resample.
for i in emcomps.keys():
if 'sifrom' in emcomps[i]:
tempdir = silicatemodels #questfit_templates.silicatemodels
else:
tempdir = questfit_templates
with pkg_resources.path(tempdir, emcomps[i]) as p:
if os.path.exists(p):
temp_model = np.load(p, allow_pickle=True)
else:
if tempdir is not None:
temp_model = np.load(tempdir+emcomps[i])
else:
raise InitializationError(f'Cannot locate template {emcomps[i]} '+
'in default directory and no tempdir specified')
try:
temp_wave = np.float64(temp_model['WAVE'])
if z is not None:
temp_wave = np.float64(temp_wave*(1.+z))
temp_value = np.float64(temp_model['FLUX'])
except:
# if a QSO template generated by makeqsotemplate() is included,
# that is formatted slightly differently
temp_model = temp_model[()]
temp_wave = temp_model['wave'] #*(1.+z)
temp_value = temp_model['flux']
temp_value_rebin = \
q3dmath.interp_lis(wave, temp_wave, temp_value)
allcomps[i] = temp_value_rebin
# conversion from f_nu to f_lambda: f_lambda = f_nu x c/lambda^2
# assume that presence of micron or Angstrom in fluxunit means that the flux is
# f_lambda
if fluxunit is not None:
if 'micron' in fluxunit or 'Angstrom' in fluxunit:
allcomps[i] *= c_scale
allcomps[i] = allcomps[i]/allcomps[i].max()
# Add in wavelength and units flags
allcomps['wave'] = np.float64(wave)
allcomps['fluxunit'] = fluxunit
# produce copy of components dictionary with index applied
flux_cut = flux[index]
allcomps_cut = copy.deepcopy(allcomps)
for el in allcomps.keys():
if not ('fluxunit' in el):
allcomps_cut[el] = allcomps_cut[el][index]
# Get ready for fit
fit_kws = {}
# Maximum # evals cannot be specified as a keyword to the minimzer,
# as it's a parameter of the fit method. Default for 'least_squares'
# with 'trf' method (which is what lmfit assumes)
# is 100*npar, but lmfit changes this to 2000*(npar+1)
# https://github.com/lmfit/lmfit-py/blob/b930ddef320d93f984181db19fec8e9c9a41be8f/lmfit/minimizer.py#L1526
# We'll change default to 200*(npar+1).
# Note that lmfit method 'least_squares’ with default least_squares
# method 'lm' counts function calls in
# Jacobian estimation if numerical Jacobian is used (again the default)
max_nfev = 200*(len(param)+1)
iter_cb = None
method = 'least_squares'
# Add additional parameter settings for scipy.optimize.least_squares
if 'argslmfit' in kwargs:
for key, val in kwargs['argslmfit'].items():
# max_nfev goes in as parameter to fit method instead
if key == 'max_nfev':
max_nfev = val
# iter_cb goes in as parameter to fit method instead
elif key == 'iter_cb':
iter_cb = globals()[val]
elif key == 'method':
method = val
else:
fit_kws[key] = val
# Could also set the method here, but better
# to set it in argslmfit
if 'method' in kwargs:
method = kwargs['method']
if method == 'least_squares':
if quiet:
lmverbose = 0 # verbosity for scipy.optimize.least_squares
else:
lmverbose = 2
fit_kws = {'verbose': lmverbose}
if method == 'leastsq':
# to get mesg output
fit_kws['full_output'] = True
# increase number of max iterations; this is the default for this algorithm
# https://github.com/lmfit/lmfit-py/blob/7710da6d7e878ffee0dc90a85286f1ec619fc20f/lmfit/minimizer.py#L1624
if 'max_nfev' not in fit_kws:
max_nfev = 2000*(len(param)+1)
# FIT!
result = model.fit(flux_cut, param, **allcomps_cut,
max_nfev=max_nfev, method=method,
nan_policy='raise', iter_cb=iter_cb,
fit_kws=fit_kws)
q3dutil.write_msg(result.fit_report(), file=logfile, quiet=quiet)
# use allcomps rather than allcomps_cut to evaluate
# over all wavelengths within fit range (not just [index])
best_fit = result.eval(**allcomps)
comp_best_fit = result.eval_components(**allcomps)
# print(result.best_values)
# apply extinction and ice absorption to total best fit
if global_extinction:
for el in comp_best_fit.keys():
if el != 'global_ext' and el != 'global_ice':
comp_best_fit[el] *= comp_best_fit['global_ext']
comp_best_fit[el] *= comp_best_fit['global_ice']
ct_coeff = {'MIRparams': result.params,
'comp_best_fit': comp_best_fit}
return best_fit, ct_coeff, z, None
[docs]
def linfit_plus_FeII(lam: np.ndarray,
flux: np.ndarray,
weight: np.ndarray,
index: np.ndarray,
logfile: Optional[str]=None,
quiet: bool=True,
specConv: Optional[spectConvol]=None,
Fe_z: float=0.,
Fe_FWHM_opt: Optional[float]=None,
Fe_FWHM_uv: Optional[float]=None,
tempdir: Optional[str]=None,
**kwargs):
'''
Fit the continuum as a superposition of a linear fit and
an FeII template. Optical FeII template taken from Vestergaard & Wilkes (2001).
UV FeII template follows the PyQSOFit code (Guo et al. 2018) which uses the
template by Shen et al. (2019). Which is in turn a composite of VW01,
Tsuzuki et al. (2006) and Salviander et al. (2007). The UV and optical FeII
templates are scaled up/down separately from each other.
Parameters
----------
lam
Wavelengths
flux
Fluxes
weight
Weights
index
Points used in the fit.
logfile
Optional. Name of the log file. Default is None.
quiet
Optional. Suppress output to stdout? Default is True.
specConv
Optional. Spectral resolution object. If set to None, no convolution
will be performed. The default is None.
Fe_z
Optional. Redshift of the FeII template. Default is 0.
Fe_FWHM_opt
Optional. The smoothing kernel of the optical FeII template,
in km/s. Default is None, which means no smoothing.
Fe_FWHM_uv
Optional. The smoothing kernel of the optical FeII template,
in km/s. Default is None, which means no smoothing.
tempdir
Optional. Directory containing the templates, to search if default
directory does not contain template. Default is None.
Returns
-------
numpy.ndarray
Best fit continuum model.
dict[str, Any]
Best fit parameters.
'''
flux_cut = flux[index]
lam_cut = lam[index]
models_dictionary = {'x': lam_cut}
ind_opt = (lam_cut > 0.3686*(1.+Fe_z))&(lam_cut < 0.7484*(1.+Fe_z))
ind_UV = (lam_cut > 0.120*(1.+Fe_z))&(lam_cut < 0.350*(1.+Fe_z))
model_linfit = LinearModel(independent_vars=['x'], prefix='lincont',
nan_policy='raise', **kwargs)
pars_lin = model_linfit.make_params(intercept=np.nanmedian(flux), slope=0)
model = model_linfit
param = pars_lin
if np.sum(ind_opt)>0:
F_fe_rebin = np.zeros(len(lam_cut))
F_Fe_opt_conv, wave_fe = \
Fe_template_vw01('fe_optical', Fe_z=Fe_z, Fe_FWHM=Fe_FWHM_opt,
specConv=specConv, wavecen=4861.*(1. + Fe_z),
tempdir=tempdir)
F_fe_rebin[ind_opt] = \
q3dmath.interp_lis(lam_cut[ind_opt], wave_fe, F_Fe_opt_conv)
models_dictionary['temp_fe_opt'] = F_fe_rebin
p_init = [0.5]
p_vary = [True]
model_fe_templ_opt, param_fe_templ_opt = \
questfitfcn.set_up_fit_model_scale(p_init, p_vary,
'temp_fe_opt', 'temp_fe_opt', maxamp=1.05*max(flux[index]))
model += model_fe_templ_opt
param += param_fe_templ_opt
if np.sum(ind_UV)>0:
if Fe_FWHM_uv is None:
Fe_FWHM_uv = Fe_FWHM_opt
F_Fe_rebin_UV = np.zeros(len(lam_cut))
F_Fe_UV_conv, wave_fe_UV = \
Fe_template_vw01('fe_uv', Fe_z=Fe_z, Fe_FWHM=Fe_FWHM_uv,
specConv=specConv, wavecen=2800.*(1. + Fe_z),
tempdir=tempdir)
F_Fe_rebin_UV[ind_UV] = \
q3dmath.interp_lis(lam_cut[ind_UV], wave_fe_UV, F_Fe_UV_conv)
models_dictionary['temp_fe_UV'] = F_Fe_rebin_UV
p_init = [0.5]
p_vary = [True]
model_fe_templ_UV, param_fe_templ_UV = \
questfitfcn.set_up_fit_model_scale(p_init, p_vary, 'temp_fe_UV',
'temp_fe_UV', maxamp=1.05*max(flux[index]) )
model += model_fe_templ_UV
param += param_fe_templ_UV
result = model.fit(flux_cut, param, **models_dictionary,
max_nfev=int(1e5), method='least_squares',
nan_policy='omit', **{'verbose': 2})
print(result.fit_report())
# extrapolate result to full wavelength vector
models_dictionary_B = {'x': lam}
if np.sum(ind_opt)>0:
F_Fe_opt_all = q3dmath.interp_lis(lam, wave_fe, F_Fe_opt_conv)
models_dictionary_B['temp_fe_opt'] = F_Fe_opt_all
if np.sum(ind_UV)>0:
F_Fe_UV_all = q3dmath.interp_lis(lam, wave_fe_UV, F_Fe_UV_conv)
models_dictionary_B['temp_fe_UV'] = F_Fe_UV_all
continuum = result.eval(**models_dictionary_B)
comp_best_fit = result.eval_components(**models_dictionary_B)
ct_coeff = {'params_best_fit': result.params,
'comp_best_fit': comp_best_fit}
'''
# Not sure why this is necessary
if isinstance(rows, list) or isinstance(rows, np.ndarray):
rows = rows[0]
if isinstance(cols, list) or isinstance(cols, np.ndarray):
cols = cols[0]
if rows is not None and cols is not None:
saveres = 'fit_result__{}_{}.txt'.format(cols, rows)
else:
saveres = 'fit_result.txt'
if outdir is not None:
with open(outdir+saveres, 'w') as fh:
fh.write(result.fit_report())
fh.write('\n')
'''
return continuum, ct_coeff, None, None
[docs]
def Fe_template_vw01(filename: str,
Fe_z: Optional[float]=None,
Fe_FWHM: Optional[float]=None,
specConv: Optional[object]=None,
wavecen: Optional[float]=None,
tempdir: Optional[str]=None) \
-> tuple[np.ndarray, np.ndarray]:
'''
Load the Vestergaard & Wilkes 2001 FeII template; redshift it;
convolve it with intrinsic broadening;
and convolve it with the instrumental resolution.
Parameters
----------
filename
Name of the FeII template file.
Fe_z
Optional. Redshift to apply to the FeII template. Default is None, which
means no redshift is applied.
Fe_FWHM
Optional. The Gaussian smoothing kernel for the FeII template in km/s.
Default is None, which means no smoothing is applied.
specConv
Optional. Instance of :py:class:`~q3dfit.spectConvol.spectConvol` specifying
the instrumental spectral resolution convolution. Default is None, which
means no convolution is applied.
wavecen
Optional. Central wavelength for creating specConv instance, to find
proper gratings. Default is None, which means no convolution is applied.
tempdir
Optional. Directory containing the templates, to search if default
directory does not contain template. Default is None.
Returns
-------
np.ndarray
Flux array of the FeII template.
np.ndarray
Wavelength array of the FeII template.
'''
with pkg_resources.path(questfit_templates, filename) as p:
data1 = np.loadtxt(p)
if not os.path.exists(p) and tempdir is not None:
temp_model = np.load(tempdir+filename)
else:
raise InitializationError('Cannot locate template specified in config file.')
wave_fe = 10**data1[:,0]/1e4
# normalize template
F_fe = data1[:,1]/np.max(data1[:,1])
# redshift if requested
if Fe_z is not None:
wave_fe *= 1. + Fe_z
# intrinsic convolution
if Fe_FWHM is not None:
if Fe_FWHM <= 900.:
raise InitializationError('FWHM of the FeII template must be >900 km/s.')
# Start by computing the *additional* convolution required.
sig_conv = np.sqrt(Fe_FWHM ** 2 - 900.0 ** 2) / 2. / \
np.sqrt(2. * np.log(2.))
# 106.3 km/s is the dispersion for the BG92 FeII template
# used by Vestergaard & Wilkes 2001
sig_pix = sig_conv / 106.3
xx = np.arange(0, 2*np.round(3.5*sig_pix+1, 0), 1) - \
np.round(3.5*sig_pix+1, 0)
# normalized Gaussian kernal in pixel space
kernel = np.exp(-xx ** 2 / (2 * sig_pix ** 2))
kernel = kernel / np.sum(kernel)
F_fe = np.convolve(F_fe, kernel, 'same')
if specConv is not None and wavecen is not None:
F_fe = specConv.spect_convolver(wave_fe, F_fe, wavecen)
return F_fe, wave_fe
[docs]
def readcf(filename: str) -> dict:
'''
Read in the configuration file for fitting with questfit.
Parameters
----------
filename
Name of, and path, to the configuration file.
Returns
-------
dict
Dictionary with the configuration file. Keys are the names of the
components to be fitted, values are lists with the parameters for
the components.
'''
cf = np.loadtxt(filename, dtype=np.str_, comments="#")
# Cycle through rows
init_questfit = dict()
for i in cf:
if i[0] == 'template' or i[0] == 'powerlaw' or i[0] == 'blackbody' \
or i[0] == 'template_poly':
# populate initilization dictionary with
# col 0: filename (if nessesary; path hardcoded)
# col 1: lower wavelength limit or normalization factor
# col 2: upper wavelength limit or fix/free parameter (0 or 1) for normalization
# col 3: name of ext. curve or ice feature
# col 4: initial guess for Av
# col 5: fix/free parameter (0/1) for Av
# col 6: S,M = screen or mixed extinction
# col 7: initial guess for BB temperature or powerlaw index
# col 8: fix/free parameter (0/1) for BB temperature or powerlaw index
# col 9: ice name model
# col 10: intial guess for ice absorption tau
# col 11: fix/free parameter (0/1) for tau
init_questfit[i[0]+'_'+i[1]+'_'+i[4]+'_'+i[10]] = i[1:]
if i[0] == 'absorption' or i[0] == 'extinction':
#init_questfit[i[0]+'_'+i[1]+'_'+i[4]+'_'+i[10]] = i[1:]
init_questfit[i[4]] = [i[1], i[0]]
if i[0] == 'source':
init_questfit['source'] = i[1:]
return init_questfit