Source code for src.common.dataProcessingFlowManager

import matplotlib.pylab as plt
# import matplotlib
# matplotlib.use('qtagg')

from dataclasses import dataclass, field
import numpy as np
import sys,os
from .msgConfiguration import msg_wrapper
from .miscellaneousFunctions import check_for_nans, sig_to_noise
from .fitting import clean_rfi, fit_dual_beam, fit_beam # fit_gauss_lin
from .plotting import plot_no_fit, plot_scan, plot_overlap, plotPeakFit,plotDualBeamFit
# from scipy.optimize import curve_fit
# from .prepScans import PrepareScans
import logging
logging.getLogger('matplotlib.font_manager').disabled = True

# import warnings
# def fxn():
#     warnings.warn("deprecated", DeprecationWarning)
# with warnings.catch_warnings(action="ignore"):
#     fxn()

[docs]
@dataclass
class DataProcessingFlowManager:

    fileName: str        #= ""
    freq: float          #= np.nan
    src: str             #= ""
    x: np.array          #= np.zeros_like(1000)
    y: np.array          #= np.zeros_like(1000)
    log: object
    flag: int
    applyRFIremoval: str
    savefolder: str
    srcTag: str
    pol: str
    frontend: str
    HPBW: float
    FNBW: float
    theoFit: str
    autoFit: str         # run an automated fit on pre-selected baseline locations
    force: str = 'n'     # force a fit on the data or not, initially set to no

    # fileName,frq,src,x,lcp,log,0,'y',saveTo,'HPN_LCP','LCP',frontend,hpbw,fnbw
    def __post_init__(self):

        print('\n')
        print('#'*80)
        msg_wrapper("info", self.log.info, "PROCESSING/FITTING DRIFTSCAN INITIATED")
        print('#'*80)
        print('\n')

        saveTo = 'currentScanPlots'
        plotSavePath = f'plots/{self.src}/{int(self.freq)}'
        ext = '.png'
        self.plotName = self.fileName[:18]
        self.plotPath = f"{plotSavePath}/{self.plotName}_{self.srcTag+ext}"
        # create_current_scan_directory()

        # 1. Check the data for NaNs
        # ===========================
        self.y, self.flag = check_for_nans(self.y, self.log)
        # plot=ScanPlot(self.src,self.x,self.y,self.flag,self.log)

        if self.flag == 1:
            msg = "Flag 1: Found nans"
            plot_no_fit(self.x, self.y, "Plot of fitting error: No data", "",
                        self.plotPath, msg, xlabel="", ylabel="")

            if "D" in self.frontend:
                ret = {"correctedData": [], "driftRes": [], "driftRms": np.nan,
                       "driftCoeffs": [], "baseLocsCombined": [],
                       "baseLocsLeft": [], "baseLocsRight": [],
                       "leftPeakData": [], "leftPeakModelData": [],
                       "leftPeakFit": np.nan, "leftPeakFitErr": np.nan, "leftPeakFitRes": [],
                       "rightPeakData": [], "rightPeakModelData": [],
                       "rightPeakFit": np.nan, "rightPeakFitErr": np.nan, "rightPeakFitRes": [],
                       "msg": msg, "midXValueLeft": np.nan, "midXValueRight": np.nan}
                ret['flag'] = 2
                self.__dict__[self.srcTag] = ret
                return
            else:
                ret = {"peakFit": np.nan, "peakModel": [], "peakRms": np.nan,
                       "correctedData": [], "peakPts": [],
                       "msg": "", "driftRes": [], "driftRms": [], "driftCoeffs": [],
                       "fitRes": [], "midXValue": np.nan,
                       "flag": 1, "baseLocsLeft": [], "baseLocsRight": [],
                       "baseLocsCombined": [], "peakLoc": [],
                       "baseLeft": [], "baseRight": [], 's2n': np.nan}
                ret['flag'] = 1
                self.__dict__[self.srcTag] = ret
                return
        else:
            # 2. Clean the data, remove RFI
            # ===============================
            # RFI can make fitting a drift scan impossible. As part of the
            # cleaning process we therefore remove any data point that is an
            # outlier, i.e. exceeds the rest of the data by more than 3 times
            # the rms.

            # clean the data of RFI contamination
            self.clean_data()

            # make initial processing plots of the current scan for a source overview
            # ------------------------------------------------------------------------
            tag = self.srcTag
            tagCleaned = self.srcTag + "Cleaned"
            src = self.src
            freq = int(self.freq)
            title = f"Plot of {self.src} file {self.fileName} {self.srcTag}"

            msg_wrapper("debug", self.log.debug, "\n")
            msg_wrapper("info", self.log.info, f'Plot path: {self.plotPath}')
            msg_wrapper("debug", self.log.debug, "\n")

            # plot raw data
            rawPath = f'{saveTo}/{self.plotName}_{tag}_raw.png'
            msg_wrapper("debug", self.log.debug, f'Plot {tag} raw data: {rawPath}')
            plot_scan(self.x, self.y, tag, f'Plot of {tag} raw data', rawPath)

            # plot cleaned data
            cleanPath = f'{saveTo}/{self.plotName}_{tag}_clean.png'
            msg_wrapper("debug", self.log.debug, f'Plot {tag} clean data: {cleanPath}')
            plot_scan(self.xCleaned, self.yCleaned, tag, f'Plot of {tag} clean data', cleanPath)

            # plot raw and cleaned data
            overlapPath = f'{saveTo}/{self.plotName}_{tag}_overlap.png'
            msg_wrapper("debug", self.log.debug, f'Plot {tag} overlap data: {overlapPath}')
            plot_overlap(self.x, self.y, self.xCleaned, self.yCleaned,
                         'Plot of overlap data', "Raw", 'Clean', overlapPath)

            # 3. Check rms for error handling of bad data
            # ============================================
            if self.RMSA >= 1.0:
                msg = "RMS >= 1.0"
                self.flag = 2
                title = f"Plot of {self.src} file {self.fileName}"
                subtitle = "(Flag " + str(self.flag) + ": High rms > 1)"
                msg_wrapper("warning", self.log.warning, msg)
                plot_no_fit(self.xCleaned, self.yCleaned, title, subtitle,
                            self.plotPath, self.srcTag)

                if "D" in self.frontend:
                    ret = {"correctedData": [], "driftRes": [], "driftRms": np.nan,
                           "driftCoeffs": [], "baseLocsCombined": [],
                           "baseLocsLeft": [], "baseLocsRight": [],
                           "leftPeakData": [], "leftPeakModelData": [],
                           "leftPeakFit": np.nan, "leftPeakFitErr": np.nan, "leftPeakFitRes": [],
                           "rightPeakData": [], "rightPeakModelData": [],
                           "rightPeakFit": np.nan, "rightPeakFitErr": np.nan, "rightPeakFitRes": [],
                           "msg": msg, "midXValueLeft": np.nan, "midXValueRight": np.nan}
                    ret['flag'] = 2
                    self.__dict__[self.srcTag] = ret
                    return
                else:
                    ret = {"peakFit": np.nan, "peakModel": [], "peakRms": np.nan,
                           "correctedData": [], "peakPts": [],
                           "msg": "", "driftRes": [], "driftRms": [], "driftCoeffs": [],
                           "fitRes": [], "midXValue": np.nan,
                           "flag": 1, "baseLocsLeft": [], "baseLocsRight": [],
                           "baseLocsCombined": [], "peakLoc": [],
                           "baseLeft": [], "baseRight": [], 's2n': np.nan}
                    ret['flag'] = 2
                    self.__dict__[self.srcTag] = ret
                    return
            else:
                # 4. Correct the baseline and create a new data set with no drift
                # =================================================================
                if 'S' in self.frontend:

                    # fit the data using a gaussian,
                    # set initial parameters for data fitting
                    p0 = [max(self.yCleaned), np.mean(self.xCleaned), self.HPBW, .01, .0]

                    saveTag = f'{saveTo}/{self.plotName}_{tag}_'

                    # fit_beam(x, y, p, fnbw, force, log, saveTag, fitTheoretical, autoFit=None)
                    ret = fit_beam(self.xCleaned, self.yCleaned, p0, self.FNBW,
                                   self.force, self.log, saveTag,
                                   self.theoFit['value'], self.autoFit['value'])
                    print(ret.keys())

                    if len(ret['correctedData']) > 1:
                        if max(ret['peakModel']) < 0:
                            ret['flag'] = 12
                            saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                            plot_no_fit(self.x, self.y,
                                        "Plot of fitting error: Peak max < 0", "",
                                        saveTo, 'Peak max < 0', xlabel="", ylabel="")
                            sys.exit()
                        elif ret['peakRms'] > 1.:
                            ret['flag'] = 13
                            saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                            plot_no_fit(self.x, self.y,
                                        "Plot of fitting error: Peak rms > 1", "",
                                        saveTo, 'Peak rms > 1', xlabel="", ylabel="")
                        elif np.isinf(ret['peakRms']):
                            ret['flag'] = 14
                            saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                            plot_no_fit(self.x, self.y,
                                        "Plot of fitting error: RMS is +- inf", "",
                                        saveTo, 'RMS is +- inf', xlabel="", ylabel="")
                            sys.exit()
                        else:
                            # =============================================
                            # Estimate the signal to noise ratio
                            # =============================================
                            msg = f"Peak = {ret['peakFit']:.3f} +- {ret['peakRms']:.3f} [K]"
                            msg_wrapper("info", self.log.info, msg)

                            s2n = sig_to_noise(max(ret['correctedData']), ret['driftRes'], self.log)
                            msg2 = "S/N: {:.2f}".format(s2n)
                            msg_wrapper("info", self.log.info, msg2)
                            ret['s2n'] = s2n

                            if s2n < 3.0:
                                msg = "signal to noise ratio < 3"
                                ret['flag'] = 15
                                msg_wrapper("warning", self.log.warning, msg)
                            else:
                                pass
                    else:
                        msg_wrapper("error", self.log.error, 'No corrected data found')
                        ret['flag'] = 55  # check if this is valid
                        ret['s2n'] = np.nan

                        saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                        plot_no_fit(self.x, self.y, "No corrected data found", "",
                                    saveTo, 'No corrected data', xlabel="", ylabel="")
                        self.__dict__[self.srcTag] = ret
                        return

                    self.__dict__[self.srcTag] = ret

                    saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                    title = f'Plot of {self.plotName}_{self.srcTag}'
                    plotPeakFit(title, self.xCleaned, ret['correctedData'],
                                ret['peakModel'], ret['peakRms'], ret['peakPts'],
                                saveTo, xlabel="", ylabel="")

                elif 'D' in self.frontend:

                    if self.frontend == '03.5D':
                        npts = 100
                        factor = .6    # .65 (cal/tar)
                    elif self.frontend == '06.0D':
                        npts = 150     # number of points used for baseline fit
                        factor = 0.55  # .8 (cal/tar)
                    else:
                        print(f'Invalid frontend: {self.frontend}')
                        sys.exit()

                    saveTag = f'{saveTo}/{self.plotName}_{tag}_'

                    # fit_dual_beam(x, y, hpbw, fnbw, factor, npts, dec, srcType, data, scanNum, force, log)
                    # returns a dict with keys: correctedData, driftRes, driftRms, driftCoeffs,
                    # baseLocsCombined, baseLocsLeft, baseLocsRight, left/rightPeakData,
                    # left/rightPeakModelData, left/rightPeakFit, left/rightPeakFitErr,
                    # left/rightPeakFitRes, msg, midXValueLeft, midXValueRight, flag
                    ret = fit_dual_beam(self.xCleaned, self.yCleaned, self.HPBW,
                                        self.FNBW, factor, saveTag, self.log)

                    if len(ret['correctedData']) > 1:
                        if ret['leftPeakFitErr'] > 1. or ret['rightPeakFitErr'] > 1.:
                            msg = "Peak rms > 1"
                            self.flag = 13
                            msg_wrapper("warning", self.log.warning, msg)
                            saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                            plot_no_fit(self.x, self.y, msg, "", saveTo, msg,
                                        xlabel="", ylabel="")
                        elif np.isinf(ret['leftPeakFitErr']) or np.isinf(ret['rightPeakFitErr']):
                            msg = "Rms is +- inf"
                            self.flag = 14
                            msg_wrapper("warning", self.log.warning, msg)
                            saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                            plot_no_fit(self.x, self.y, msg, "", saveTo, msg,
                                        xlabel="", ylabel="")
                        else:
                            # =============================================
                            # Estimate the signal to noise ratio
                            # =============================================
                            # A beam
                            msg = f"Peak A = {ret['leftPeakFit']:.3f} +- {ret['leftPeakFitErr']:.3f} [K]"
                            msg_wrapper("info", self.log.info, msg)
                            s2na = sig_to_noise(abs(max(ret['leftPeakData'])), ret['driftRes'], self.log)
                            msga = "S/N: {:.2f}".format(s2na)
                            msg_wrapper("info", self.log.info, msga)
                            ret['s2na'] = s2na

                            # B beam
                            msg = f"Peak B = {ret['rightPeakFit']:.3f} +- {ret['rightPeakFitErr']:.3f} [K]"
                            msg_wrapper("info", self.log.info, msg)
                            s2nb = sig_to_noise(abs(min(ret['rightPeakData'])), ret['driftRes'], self.log)
                            msgb = "S/N: {:.2f}".format(s2nb)
                            msg_wrapper("info", self.log.info, msgb)
                            ret['s2nb'] = s2nb

                            if s2na < 3. or s2nb < 3.:
                                msg = "Got a s2n < 3"
                                self.flag = 15
                                ret['flag'] = self.flag
                                msg_wrapper("warning", self.log.warning, msg)
                            else:
                                pass
                    else:
                        msg_wrapper("error", self.log.error, 'No corrected data found')
                        ret['flag'] = 55  # check if this is valid
                        ret['s2na'] = np.nan
                        ret['s2nb'] = np.nan

                        saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                        plot_no_fit(self.x, self.y, "No corrected data found", "",
                                    saveTo, 'No corrected data', xlabel="", ylabel="")
                        self.__dict__[self.srcTag] = ret
                        return

                    self.__dict__[self.srcTag] = ret

                    saveTo = f'{plotSavePath}/{self.plotName}_{self.srcTag}{ext}'
                    title = f'Plot of {self.plotName}_{self.srcTag}'
                    plotDualBeamFit(self.xCleaned, ret['correctedData'],
                                    ret['leftPeakData'], ret['leftPeakModelData'],
                                    ret['rightPeakData'], ret['rightPeakModelData'],
                                    title, 'corrected data',
                                    "T$_{A}$ [K]" + f" = {ret['leftPeakFit']:.3f} +- {ret['leftPeakFitErr']:.3f}",
                                    "T$_{A}$ [K]" + f" = {ret['rightPeakFit']:.3f} +- {ret['rightPeakFitErr']:.3f}",
                                    saveTo, xlabel="", ylabel="")

                else:
                    print(f'Invalid frontend: {self.frontend}')
                    sys.exit()
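The acceptance checks above compare the fitted peak against the scatter of the drift residuals via sig_to_noise, which is imported from .miscellaneousFunctions and not listed on this page. The following is only a minimal, self-contained sketch of that idea, assuming S/N is the peak amplitude divided by the rms of the residuals; the exact definition lives in sig_to_noise and may differ.

import numpy as np

def signal_to_noise_sketch(peak_amplitude, residuals):
    """Illustrative only: peak height over the rms scatter of the drift-fit residuals."""
    noise = np.std(residuals)
    return np.nan if noise == 0 else peak_amplitude / noise

# A 3 K peak over residuals scattered at ~0.5 K gives S/N ~ 6, which would
# pass the s2n >= 3 acceptance check applied in __post_init__.
rng = np.random.default_rng(0)
print(signal_to_noise_sketch(3.0, rng.normal(0, 0.5, 500)))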
[docs]
    def fit_theoretical_baselines(self, coeff):
        """Fit the data at the theoretical baseline location."""

        print(coeff)
        hfnbw = self.FNBW / 2
        offset = self.xCleaned

        # Get indices where the offset values are within the main beam
        offset_masked, amp_masked, main_beam = self.locate_baseline_blocks_masked(
            offset, self.yCleaned, coeff)

        # fit a polynomial of any order
        lin_first_null = np.poly1d(np.ma.polyfit(x=offset_masked, y=amp_masked, deg=3))

        # Subtract the polynomial fitted to the baseline, then fit a
        # 2nd-order polynomial through the top of the beam.
        base_sub = self.yCleaned - lin_first_null(offset)

        self.flag = 26
        return base_sub, main_beam, offset_masked, amp_masked
[docs]
    def make_initial_plots(self, plot, title, plotlab, plotlabCleaned):
        """Make initial plots of the raw and cleaned data."""

        plot.add_data(plotlab, f'{title} raw data', self.x, self.y,
                      "Scandist [Deg]", "T$_{A}$ [K]", self.pol,
                      f'currentScanPlots/{self.srcTag}_raw.png')
        plot.add_data(plotlabCleaned, f'{title} cleaned data', self.xCleaned, self.yCleaned,
                      "Scandist [Deg]", "T$_{A}$ [K]", self.pol,
                      f'currentScanPlots/{self.srcTag}_cleaned.png')
        plot.plot_beam(plotlab)
        plot.plot_beam(plotlabCleaned)
        plot.overplot_2_scans(plotlab, plotlabCleaned, f"{title} raw vs cleaned data",
                              'raw', 'cleaned',
                              f'currentScanPlots/{self.srcTag}_overlay_data.png')
        return plot
[docs]
    def clean_data(self):
        """Clean the data of possible RFI.

        Remove all data points that deviate by more than 3 times the
        standard deviation.
        """
        if self.applyRFIremoval == 'n':
            self.xCleaned = self.x
            self.yCleaned = self.y
            self.spl = np.nan
            self.pt = np.nan
            self.RMSB = np.nan
            self.RMSA = np.nan
        else:
            self.xCleaned, self.yCleaned, self.RMSB, self.RMSA, self.spl, self.pt = clean_rfi(
                self.x, self.y, self.log)
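clean_rfi itself is imported from .fitting and is not listed on this page. Below is a rough, standalone sketch of the 3-sigma clipping the docstring describes; this is an assumption about its behaviour, and the real routine additionally returns the rms before and after cleaning, a spline and point estimates.

import numpy as np

def sigma_clip_sketch(x, y, nsigma=3.0):
    """Illustrative only: drop points deviating from the mean by more than nsigma standard deviations."""
    resid = y - np.mean(y)
    keep = np.abs(resid) <= nsigma * np.std(y)
    return x[keep], y[keep]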
[docs]
    def locate_main_beam_data(self, offset, coeff):
        # Get indices where the offset values are within the main beam
        hfnbw = self.FNBW / 2.
        main_beam = np.where(np.logical_and(offset >= coeff[1] - hfnbw,
                                            offset <= coeff[1] + hfnbw))[0]
        return main_beam
[docs]
    def locate_baseline_blocks_masked(self, offset, y, coeff):
        """Locate the baseline by masking the main beam.

        This does not work well in practice because the data is imperfect and
        may contain sidelobes, which are not necessarily handled well by
        higher-order polynomials.
        """
        main_beam = self.locate_main_beam_data(offset, coeff)

        # mask the main beam for the polynomial fit to the baseline
        mask = np.zeros(len(offset))
        mask[main_beam] = 1
        offset_masked = np.ma.array(offset, mask=mask)
        amp_masked = np.ma.array(y, mask=mask)

        return offset_masked, amp_masked, main_beam
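A self-contained sketch of how these masked arrays can feed a baseline fit, mirroring what fit_theoretical_baselines does above. The toy data, the coeff layout (index 1 holding the beam centre) and the half-FNBW value are illustrative assumptions only.

import numpy as np

# toy drift scan: linear drift plus a Gaussian beam centred at coeff[1] = 0.0
offset = np.linspace(-2, 2, 401)
amp = 0.05 * offset + np.exp(-0.5 * (offset / 0.3) ** 2)
coeff = [1.0, 0.0]   # assumed layout: coeff[1] holds the fitted beam centre
hfnbw = 0.5          # half of the FNBW, in the same units as offset

main_beam = np.where(np.logical_and(offset >= coeff[1] - hfnbw,
                                    offset <= coeff[1] + hfnbw))[0]
mask = np.zeros(len(offset))
mask[main_beam] = 1  # 1 = excluded from the fit, as in locate_baseline_blocks_masked
offset_masked = np.ma.array(offset, mask=mask)
amp_masked = np.ma.array(amp, mask=mask)

# cubic baseline through the masked (off-beam) points, then subtract it
baseline = np.poly1d(np.ma.polyfit(offset_masked, amp_masked, deg=3))
corrected = amp - baseline(offset)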
[docs]
    def locate_baseline_blocks_fnbw(self, x, hfnbw):
        """Find the locations to fit a baseline.

        We assume the baseline gives a good fit beyond the fnbw points on
        either side of the beam.
        """
        # If all looks well, we get the positions where a first-order
        # polynomial is going to be fitted in order to correct for any drift
        # in the data. We assume that the data is devoid of any RFI beyond
        # the fnbw locations on either side of the main beam.
        left = -hfnbw   # coeff[1]-hfnbw
        right = hfnbw   # coeff[1]+hfnbw
        baseLine = np.where(np.logical_or(x <= left, x >= right))[0]

        # get coefficients of a first-order polynomial fitted to
        # either end of the baseline selection locations

        # baseline locations to the left of the beam
        leftBaseline = np.where(x <= left)[0]
        # baseline locations to the right of the beam
        rightBaseline = np.where(x >= right)[0]

        sidelobes = 0   # no sidelobes / sidelobes ignored
        return baseLine, leftBaseline, rightBaseline, sidelobes
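For context, a hedged sketch of how baseline indices like these are typically used to remove a first-order drift. The actual correction is performed inside the fitting routines imported from .fitting, so this is only an illustration on toy data.

import numpy as np

x = np.linspace(-2, 2, 401)
y = 0.1 * x + 0.3 + np.exp(-0.5 * (x / 0.3) ** 2)   # drifting baseline plus beam
hfnbw = 0.5

# same selection logic as locate_baseline_blocks_fnbw
baseLine = np.where(np.logical_or(x <= -hfnbw, x >= hfnbw))[0]

# first-order polynomial through the outer baseline blocks, then remove the drift
driftCoeffs = np.polyfit(x[baseLine], y[baseLine], 1)
yCorrected = y - np.polyval(driftCoeffs, x)
driftRms = np.sqrt(np.mean((y[baseLine] - np.polyval(driftCoeffs, x[baseLine])) ** 2))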