Source code for q3dfit.lineinit

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations
from numpy.typing import ArrayLike

from typing import Optional
import numpy as np
import os

from astropy.constants import c
from astropy.table import QTable, Table
from lmfit import Model, Parameters

import q3dfit.data
from . import q3dutil
from q3dfit.exceptions import InitializationError
from q3dfit.spectConvol import spectConvol

[docs] def lineinit(linelist: Table, linelistz: dict, linetie: dict, initflux: dict[str, np.ndarray], initsig: dict[str, np.ndarray], siglim: dict[str, np.ndarray], ncomp: dict[str, np.ndarray], linevary: Optional[dict[str, np.ndarray]]=None, specConv: Optional[spectConvol]=None, lineratio: Optional[Table | QTable]=None, waves: Optional[np.ndarray]=None) -> tuple[Model, Parameters]: ''' Initialize parameters for emission-line fitting. Parameters ---------- linelist Emission lines to fit. linelistz Initial guess for line redshifts. linetie Line to which to tie each line. initflux Initial guess for line fluxes. initsig Initial guess for line sigmas. siglim Lower and upper limits for line sigmas. ncomp Number of components for each line. linevary Optional. Line parameter vary flags (fix/free). If set to None, all parameters are set to vary=True. The default is None. specConv Optional. Spectral resolution object. If set to None, no convolution will be performed. The default is None. lineratio Optional. Table of line ratio constraints, beyond those in the doublets table, https://github.com/Q3D/q3dfit/blob/main/q3dfit/data/linelists/doublets.tbl. The default is None. The table must contain the columns `line1`, `line2`, `comp`, and optionally `value`, `fixed`, `lower`, and `upper`.\n - `line1`/`line2`: the names of the two lines to which the ratio `line1`/`line2` applies. These are the labels found in the linelist Table. - `comp`: an array of velocity components (zero-indexed) on which to apply the constraints, one array for each pair of lines. - `value`: the initial value of `line1`/`line2`. Presently, if `value` is specified for one pair of lines, it must be specified for all. Otherwise, the initial value is determined from the data. - `fixed`: The ratio can be `fixed` to the initial value. Presently, if `fixed` is defined, it must be set to `True` or `False` for all pairs of line. - `lower`/`upper`: If the ratio is not `fixed`, `lower` and `upper` limits can also be specified. (If they are not, and the line pair is a doublet in the doublets.tbl file, then the lower and upper limits are set using the data in that file.) Presently, if `lower` or `upper` is defined here for one set of lines, it must be defined here for every pair of lines. waves Optional. Wavelength array for determining if a line is in the fit range. The default is None. Returns ------- tuple[Model, Parameters] A tuple containing the LMFIT Model and the fit parameters. ''' # Get fixed-ratio doublet pairs for tying intensities data_path = os.path.abspath(q3dfit.data.__file__)[:-11] doublets64 = Table.read(os.path.join(data_path, 'linelists', 'doublets.tbl'), format='ipac') doublets = \ Table(doublets64, dtype=['str', 'str', 'int', 'float64', 'float64', 'float64']) dblt_pairs = dict() # key the doublet pairs by the line listed in the second column # Ratio value is of the first line divided by the second for idx, name in enumerate(doublets['line1']): #if doublets['fixed_ratio'][idx] == 1: dblt_pairs[doublets['line2'][idx]] = doublets['line1'][idx] # converts the astropy.Table structure of linelist into a Python # dictionary that is compatible with the code downstream lines_arr = {name: linelist['lines'][idx] for idx, name in enumerate(linelist['name'])} # the total LMFIT Model # size = # model instances totmod = [] # cycle through lines for line in lines_arr: # cycle through velocity components for i in range(0, ncomp[line]): # LMFIT parameters can only consist of letters, numbers, or _ lmline = q3dutil.lmlabel(line) mName = f'{lmline.lmlabel}_{i}_' imodel = Model(manygauss, prefix=mName, SPECRES=specConv) if isinstance(totmod, Model): totmod += imodel else: totmod = imodel # Create parameter dictionary fit_params = totmod.make_params() # dictionary for ratios if needed rat_params = Parameters() # this is so we can reset the lower bound for the lower line # otherwise it defaults to the lower value of the ratio, for unknown # reasons rat_lines2 = list() # Cycle through parameters for i, parname in enumerate(fit_params.keys()): # split parameter name string into line, component #, and parameter psplit = parname.split('_') lmline = '' # this bit is for the case where the line label has underscores in it for i in range(0, len(psplit)-2): lmline += psplit[i] # string for line label if i != len(psplit)-3: lmline += '_' line = q3dutil.lmlabel(lmline, reverse=True) inrange = True if waves is not None: if linelistz[line.label][0] > max(waves) or \ linelistz[line.label][0] < min(waves): inrange = False # ... the final two underscores separate the line label from the comp # and gaussian parname comp = int(psplit[len(psplit)-2]) # string for line component gpar = psplit[len(psplit)-1] # parameter name in manygauss # If line out of range or something else value = np.float64(np.finfo(float).eps) limited = None limits = None vary = False tied = '' # Process input values if gpar == 'flx' and inrange: value = initflux[line.label][comp] limited = np.array([1, 0], dtype='uint8') limits = np.array([np.finfo(float).eps, np.finfo(float).eps], dtype='float64') # Check if it's a doublet; this will break if weaker line # is in list, but stronger line is not # The label is the second line in the doublet if line.label in dblt_pairs.keys(): # we'll index the doublets table by the first line dblt_lmline = q3dutil.lmlabel(dblt_pairs[line.label]) idx_line = np.where(doublets['line1']==dblt_lmline.lmlabel.replace('lb', '[').replace('rb', ']'))[0] # This is for doublets with fixed ratios if doublets['fixed_ratio'][idx_line].value[0] == 1: ratio = doublets['ratio'][idx_line].value[0] # this is ties the second line flux to equal # the first line flux divided by the ratio tied = f'{dblt_lmline.lmlabel}_{comp}_flx/{ratio}' # This is for doublets with lower and upper limits else: # for the case where the initial value is set # in the doublets table if not isinstance(doublets['ratio'][idx_line].value[0], np.ma.MaskedArray): ratio = doublets['ratio'][idx_line].value[0] # otherwise, use the initial flux values else: ratio = initflux[dblt_pairs[line.label]][comp] / \ initflux[line.label][comp] # if the ratio is outside the limits, set it to the # average of the limits if ratio < doublets['lower'][idx_line].value[0] or \ ratio > doublets['upper'][idx_line].value[0]: ratio = (doublets['upper'][idx_line].value[0] + doublets['lower'][idx_line].value[0]) / 2. # free ratio, so tie to first line divided by ratio dblt_lmline2 = q3dutil.lmlabel(line.label) # create new parameter for the ratio lmrat = f'{dblt_lmline.lmlabel}_div_{dblt_lmline2.lmlabel}_{comp}' tied = f'{dblt_lmline.lmlabel}_{comp}_flx/{lmrat}' # set limits for the ratio limits = np.array([doublets['lower'][idx_line].value[0], doublets['upper'][idx_line].value[0]], dtype='float64') rat_params.add(lmrat) rat_params = set_params(rat_params, lmrat, VALUE=ratio, VARY=True, LIMITED=[1,1], LIMITS=limits) rat_lines2.append(f'{dblt_lmline2.lmlabel}_{comp}_flx') else: tied = '' if linevary is None: vary = True else: try: vary = linevary[line.label][gpar][comp] except: raise InitializationError('linevary not properly defined') elif gpar == 'cwv': value = linelistz[line.label][comp] limited = np.array([1, 1], dtype='uint8') limits = np.array([value*0.997, value*1.003], dtype='float32') # Check if line is tied to something else if linetie[line.label] != line.label: linetie_tmp = q3dutil.lmlabel(linetie[line.label]) tied = '{0:0.6e} / {1:0.6e} * {2}_{3}_cwv'.\ format(lines_arr[line.label], lines_arr[linetie[line.label]], linetie_tmp.lmlabel, comp) # This is pretty odd; it's a very special case # elif force_cwv_lines is not None: # if line.label in force_cwv_lines and comp > 0: # linetie_tmp = q3dutil.lmlabel(linetie[line.label]) # tied = '{}_0_cwv'.\ # format(linetie_tmp.lmlabel) else: tied = '' if linevary is None: vary = True else: try: vary = linevary[line.label][gpar][comp] except: raise InitializationError('linevary not properly defined') elif gpar == 'sig': value = initsig[line.label][comp] limited = np.array([1, 1], dtype='uint8') limits = siglim[line.label][comp] if linetie[line.label] != line.label: linetie_tmp = q3dutil.lmlabel(linetie[line.label]) tied = f'{linetie_tmp.lmlabel}_{comp}_sig' else: tied = '' if linevary is None: vary = True else: try: vary = linevary[line.label][gpar][comp] except: raise InitializationError('linevary not properly defined') fit_params = \ set_params(fit_params, parname, VALUE=value, VARY=vary, LIMITED=limited, TIED=tied, LIMITS=limits) # add ratio parameters to fit_params if len(rat_params) > 0: fit_params.update(rat_params) # logic for bounding or fixing line ratios if lineratio is not None: if not isinstance(lineratio, QTable) and \ not isinstance(lineratio, Table): raise InitializationError('The lineratio key must be' + ' an astropy Table or QTable') elif 'line1' not in lineratio.colnames or \ 'line2' not in lineratio.colnames or \ 'comp' not in lineratio.colnames: raise InitializationError('The lineratio table must contain' + ' the line1, line2, and comp columns') for ilinrat in range(0, len(lineratio)): line1 = lineratio['line1'][ilinrat] line2 = lineratio['line2'][ilinrat] comps = lineratio['comp'][ilinrat] lmline1 = q3dutil.lmlabel(line1) lmline2 = q3dutil.lmlabel(line2) for comp in comps: if f'{lmline1.lmlabel}_{comp}_flx' in fit_params.keys() and \ f'{lmline2.lmlabel}_{comp}_flx' in fit_params.keys(): # set initial value if 'value' in lineratio.colnames: initval = lineratio['value'][ilinrat] else: initval = \ np.divide( fit_params[f'{lmline1.lmlabel}_{comp}_flx'], fit_params[f'{lmline2.lmlabel}_{comp}_flx']) lmrat = f'{lmline1.lmlabel}_div_{lmline2.lmlabel}_{comp}' # make sure the user didn't invert the order of the lines if f'{lmline2.lmlabel}_div_{lmline1.lmlabel}_{comp}' \ in fit_params.keys(): raise InitializationError( f'Line ratio {line2} / {line1} ' + f'is inverted') # add the ratio parameter if it doesn't exist if lmrat not in fit_params.keys(): fit_params.add(lmrat, value=initval.astype('float64')) # tie second line to first line divided by the ratio fit_params[f'{lmline2.lmlabel}_{comp}_flx'].expr = \ f'{lmline1.lmlabel}_{comp}_flx'+'/'+lmrat # if it already exists, set the value elif 'value' in lineratio.colnames: fit_params[lmrat].value = initval.astype('float64') # fixed or free if 'fixed' in lineratio.colnames: if lineratio['fixed'][ilinrat]: fit_params[lmrat].vary = False # apply lower limit? if 'lower' in lineratio.colnames: lower = lineratio['lower'][ilinrat] fit_params[lmrat].min = lower.astype('float64') # logic to apply doublet lower limits if in doublets table elif line1 in doublets['line1']: iline1 = np.where(doublets['line1'] == line1) if doublets['line2'][iline1] == line2: lower = doublets['lower'][iline1][0] fit_params[lmrat].min = lower.astype('float64') # doublet can be specified in init file in either order # relative to doublets table ... elif line1 in doublets['line2']: iline1 = np.where(doublets['line2'] == line1) if doublets['line1'][iline1] == line2: upper = 1. / doublets['lower'][iline1][0] fit_params[lmrat].max = upper.astype('float64') # apply upper limit? if 'upper' in lineratio.colnames: upper = lineratio['upper'][ilinrat] fit_params[lmrat].max = upper.astype('float64') elif line1 in doublets['line1']: iline1 = np.where(doublets['line1'] == line1) if doublets['line2'][iline1] == line2: upper = doublets['upper'][iline1][0] fit_params[lmrat].max = upper.astype('float64') elif line1 in doublets['line2']: iline1 = np.where(doublets['line2'] == line1) if doublets['line1'][iline1] == line2: lower = 1. / doublets['upper'][iline1][0] fit_params[lmrat].min = lower.astype('float64') rat_lines2.append(f'{lmline2.lmlabel}_{comp}_flx') if len(rat_lines2) > 0: # this is to reset the lower bound for the second line in the ratio # otherwise it defaults to the lower value of the ratio, for unknown # reasons for ratline2 in rat_lines2: fit_params[ratline2].min = np.finfo(float).eps return totmod, fit_params
[docs] def set_params(fit_params: Parameters, NAME: str, VALUE: Optional[float]=None, VARY: bool=True, LIMITED: Optional[ArrayLike]=None, TIED: Optional[str]=None, LIMITS: Optional[np.ndarray]=None)->Parameters: ''' Set parameters for the lmfit model using the lmfit Parameters object. Parameters ---------- fit_params Fit parameter object. NAME Parameter name. VALUE Optional. Initial value. The default is None, in which case no value is set. VARY Vary flag. The default is True, in which case the parameter is free to vary. LIMITED Whether or not to limit the parameter. The default, None, is to set no limits. TIED Expression for tying the parameter to another. The default is None. LIMITS Limits for the parameter. The default is None, in which case no limits are set. Returns ------- Parameters Modified fit parameter object. ''' # we can force the input to float64, but lmfit has values as float64 and # doesn't seem like we can change it. These astypes assume that the # VALUE is a numpy object if VALUE is not None: fit_params[NAME].set(value=VALUE.astype('float64')) fit_params[NAME].set(vary=VARY) if TIED is not None: fit_params[NAME].expr = TIED if LIMITED is not None and LIMITS is not None: if LIMITED[0] == 1: fit_params[NAME].min = LIMITS[0].astype('float64') if LIMITED[1] == 1: fit_params[NAME].max = LIMITS[1].astype('float64') return fit_params
[docs] def manygauss(x: np.ndarray, flx: np.ndarray, cwv: float, sig: float, SPECRES: Optional[spectConvol]=None) -> np.ndarray: ''' Generate a Gaussian model for a given set of parameters. Parameters ---------- x Wavelength array. flx Flux array. cwv Central wavelength. sig Sigma. SPECRES Spectral resolution object, for convolving the Gaussian with the spectral resolution. The default is None. Returns ------- numpy.ndarray Gaussian model. ''' sigs = sig / c.to('km/s').value * cwv gaussian = flx * np.exp(-np.power((x - cwv) / sigs, 2.)/2.) if SPECRES is not None: # resample spectrum on smaller grid before convolution if # sigma less than 1 pixel. Note that this doesn't assume # constant dispersion, which I *think* is okay for the # ppxf_util.varsmooth() algorithm. But I'm not sure. oversample = 1 if np.mean(sigs) <= np.mean(x[1:-1]-x[0:-2])*1.: oversample = 10 datconv = SPECRES.spect_convolver(x, gaussian, wavecen=cwv, oversample=oversample) #maskval = np.float64(1e-4*max(datconv)) #maskind = np.asarray(datconv < maskval).nonzero()[0] #datconv[maskind] = np.float64(0.) return datconv else: #maskval = np.float64(1e-4*max(gaussian)) #maskind = np.asarray(gaussian < maskval).nonzero()[0] #gaussian[maskind] = np.float64(0.) return gaussian