Source code for pyspeckit.spectrum.models.ammonia

"""
========================================
Ammonia inversion transition TROT fitter
========================================

Ammonia inversion transition TROT fitter translated from Erik Rosolowsky's
http://svn.ok.ubc.ca/svn/signals/nh3fit/

.. moduleauthor:: Adam Ginsburg <adam.g.ginsburg@gmail.com>

Module API
^^^^^^^^^^

"""
from __future__ import division

import numpy as np
from ...mpfit import mpfit
from ...spectrum.parinfo import ParinfoList,Parinfo
from . import fitter
from . import model
import matplotlib.cbook as mpcb
import copy
from astropy import log
from astropy.extern.six import iteritems
from . import mpfit_messages
import operator
import string

from .ammonia_constants import (line_names, freq_dict, aval_dict, ortho_dict,
                                voff_lines_dict, tau_wts_dict)

TCMB = 2.7315 # K

[docs]def ammonia(xarr, trot=20, tex=None, ntot=14, width=1, xoff_v=0.0, fortho=0.0, tau=None, fillingfraction=None, return_tau=False, background_tb=TCMB, verbose=False, return_components=False, debug=False, line_names=line_names): """ Generate a model Ammonia spectrum based on input temperatures, column, and gaussian parameters Parameters ---------- xarr: `pyspeckit.spectrum.units.SpectroscopicAxis` Array of wavelength/frequency values trot: float The rotational temperature of the lines. This is the excitation temperature that governs the relative populations of the rotational states. tex: float or None Excitation temperature. Assumed LTE if unspecified (``None``) or if tex>trot. This is the excitation temperature for *all* of the modeled lines, which means we are explicitly assuming T_ex is the same for all lines. ntot: float Total log column density of NH3. Can be specified as a float in the range 5-25 width: float Line width in km/s xoff_v: float Line offset in km/s fortho: float Fraction of NH3 molecules in ortho state. Default assumes all para (fortho=0). tau: None or float If tau (optical depth in the 1-1 line) is specified, ntot is NOT fit but is set to a fixed value. The optical depths of the other lines are fixed relative to tau_oneone fillingfraction: None or float fillingfraction is an arbitrary scaling factor to apply to the model return_tau: bool Return a dictionary of the optical depths in each line instead of a synthetic spectrum return_components: bool Return a list of arrays, one for each hyperfine component, instead of just one array background_tb : float The background brightness temperature. Defaults to TCMB. verbose: bool More messages debug: bool For debugging. Returns ------- spectrum: `numpy.ndarray` Synthetic spectrum with same shape as ``xarr`` component_list: list List of `numpy.ndarray`'s, one for each hyperfine component tau_dict: dict Dictionary of optical depth values for the various lines (if ``return_tau`` is set) """ from .ammonia_constants import (ckms, ccms, h, kb, Jortho, Jpara, Brot, Crot) # Convert X-units to frequency in GHz xarr = xarr.as_unit('GHz') if tex is None: log.warning("Assuming tex=trot") tex = trot elif isinstance(tex, dict): for k in tex: assert k in line_names,"{0} not in line list".format(k) line_names = tex.keys() from .ammonia_constants import line_name_indices, line_names as original_line_names # recreate line_names keeping only lines with a specified tex # using this loop instead of tex.keys() preserves the order & data type line_names = [k for k in original_line_names if k in line_names] if 5 <= ntot <= 25: # allow ntot to be specified as a logarithm. This is # safe because ntot < 1e10 gives a spectrum of all zeros, and the # plausible range of columns is not outside the specified range lin_ntot = 10**ntot else: raise ValueError("ntot, the logarithmic total column density," " must be in the range 5 - 25") tau_dict = {} """ Column density is the free parameter. It is used in conjunction with the full partition function to compute the optical depth in each band """ Zpara = (2*Jpara+1)*np.exp(-h*(Brot*Jpara*(Jpara+1)+ (Crot-Brot)*Jpara**2)/(kb*trot)) Zortho = 2*(2*Jortho+1)*np.exp(-h*(Brot*Jortho*(Jortho+1)+ (Crot-Brot)*Jortho**2)/(kb*trot)) Qpara = Zpara.sum() Qortho = Zortho.sum() log.debug("Partition Function: Q_ortho={0}, Q_para={1}".format(Qortho, Qpara)) for linename in line_names: if ortho_dict[linename]: # define variable "ortho_or_para_frac" that will be the ortho # fraction in the case of an ortho transition or the para # fraction for a para transition ortho_or_parafrac = fortho Z = Zortho Qtot = Qortho else: ortho_or_parafrac = 1.0-fortho Z = Zpara Qtot = Qpara # for a complete discussion of these equations, please see # https://github.com/keflavich/pyspeckit/blob/ammonia_equations/examples/AmmoniaLevelPopulation.ipynb # and # http://low-sky.github.io/ammoniacolumn/ # and # https://github.com/pyspeckit/pyspeckit/pull/136 # short variable names for readability frq = freq_dict[linename] partition = Z[line_name_indices[linename]] aval = aval_dict[linename] # Total population of the higher energy inversion transition population_rotstate = lin_ntot * ortho_or_parafrac * partition/Qtot if isinstance(tex, dict): expterm = ((1-np.exp(-h*frq/(kb*tex[linename]))) / (1+np.exp(-h*frq/(kb*tex[linename])))) else: expterm = ((1-np.exp(-h*frq/(kb*tex))) / (1+np.exp(-h*frq/(kb*tex)))) fracterm = (ccms**2 * aval / (8*np.pi*frq**2)) widthterm = (ckms/(width*frq*(2*np.pi)**0.5)) tau_i = population_rotstate * fracterm * expterm * widthterm tau_dict[linename] = tau_i log.debug("Line {0}: tau={1}, expterm={2}, pop={3}," " partition={4}" .format(linename, tau_i, expterm, population_rotstate, partition)) # allow tau(11) to be specified instead of ntot # in the thin case, this is not needed: ntot plays no role # this process allows you to specify tau without using the approximate equations specified # above. It should remove ntot from the calculations anyway... if tau is not None: tau11_temp = tau_dict['oneone'] # re-scale all optical depths so that tau is as specified, but the relative taus # are sest by the kinetic temperature and partition functions for linename,t in iteritems(tau_dict): tau_dict[linename] = t * tau/tau11_temp if return_tau: return tau_dict model_spectrum = _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v, fortho, line_names, background_tb=background_tb, fillingfraction=fillingfraction, return_components=return_components) if model_spectrum.min() < 0 and background_tb == TCMB: raise ValueError("Model dropped below zero. That is not possible " " normally. Here are the input values: "+ ("tex: {0} ".format(tex)) + ("trot: %f " % trot) + ("ntot: %f " % ntot) + ("width: %f " % width) + ("xoff_v: %f " % xoff_v) + ("fortho: %f " % fortho) ) if verbose or debug: log.info("trot: %g tex: %s ntot: %g width: %g xoff_v: %g " "fortho: %g fillingfraction: %g" % (trot, tex, ntot, width, xoff_v, fortho, fillingfraction)) return model_spectrum
[docs]def cold_ammonia(xarr, tkin, **kwargs): """ Generate a model Ammonia spectrum based on input temperatures, column, and gaussian parameters Parameters ---------- xarr: `pyspeckit.spectrum.units.SpectroscopicAxis` Array of wavelength/frequency values tkin: float The kinetic temperature of the lines in K. Will be converted to rotational temperature following the scheme of Swift et al 2005 (http://esoads.eso.org/abs/2005ApJ...620..823S, eqn A6) and further discussed in Equation 7 of Rosolowsky et al 2008 (http://adsabs.harvard.edu/abs/2008ApJS..175..509R) """ dT0 = 41.18 # Energy difference between (2,2) and (1,1) in K trot = tkin * (1 + (tkin/dT0)*np.log(1 + 0.6*np.exp(-15.7/tkin)))**-1 log.debug("Cold ammonia turned T_K = {0} into T_rot = {1}".format(tkin,trot)) return ammonia(xarr, trot=trot, **kwargs)
[docs]def ammonia_thin(xarr, tkin=20, tex=None, ntot=14, width=1, xoff_v=0.0, fortho=0.0, tau=None, return_tau=False, **kwargs): """ Use optical depth in the 1-1 line as a free parameter The optical depths of the other lines are then set by the kinetic temperature tkin is used to compute trot assuming a 3-level system consisting of (1,1), (2,1), and (2,2) as in Swift et al, 2005 [2005ApJ...620..823S] """ tau_dict = {} tex = tkin dT0 = 41.5 # Energy diff between (2,2) and (1,1) in K trot = tkin/(1+tkin/dT0*np.log(1+0.6*np.exp(-15.7/tkin))) tau_dict['oneone'] = tau tau_dict['twotwo'] = tau*(23.722/23.694)**2*4/3.*5/3.*np.exp(-41.5/trot) tau_dict['threethree'] = tau*(23.8701279/23.694)**2*3/2.*14./3.*np.exp(-101.1/trot) tau_dict['fourfour'] = tau*(24.1394169/23.694)**2*8/5.*9/3.*np.exp(-177.34/trot) line_names = tau_dict.keys() # TODO: Raise a warning if tkin > (some value), probably 50 K, because # the 3-level system approximation used here will break down. if return_tau: return tau_dict else: return _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v, fortho, line_names, **kwargs)
def _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v, fortho, line_names, background_tb=TCMB, fillingfraction=None, return_components=False): """ Helper function: given a dictionary of ammonia optical depths, an excitation tmeperature... etc, produce the spectrum """ from .ammonia_constants import (ckms, h, kb) # fillingfraction is an arbitrary scaling for the data # The model will be (normal model) * fillingfraction if fillingfraction is None: fillingfraction = 1.0 # "runspec" means "running spectrum": it is accumulated over a loop runspec = np.zeros(len(xarr)) components = [] for linename in line_names: voff_lines = np.array(voff_lines_dict[linename]) tau_wts = np.array(tau_wts_dict[linename]) lines = (1-voff_lines/ckms)*freq_dict[linename]/1e9 tau_wts = tau_wts / (tau_wts).sum() nuwidth = np.abs(width/ckms*lines) nuoff = xoff_v/ckms*lines # tau array tauprof = np.zeros(len(xarr)) for kk,nuo in enumerate(nuoff): tauprof += (tau_dict[linename] * tau_wts[kk] * np.exp(-(xarr.value+nuo-lines[kk])**2 / (2.0*nuwidth[kk]**2))) components.append(tauprof) T0 = (h*xarr.value*1e9/kb) # "temperature" of wavelength if isinstance(tex, dict): runspec = ((T0/(np.exp(T0/tex[linename])-1) - T0/(np.exp(T0/background_tb)-1)) * (1-np.exp(-tauprof)) * fillingfraction + runspec) else: runspec = ((T0/(np.exp(T0/tex)-1) - T0/(np.exp(T0/background_tb)-1)) * (1-np.exp(-tauprof)) * fillingfraction + runspec) if return_components: if isinstance(tex, dict): term1 = [(T0/(np.exp(T0/tex[linename])-1)-T0/(np.exp(T0/background_tb)-1)) for linename in line_names] else: term1 = (T0/(np.exp(T0/tex)-1)-T0/(np.exp(T0/background_tb)-1)) return term1*(1-np.exp(-1*np.array(components))) else: return runspec class ammonia_model(model.SpectralModel): def __init__(self,npeaks=1,npars=6, parnames=['trot','tex','ntot','width','xoff_v','fortho'], **kwargs): npeaks = self.npeaks = int(npeaks) npars = self.npars = int(npars) self._default_parnames = parnames self.parnames = copy.copy(self._default_parnames) # all fitters must have declared modelfuncs, which should take the fitted pars... self.modelfunc = ammonia self.n_modelfunc = self.n_ammonia # for fitting ammonia simultaneously with a flat background self.onepeakammonia = fitter.vheightmodel(ammonia) #self.onepeakammoniafit = self._fourparfitter(self.onepeakammonia) self.default_parinfo = None self.default_parinfo, kwargs = self._make_parinfo(**kwargs) # Remove keywords parsed by parinfo and ignored by the fitter for kw in ('tied','partied'): if kw in kwargs: kwargs.pop(kw) # enforce ammonia-specific parameter limits for par in self.default_parinfo: if 'tex' in par.parname.lower(): par.limited = (True,par.limited[1]) par.limits = (max(par.limits[0],TCMB), par.limits[1]) if 'trot' in par.parname.lower(): par.limited = (True,par.limited[1]) par.limits = (max(par.limits[0],TCMB), par.limits[1]) if 'width' in par.parname.lower(): par.limited = (True,par.limited[1]) par.limits = (max(par.limits[0],0), par.limits[1]) if 'fortho' in par.parname.lower(): par.limited = (True,True) if par.limits[1] != 0: par.limits = (max(par.limits[0],0), min(par.limits[1],1)) else: par.limits = (max(par.limits[0],0), 1) if 'ntot' in par.parname.lower(): par.limited = (True,par.limited[1]) par.limits = (max(par.limits[0],0), par.limits[1]) self.parinfo = copy.copy(self.default_parinfo) self.modelfunc_kwargs = kwargs # lower case? self.modelfunc_kwargs.update({'parnames':self.parinfo.parnames}) self.use_lmfit = kwargs.pop('use_lmfit') if 'use_lmfit' in kwargs else False self.fitunits = 'GHz' def __call__(self,*args,**kwargs): return self.multinh3fit(*args,**kwargs) def n_ammonia(self, pars=None, parnames=None, **kwargs): """ Returns a function that sums over N ammonia line profiles, where N is the length of trot,tex,ntot,width,xoff_v,fortho *OR* N = len(pars) / 6 The background "height" is assumed to be zero (you must "baseline" your spectrum before fitting) *pars* [ list ] a list with len(pars) = (6-nfixed)n, assuming trot,tex,ntot,width,xoff_v,fortho repeated *parnames* [ list ] len(parnames) must = len(pars). parnames determine how the ammonia function parses the arguments """ if hasattr(pars,'values'): # important to treat as Dictionary, since lmfit params & parinfo both have .items parnames,parvals = zip(*pars.items()) parnames = [p.lower() for p in parnames] parvals = [p.value for p in parvals] elif parnames is None: parvals = pars parnames = self.parnames else: parvals = pars if len(pars) != len(parnames): # this should only be needed when other codes are changing the number of peaks # during a copy, as opposed to letting them be set by a __call__ # (n_modelfuncs = n_ammonia can be called directly) # n_modelfuncs doesn't care how many peaks there are if len(pars) % len(parnames) == 0: parnames = [p for ii in range(len(pars)//len(parnames)) for p in parnames] npars = len(parvals) / self.npeaks else: raise ValueError("Wrong array lengths passed to n_ammonia!") else: npars = int(len(parvals) / self.npeaks) self._components = [] def L(x): v = np.zeros(len(x)) for jj in range(int(self.npeaks)): modelkwargs = kwargs.copy() for ii in range(int(npars)): name = parnames[ii+jj*int(npars)].strip('0123456789').lower() modelkwargs.update({name:parvals[ii+jj*int(npars)]}) v += self.modelfunc(x,**modelkwargs) return v return L def components(self, xarr, pars, hyperfine=False, **kwargs): """ Ammonia components don't follow the default, since in Galactic astronomy the hyperfine components should be well-separated. If you want to see the individual components overlaid, you'll need to pass hyperfine to the plot_fit call """ comps=[] for ii in range(self.npeaks): if hyperfine: modelkwargs = dict(zip(self.parnames[ii*self.npars:(ii+1)*self.npars], pars[ii*self.npars:(ii+1)*self.npars])) comps.append(self.modelfunc(xarr, return_components=True, **modelkwargs)) else: modelkwargs = dict(zip(self.parnames[ii*self.npars:(ii+1)*self.npars], pars[ii*self.npars:(ii+1)*self.npars])) comps.append([self.modelfunc(xarr, return_components=False, **modelkwargs)]) modelcomponents = np.concatenate(comps) return modelcomponents def multinh3fit(self, xax, data, err=None, parinfo=None, quiet=True, shh=True, debug=False, maxiter=200, use_lmfit=False, veryverbose=False, **kwargs): """ Fit multiple nh3 profiles (multiple can be 1) Inputs: xax - x axis data - y axis npeaks - How many nh3 profiles to fit? Default 1 (this could supersede onedgaussfit) err - error corresponding to data These parameters need to have length = 6*npeaks. If npeaks > 1 and length = 6, they will be replicated npeaks times, otherwise they will be reset to defaults: params - Fit parameters: [trot, tex, ntot (or tau), width, offset, ortho fraction] * npeaks If len(params) % 6 == 0, npeaks will be set to len(params) / 6 fixed - Is parameter fixed? limitedmin/minpars - set lower limits on each parameter (default: width>0, Tex and trot > Tcmb) limitedmax/maxpars - set upper limits on each parameter parnames - default parameter names, important for setting kwargs in model ['trot','tex','ntot','width','xoff_v','fortho'] quiet - should MPFIT output each iteration? shh - output final parameters? Returns: Fit parameters Model Fit errors chi2 """ if parinfo is None: parinfo = self.parinfo = self.make_parinfo(**kwargs) else: if isinstance(parinfo, ParinfoList): if not quiet: log.info("Using user-specified parinfo.") self.parinfo = parinfo else: if not quiet: log.info("Using something like a user-specified parinfo, but not.") self.parinfo = ParinfoList([p if isinstance(p,Parinfo) else Parinfo(p) for p in parinfo], preserve_order=True) fitfun_kwargs = dict((x,y) for (x,y) in kwargs.items() if x not in ('npeaks', 'params', 'parnames', 'fixed', 'limitedmin', 'limitedmax', 'minpars', 'maxpars', 'tied', 'max_tem_step')) if 'use_lmfit' in fitfun_kwargs: raise KeyError("use_lmfit was specified in a location where it " "is unacceptable") # not used: npars = len(parinfo)/self.npeaks self._validate_parinfo() def mpfitfun(x,y,err): if err is None: def f(p,fjac=None): return [0,(y-self.n_ammonia(pars=p, parnames=parinfo.parnames, **fitfun_kwargs)(x))] else: def f(p,fjac=None): return [0,(y-self.n_ammonia(pars=p, parnames=parinfo.parnames, **fitfun_kwargs)(x))/err] return f if veryverbose: log.info("GUESSES: ") log.info(str(parinfo)) #log.info "\n".join(["%s: %s" % (p['parname'],p['value']) for p in parinfo]) if use_lmfit: return self.lmfitter(xax, data, err=err, parinfo=parinfo, quiet=quiet, debug=debug) else: mp = mpfit(mpfitfun(xax,data,err), parinfo=parinfo, maxiter=maxiter, quiet=quiet, debug=debug) mpp = mp.params if mp.perror is not None: mpperr = mp.perror else: mpperr = mpp*0 chi2 = mp.fnorm if mp.status == 0: raise Exception(mp.errmsg) for i,p in enumerate(mpp): parinfo[i]['value'] = p parinfo[i]['error'] = mpperr[i] if not shh: log.info("Fit status: {0}".format(mp.status)) log.info("Fit message: {0}".format(mpfit_messages[mp.status])) log.info("Fit error message: {0}".format(mp.errmsg)) log.info("Final fit values: ") for i,p in enumerate(mpp): log.info(" ".join((parinfo[i]['parname'], str(p), " +/- ", str(mpperr[i])))) log.info(" ".join(("Chi2: ", str(mp.fnorm)," Reduced Chi2: ", str(mp.fnorm/len(data)), " DOF:", str(len(data)-len(mpp))))) self.mp = mp self.parinfo = parinfo self.mpp = self.parinfo.values self.mpperr = self.parinfo.errors self.mppnames = self.parinfo.names self.model = self.n_ammonia(pars=self.mpp, parnames=self.mppnames, **fitfun_kwargs)(xax) indiv_parinfo = [self.parinfo[jj*self.npars:(jj+1)*self.npars] for jj in range(int(len(self.parinfo)/self.npars))] modelkwargs = [dict([(p['parname'].strip("0123456789").lower(), p['value']) for p in pi]) for pi in indiv_parinfo] self.tau_list = [self.modelfunc(xax, return_tau=True,**mk) for mk in modelkwargs] return self.mpp,self.model,self.mpperr,chi2 def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs): """ Returns a very simple and likely incorrect guess """ # trot, TEX, ntot, width, center, ortho fraction return [20,10, 15, 1.0, 0.0, 1.0] def annotations(self): from decimal import Decimal # for formatting tex_key = {'trot':'T_R', 'tkin': 'T_K', 'tex':'T_{ex}', 'ntot':'N', 'fortho':'F_o', 'width':'\\sigma', 'xoff_v':'v', 'fillingfraction':'FF', 'tau':'\\tau_{1-1}', 'background_tb':'T_{BG}'} # small hack below: don't quantize if error > value. We want to see the values. label_list = [] for pinfo in self.parinfo: parname = tex_key[pinfo['parname'].strip("0123456789").lower()] parnum = int(pinfo['parname'][-1]) if pinfo['fixed']: formatted_value = "%s" % pinfo['value'] pm = "" formatted_error="" else: formatted_value = Decimal("%g" % pinfo['value']).quantize(Decimal("%0.2g" % (min(pinfo['error'],pinfo['value'])))) pm = "$\\pm$" formatted_error = Decimal("%g" % pinfo['error']).quantize(Decimal("%0.2g" % pinfo['error'])) label = "$%s(%i)$=%8s %s %8s" % (parname, parnum, formatted_value, pm, formatted_error) label_list.append(label) labels = tuple(mpcb.flatten(label_list)) return labels def make_parinfo(self, quiet=True, npeaks=1, params=(20,20,14,1.0,0.0,0.5), parnames=None, fixed=(False,False,False,False,False,False), limitedmin=(True,True,True,True,False,True), limitedmax=(False,False,True,False,False,True), minpars=(TCMB,TCMB,5,0,0,0), maxpars=(0,0,25,0,0,1), tied=('',)*6, max_tem_step=1., **kwargs ): if not quiet: log.info("Creating a 'parinfo' from guesses.") self.npars = int(len(params) / npeaks) if len(params) != npeaks and (len(params) / self.npars) > npeaks: npeaks = len(params) / self.npars npeaks = self.npeaks = int(npeaks) if isinstance(params,np.ndarray): params=params.tolist() # this is actually a hack, even though it's decently elegant # somehow, parnames was being changed WITHOUT being passed as a variable # this doesn't make sense - at all - but it happened. # (it is possible for self.parnames to have npars*npeaks elements where # npeaks > 1 coming into this function even though only 6 pars are specified; # _default_parnames is the workaround) if parnames is None: parnames = copy.copy(self._default_parnames) partype_dict = dict(zip(['params', 'parnames', 'fixed', 'limitedmin', 'limitedmax', 'minpars', 'maxpars', 'tied'], [params, parnames, fixed, limitedmin, limitedmax, minpars, maxpars, tied])) # make sure all various things are the right length; if they're # not, fix them using the defaults # (you can put in guesses of length 12 but leave the rest length 6; # this code then doubles the length of everything else) for partype,parlist in iteritems(partype_dict): if len(parlist) != self.npars*self.npeaks: # if you leave the defaults, or enter something that can be # multiplied by npars to get to the right number of # gaussians, it will just replicate if len(parlist) == self.npars: partype_dict[partype] *= npeaks elif len(parlist) > self.npars: # DANGER: THIS SHOULD NOT HAPPEN! log.warning("WARNING! Input parameters were longer than allowed for variable {0}".format(parlist)) partype_dict[partype] = partype_dict[partype][:self.npars] elif parlist==params: # this instance shouldn't really be possible partype_dict[partype] = [20,20,1e10,1.0,0.0,0.5] * npeaks elif parlist==fixed: partype_dict[partype] = [False] * len(params) elif parlist==limitedmax: # only fortho, fillingfraction have upper limits partype_dict[partype] = (np.array(parnames) == 'fortho') + (np.array(parnames) == 'fillingfraction') elif parlist==limitedmin: # no physical values can be negative except velocity partype_dict[partype] = (np.array(parnames) != 'xoff_v') elif parlist==minpars: # all have minima of zero except kinetic temperature, which can't be below CMB. # Excitation temperature technically can be, but not in this model partype_dict[partype] = ((np.array(parnames) == 'trot') + (np.array(parnames) == 'tex')) * TCMB elif parlist==maxpars: # fractions have upper limits of 1.0 partype_dict[partype] = ((np.array(parnames) == 'fortho') + (np.array(parnames) == 'fillingfraction')).astype('float') elif parlist==parnames: # assumes the right number of parnames (essential) partype_dict[partype] = list(parnames) * self.npeaks elif parlist==tied: partype_dict[partype] = [_increment_string_number(t, ii*self.npars) for t in tied for ii in range(self.npeaks)] if len(parnames) != len(partype_dict['params']): raise ValueError("Wrong array lengths AFTER fixing them") # used in components. Is this just a hack? self.parnames = partype_dict['parnames'] parinfo = [{'n':ii, 'value':partype_dict['params'][ii], 'limits':[partype_dict['minpars'][ii],partype_dict['maxpars'][ii]], 'limited':[partype_dict['limitedmin'][ii],partype_dict['limitedmax'][ii]], 'fixed':partype_dict['fixed'][ii], 'parname':partype_dict['parnames'][ii]+str(int(ii/int(self.npars))), 'tied':partype_dict['tied'][ii], 'mpmaxstep':max_tem_step*float(partype_dict['parnames'][ii] in ('tex','trot')), # must force small steps in temperature (True = 1.0) 'error': 0} for ii in range(len(partype_dict['params'])) ] # hack: remove 'fixed' pars #parinfo_with_fixed = parinfo #parinfo = [p for p in parinfo_with_fixed if not p['fixed']] #fixed_kwargs = dict((p['parname'].strip("0123456789").lower(), # p['value']) # for p in parinfo_with_fixed if p['fixed']) # don't do this - it breaks the NEXT call because npars != len(parnames) self.parnames = [p['parname'] for p in parinfo] # this is OK - not a permanent change #parnames = [p['parname'] for p in parinfo] # not OK self.npars = len(parinfo)/self.npeaks parinfo = ParinfoList([Parinfo(p) for p in parinfo], preserve_order=True) #import pdb; pdb.set_trace() return parinfo def _validate_parinfo(self, must_be_limited={'trot': [True,False], 'tex': [False,False], 'ntot': [True, True], 'width': [True, False], 'xoff_v': [False, False], 'tau': [False, False], 'fortho': [True, True]}, required_limits={'trot': [0, None], 'ntot': [5, 25], 'width': [0, None], 'fortho': [0,1]}): """ Make sure the input parameters are all legitimate """ for par in self.parinfo: limited = par.limited parname = par.parname.strip(string.digits).lower() mbl = must_be_limited[parname] for a,b,ul in zip(limited, mbl, ('a lower','an upper')): if b and not a: raise ValueError("Parameter {0} must have {1} limit " "but no such limit is set.".format( parname, ul)) if parname in required_limits: limits = par.limits rlimits = required_limits[parname] for a,b,op,ul in zip(limits, rlimits, (operator.lt, operator.gt), ('a lower','an upper')): if b is not None and op(a,b): raise ValueError("Parameter {0} must have {1} limit " "at least {2} but it is set to {3}." .format(parname, ul, b, a)) class ammonia_model_vtau(ammonia_model): def __init__(self, parnames=['trot', 'tex', 'tau', 'width', 'xoff_v', 'fortho'], **kwargs): super(ammonia_model_vtau, self).__init__(parnames=parnames, **kwargs) def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs): """ Returns a very simple and likely incorrect guess """ # trot, TEX, ntot, width, center, ortho fraction return [20, 10, 10, 1.0, 0.0, 1.0] def __call__(self,*args,**kwargs): return self.multinh3fit(*args,**kwargs) def _validate_parinfo(self, must_be_limited={'trot': [True,False], 'tex': [False,False], 'tau': [True, False], 'width': [True, False], 'xoff_v': [False, False], 'fortho': [True, True]}, required_limits={'trot': [0, None], 'tex': [None,None], 'width': [0, None], 'tau': [0, None], 'xoff_v': [None,None], 'fortho': [0,1]}): supes = super(ammonia_model_vtau, self) supes._validate_parinfo(must_be_limited=must_be_limited, required_limits=required_limits) return supes def make_parinfo(self, params=(20,14,0.5,1.0,0.0,0.5), fixed=(False,False,False,False,False,False), limitedmin=(True,True,True,True,False,True), limitedmax=(False,False,False,False,False,True), minpars=(TCMB,TCMB,0,0,0,0), maxpars=(0,0,0,0,0,1), tied=('',)*6, **kwargs ): """ parnames=['trot', 'tex', 'tau', 'width', 'xoff_v', 'fortho'] """ return super(ammonia_model_vtau, self).make_parinfo(params=params, fixed=fixed, limitedmax=limitedmax, limitedmin=limitedmin, minpars=minpars, maxpars=maxpars, tied=tied, **kwargs) class ammonia_model_vtau_thin(ammonia_model_vtau): def __init__(self,parnames=['tkin', 'tau', 'width', 'xoff_v', 'fortho'], **kwargs): super(ammonia_model_vtau_thin, self).__init__(parnames=parnames, npars=5, **kwargs) self.modelfunc = ammonia_thin def _validate_parinfo(self, must_be_limited={'tkin': [True,False], 'tex': [False,False], 'ntot': [True, True], 'width': [True, False], 'xoff_v': [False, False], 'tau': [False, False], 'fortho': [True, True]}, required_limits={'tkin': [0, None], 'ntot': [5, 25], 'width': [0, None], 'fortho': [0,1]}): supes = super(ammonia_model_vtau_thin, self) return supes._validate_parinfo(must_be_limited=must_be_limited, required_limits=required_limits) def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs): """ Returns a very simple and likely incorrect guess """ # trot, tau, width, center, ortho fraction return [20, 1, 1.0, 0.0, 1.0] def __call__(self,*args,**kwargs): return self.multinh3fit(*args, **kwargs) def make_parinfo(self, params=(20,14,1.0,0.0,0.5), fixed=(False,False,False,False,False), limitedmin=(True,True,True,False,True), limitedmax=(False,False,False,False,True), minpars=(TCMB,0,0,0,0), maxpars=(0,0,0,0,1), tied=('',)*5, **kwargs ): return super(ammonia_model_vtau_thin, self).make_parinfo(params=params, fixed=fixed, limitedmax=limitedmax, limitedmin=limitedmin, minpars=minpars, maxpars=maxpars, tied=tied, **kwargs) class ammonia_model_background(ammonia_model): def __init__(self,**kwargs): super(ammonia_model_background,self).__init__(npars=7, parnames=['trot', 'tex', 'ntot', 'width', 'xoff_v', 'fortho', 'background_tb']) def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs): """ Returns a very simple and likely incorrect guess """ # trot, TEX, ntot, width, center, ortho fraction return [20,10, 10, 1.0, 0.0, 1.0, TCMB] def __call__(self,*args,**kwargs): #if self.multisingle == 'single': # return self.onepeakammoniafit(*args,**kwargs) #elif self.multisingle == 'multi': # # Why is tied 6 instead of 7? return self.multinh3fit(*args,**kwargs) def make_parinfo(self, npeaks=1, err=None, params=(20,20,14,1.0,0.0,0.5,TCMB), parnames=None, fixed=(False,False,False,False,False,False,True), limitedmin=(True,True,True,True,False,True,True), limitedmax=(False,False,False,False,False,True,True), minpars=(TCMB,TCMB,0,0,0,0,TCMB), parinfo=None, maxpars=(0,0,0,0,0,1,TCMB), tied=('',)*7, quiet=True, shh=True, veryverbose=False, **kwargs): return super(ammonia_model_background, self).make_parinfo(npeaks=npeaks, err=err, params=params, parnames=parnames, fixed=fixed, limitedmin=limitedmin, limitedmax=limitedmax, minpars=minpars, parinfo=parinfo, maxpars=maxpars, tied=tied, quiet=quiet, shh=shh, veryverbose=veryverbose, **kwargs) def multinh3fit(self, xax, data, npeaks=1, err=None, params=(20,20,14,1.0,0.0,0.5,TCMB), parnames=None, fixed=(False,False,False,False,False,False,True), limitedmin=(True,True,True,True,False,True,True), limitedmax=(False,False,False,False,False,True,True), minpars=(TCMB,TCMB,0,0,0,0,TCMB), parinfo=None, maxpars=(0,0,0,0,0,1,TCMB), tied=('',)*7, quiet=True, shh=True, veryverbose=False, **kwargs): return super(ammonia_model_background, self).multinh3fit(xax, data, npeaks=npeaks, err=err, params=params, parnames=parnames, fixed=fixed, limitedmin=limitedmin, limitedmax=limitedmax, minpars=minpars, parinfo=parinfo, maxpars=maxpars, tied=tied, quiet=quiet, shh=shh, veryverbose=veryverbose, **kwargs) class cold_ammonia_model(ammonia_model): def __init__(self, parnames=['tkin', 'tex', 'ntot', 'width', 'xoff_v', 'fortho'], **kwargs): super(cold_ammonia_model, self).__init__(parnames=parnames, **kwargs) self.modelfunc = cold_ammonia def _validate_parinfo(self, must_be_limited={'tkin': [True,False], 'tex': [False,False], 'ntot': [True, False], 'width': [True, False], 'xoff_v': [False, False], 'fortho': [True, True]}, required_limits={'tkin': [0, None], 'tex': [None,None], 'width': [0, None], 'ntot': [0, None], 'xoff_v': [None,None], 'fortho': [0,1]}): supes = super(cold_ammonia_model, self) return supes._validate_parinfo(must_be_limited=must_be_limited, required_limits=required_limits) def _increment_string_number(st, count): """ Increment a number in a string Expects input of the form: p[6] """ import re dig = re.compile('[0-9]+') if dig.search(st): n = int(dig.search(st).group()) result = dig.sub(str(n+count), st) return result else: return st