from .msgConfiguration import msg_wrapper
from .plotting import *
from scipy import interpolate
from sklearn.metrics import mean_squared_error
import numpy as np
# import matplotlib.pyplot as pl
from scipy.optimize import curve_fit
import sys
import bisect
# import warnings
# def fxn():
# warnings.warn("deprecated", DeprecationWarning)
# with warnings.catch_warnings(action="ignore"):
# fxn()
[docs]
def clean_rfi(x, y, log=""):
"""
Clean the RFI in the data using iterative rms cuts.
Parameters:
x (array): Array of the independent variable
y (array): Array of the dependent variable
log (object): file logging object
returns:
finX (array): the data representing the x-axis after the removal of rfi data
finY (array):the data representing the y-axis after the removal of rfi data
rmsBeforeClean (int): the rms before removal of rfi data
rmsAfterClean (int): the rms after removal of rfi data
finspl (array): the spline of the cleaned data
pointsDeleted (int): number of points deleted when cleaning the data
"""
msg_wrapper("debug", log.debug, "Cleaning the data of RFI")
# spline the data
splined_data = spline(x, y,log=log)
scanLen = len(x)
resRFI, rmsBeforeClean, rmsAfterClean, finX, finY, finRes, finMaxSpl, finspl, pointsDeleted = clean_data(
splined_data, x, y, scanLen,log)
msg_wrapper("debug", log.debug, "Splined RMS before: after cleaning -> {:.3f}: {:.3f} \n".format
(rmsBeforeClean, rmsAfterClean))
return finX, finY, rmsBeforeClean, rmsAfterClean, finspl, pointsDeleted
[docs]
def clean_data(spl, x, y, scanLen, log=""):
"""
Clean the data using iterative fitting.
Args:
spl : 1d array
the splined data
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
scanLen : int
length of the drift scan array
log : object
file logging object
Returns:
resRFI: 1d array
the residual before the rfi has been removed
rmsBeforeClean: int
the rms before removing rfi data
rmsAfterClean: int
the rms after removal of rfi data
finX: 1d array
the data representing the x-axis after the removal of rfi data
finY: 1d array
the data representing the y-axis after the removal of rfi data
finRes: 1d array
the residual of the cleaned data after the rfi has been removed
finMaxSpl: int
the maximum of the spline of the cleaned data
finspl: 1d array
the spline of the cleaned data
pointsDeleted: int
number of points deleted when cleaning the data
"""
msg_wrapper("debug", log.debug, "Iterative cleaning of RFI")
# calculate the residual and clean the data
resRFI, rmsBeforeClean = calc_residual(spl, y,log)
finX, finY, rmsAfterClean, finRes, finMaxSpl, finspl, pointsDeleted = clean_data_iterative_fitting(
x, y, scanLen, resRFI, rmsBeforeClean,log)
return resRFI, rmsBeforeClean, rmsAfterClean, finX, finY, finRes, finMaxSpl, finspl, pointsDeleted
[docs]
def calc_residual(model, data, log=""):
"""
Calculate the residual and rms between the model and the data.
Parameters:
model (array): 1D array containing the model data
data (array): 1D array containing the raw data
log (object): file logging object
Returns
-------
res: 1d array
the residual
rms: int
the rms value
"""
res = np.array(model - data)
rms = np.sqrt(mean_squared_error(data, model))
return res, rms
[docs]
def clean_data_iterative_fitting(x, y, scanLen, res, rms, log="",x2=""):
''' Find the best fit to the data by iteratively eliminating data points
that fall beyond an established cut-off limit.
Parameters
----------
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
scanLen : int
length of the drift scan array
res: 1d array
the residual
rms: int
the rms value
log : object
file logging object
x2 : 1d array
filenames
Returns
-------
finalX: 1d array
the data representing the x-axis after the removal of rfi data
finalY: 1d array
the data representing the y-axis after the removal of rfi data
finalRms: int
the rms after removal of rfi data
finRes: 1d array
the residual of the cleaned data after the rfi has been removed
finMaxSpl: int
the maximum of the spline of the cleaned data
finalSplinedData: 1d array
the spline of the cleaned data
pointsDeleted: int
number of points deleted when cleaning the data
'''
#msg_wrapper("debug", log.debug, "Performing RFI cuts on data")
# ToDo: Make this more effecient
# set initial values
smallestY = y
smallestX = x
# set final value parameters
finalNames=[]
finalX = []
finalY = []
finalRms = []
finalRes = []
finalMaxSpl = []
finalSplinedData = []
smallest = True
loop = 0
while smallest:
#While you don't have the smallest rms, process data
loop = loop+1
#print('loop: ',loop)
#Remove spikes
if len(x2)==0:
newX, newY = _remove_RFI(smallestX, smallestY, res, rms)
else:
newX, newY ,newNames= _remove_RFI(smallestX, smallestY, res, rms,log,x2)
finalNames=newNames
newX = np.array(newX)
newY = np.array(newY)
#spline the data if you found RFI
splineData2 = spline(newX, newY,log=log)
maxSplineData2 = max(splineData2)
res2, rms2 = calc_residual(splineData2, newY)
if rms2 < rms: # get better values
rms = rms2
res = res2
smallestX = newX
smallestY = newY
finalX = newX
finalY = newY
finalRms = rms2
finalRes = res2
finalSplinedData = splineData2
finalMaxSpl = maxSplineData2
if len(x2)!=0:
finalNames=newNames
smallest = True
else:
smallest = False
# If the rms cut only looped once, values dont change
if loop == 1:
# If you already have good data, keep values as they are
finalX = x
finalY = y
finalRms = rms
finalRes = res
#spline the data
finalSplinedData = spline(x, y,log=log)
finalMaxSpl = max(finalSplinedData)
if len(x2)==0:
#print('p')
finalNames=x2
pointsDeleted = scanLen-len(finalY)
if len(x2)==0:
return finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted
else:
print(f'\n{abs(len(x)-len(finalNames))} entries removed, after {loop} iterations\n{len(finalNames)} remaining\n')
return finalX, finalY, finalRms, finalRes, finalMaxSpl, finalSplinedData, pointsDeleted, finalNames
def _remove_RFI(x, y, res, rms, log="",x2=""):
'''
Removes data points with a residual cutt-off
more or less than 2.7*rms
Parameters
----------
x : 1d array
data representing the x-axis
y : 1d array
data representing the y-axis
res: 1d array
the residual
rms: int
the rms value
log : object
file logging object
Returns
-------
cleanX: 1d array
array of data representing the x-axis
cleanY: 1d array
array of data representing the cleaned y-axis data
'''
scanLen = len(x)
cleanX, cleanY, names = ([] for i in range(3)) # create new lists
cut=3#2.7
if len(x2)==0:
for i in range(scanLen):
if(abs(res[i]) > cut * rms): # 2.5, 2.7 3.0
pass
else:
# Keep points that lie within the cut_off
cleanX.append(x[i])
cleanY.append(y[i])
return cleanX, cleanY
else:
#print('n')
for i in range(scanLen):
if(abs(res[i]) > cut* rms): # 2.5, 2.7 3.0
pass
else:
# Keep points that lie within the cut_off
cleanX.append(x[i])
cleanY.append(y[i])
names.append(x2[i])
#print('len names: ',len(names))
return cleanX, cleanY, names
[docs]
def gauss_lin(x, *p):
"""
Generate a gaussian plus first-order polynomial to fit the drift
scan beam. Note that the width of the Gaussian is hard-coded
to the half-power beamwidth.
Parameters
----------
x : 1D array
1D array of data representing the x-axis
p : tuple
tuple of gaussian parameters
Returns
-------
gaussFit: 1d array
array of data representing the gaussian fit
"""
amp, mu, hpbw, a, c = p
sigma = hpbw/(2*np.sqrt(2*np.log(2)))
return amp*np.exp(-(x-mu)**2/(2.*sigma**2)) + a*x + c
[docs]
def spline(x, y, anchor_points=9, order=3,log=""):
'''
Given a set of data points (x,y) determine a smooth spline
approximation of degree k on the interval x[0] <= x <= x[n]
Args:
x (array): 1D array of data representing the x-axis
y (array): 1D array of data representing the y-axis
anchor_points (int): the number of anchor points in the data
order (int): polynomial order to fit, preferrably a cubic spline
Return:
spline_fit (array): Spline of the data
'''
try:
msg_wrapper("debug", log.debug, "Spline the data to get estimate of underlying signal")
except:
pass
scanLen = len(x)
if scanLen <= anchor_points*2:
print("Too few points to interpolate, consider entering lower knot points")
print("spline not set")
spline_fit=[]
else:
#anchor_points=7
anchor_points_intervals = scanLen/anchor_points # intervals where anchor points will be placed
anchor_points_pos = [] # list to hold positions where anchor_points will be placed olong the array
anchor_points_counter = 0 # position counter or locator
# create a list of the positions where the anchor_points will be located
#anchor_points=10
for i in range(anchor_points-1):
anchor_points_counter = anchor_points_counter + anchor_points_intervals
anchor_points_pos.append(int(anchor_points_counter))
# interpolate the data
# create linearly spaced data points
x1 = np.linspace(1, scanLen, scanLen)
x2 = np.array((anchor_points_pos), int) # create array of anchor_points positions
try:
tck = interpolate.splrep(x1, y, k=order, task=-1,
t=x2) # interpolate, k=5 is max
spline_fit = interpolate.splev(x1, tck, der=0)
except:
print("Failed to interpolate, Error on input data,too few points, min required = 9")
spline_fit=y
return spline_fit#,anchor_points_pos
[docs]
def spline_fit(x, y, anchor_points=9, order=3):
'''
Given a set of data points (x,y) determine a smooth spline
approximation of degree k on the interval x[0] <= x <= x[n]
Args:
x (array): 1D array of data representing the x-axis
y (array): 1D array of data representing the y-axis
anchor_points (int): the number of anchor points in the data
order (int): polynomial order to fit, preferrably a cubic spline
Return:
spline_fit (array): Spline of the data
'''
nx=[]
ny=[]
# remove nan values
for i in range(len(x)):
#print(i,y[i],type(y[i]))
if y[i] == np.nan or str(y[i]) =="nan":
#print(i,y[i])
pass
else:
nx.append(x[i])
ny.append(y[i])
scanLen = len(nx)
if scanLen <= anchor_points*2:
print("Too few points to interpolate, consider entering lower knot points")
print("spline not set")
spline_fit=[]
else:
#anchor_points=7
anchor_points_intervals = scanLen/anchor_points # intervals where anchor points will be placed
anchor_points_pos = [] # list to hold positions where anchor_points will be placed olong the array
anchor_points_counter = 0 # position counter or locator
# create a list of the positions where the anchor_points will be located
#anchor_points=10
for i in range(anchor_points-1):
anchor_points_counter = anchor_points_counter + anchor_points_intervals
anchor_points_pos.append(int(anchor_points_counter))
# interpolate the data
# create linearly spaced data points
x1 = np.linspace(1, scanLen, scanLen)
x2 = np.array((anchor_points_pos), int) # create array of anchor_points positions
try:
tck = interpolate.splrep(x1, ny, k=order, task=-1,
t=x2) # interpolate, k=5 is max
spline_fit = interpolate.splev(x1, tck, der=0)
except:
print("Failed to interpolate, Error on input data,too few points, min required = 9")
spline_fit=ny
#print(len(x),len(spline_fit))
#print(spline_fit)
# pl.plot(x,y,'k.')
# pl.plot(nx,spline_fit,'r')
# # pl.plot(nx[anchor_points_pos],spline_fit[anchor_points_pos],'r.')
# pl.show()
# pl.close()
# print('closed')
# sys.exit()
return nx,spline_fit#,anchor_points_pos
[docs]
def gauss(x, *p):
"""
Gaussian for fitting the beam
"""
amp, mu, hpbw = p
sigma = hpbw/(2*np.sqrt(2*np.log(2))) #a bit messy but not sure how to pass a constant through the curve fitting
return amp*np.exp(-(x-mu)**2/(2.*sigma**2))
[docs]
def test_gauss_fit(x,y,p0,log=""):
"""
Fit the data using a gaussian
Arguments:
p0: initial fit guess
"""
# fit initial guess parameters to data and get best fit parameters
# returns best fit coeffecients and errors
# Curve fitting is a type of optimization that finds an optimal set
# of parameters for a defined function that best fits a given set of observations.
try:
coeff, covarMatrix = curve_fit(gauss_lin, x,y, p0)
fit = gauss_lin(x, *coeff)
try:
msg_wrapper("info", log.info, f"Passed gaussian fit test, Max peak = {max(fit):.3f} [K]")
except:
pass
return coeff,fit,""
except Exception as e:
print(e)
try:
msg_wrapper("warning", log.warning, "gaussian curve_fit algorithm failed")
except:
pass
flag=3
return [],[],flag
[docs]
def test_position_validity(localMaxPositions,localMinPositions,maxPoints):
"""
Test the position validity of the local min/max positions. The
local minimum positions cannot fall within the range of the
local maximum positions.
Args:
localMaxPositions (list): list of local max positions
localMinPositions (list): list of local min positions
p (int): maximum number of permissable points.
Returns:
pointsToDelete: index list of positions to delete
"""
pointsToDelete = []
halfMaxPoints=int(maxPoints/2)
for i in range(len(localMaxPositions)):
# create list of max positions bounded by the condition
window=np.arange(localMaxPositions[i]-halfMaxPoints,localMaxPositions[i]+halfMaxPoints,1)
for j in range(len(localMinPositions)):
if localMinPositions[j] in window:
print(j, localMinPositions[j] , i, localMaxPositions[i])#, window)
pointsToDelete.append(localMinPositions[j] )
return pointsToDelete
[docs]
def locate_baseline_blocks_auto(x,y,peakCenterGuess, hfnbw,log,saveLoc):
"""
Find the locations to fit a baseline automatically.
These locations are found/determined by fitting a
spline to the data and using the
locations of the local minimum as baseline regions.
Parameters:
peakCenterGuess (float): value of x at peak center in x array
hfnbw (float): half the first null beam width
log (object): loffing object
"""
# setup parameters
msg = "" # message to write on plot
flag = 0 # error flag, default to no error detected
# generate a spline to help locate where to best fit our baseline model.
yspl = spline(x, y,log=log)
# find location of maximum peak, assumed to be at center
try:
peakPosition = (np.where(x >= peakCenterGuess)[0])[0]
peakSpline = np.where(yspl == max(yspl))[0]
except Exception:
msg = "Failed to determine center peak location"
msg_wrapper("warning",log.warning,msg)
flag = 22
return [], [], [], 0, flag, msg, 0, 0, 0, 0
# LOCATE LOCAL MINUMUM/MAXIMUM POSITIONS
# these values are used to figure out where to
# select our baseline points/locations for
msg_wrapper("debug",log.debug,'Locate local min and max positions')
localMinPositions = (np.diff(np.sign(np.diff(yspl))) > 0).nonzero()[0] + 1 # local min positions / first nulls
localMaxPositions = (np.diff(np.sign(np.diff(yspl))) < 0).nonzero()[0] + 1 # local max positions / peak
msg=f'Found positions of possible local mins at: {localMinPositions}, and local maxs at: {localMaxPositions}'
msg_wrapper("debug",log.debug,msg)
# get maximum of driftscan
ymax = max(yspl)
# Delete local min point within 50 points of either side of a local max point from the positions found above
maxPoints=50
pointsToDelete=test_position_validity(localMaxPositions,localMinPositions,maxPoints)
# If any invalid points found, delete them
if len(pointsToDelete)!=0:
for k in range(len(pointsToDelete)):
if pointsToDelete[k] in localMinPositions:
localMinPositions=list(localMinPositions)
ind=localMinPositions.index(pointsToDelete[k])
del localMinPositions[ind]
#ensure that the points are in an array
localMinPositions=np.array(localMinPositions)
msg=f'After validating points, the accepted positions for -> Min locs: {localMinPositions}, and Max locs: {localMaxPositions}'
msg_wrapper("debug",log.debug,msg)
# if len(localMinPositions)=0 try to establish other possible locations
if len(localMinPositions) == 0 :
msg = "- Failed to locate local min positions, set to FNBW locs"
msg_wrapper("warning", log.warning, msg)
flag = 21
# sys.exit()
try:
h=np.where(x<=-hfnbw)[0][-1]
print(h)
except:
h=x[0]
try:
#TODO write better notes on why i do this
hh=[np.where(x>=hfnbw)[0][0]][0]
except:
hh=x[-1]
localMinPositions=[h,hh]
# if len(localMinPositions) still = 0 and len(localMaxPositions) == 0
if len(localMinPositions) == 0 or len(localMaxPositions) == 0:
msg = "Failed to locate local min and max positions"
msg_wrapper("warning", log.warning, msg)
flag = 19
# print(msg)
# sys.exit()
return [], [], [], 1, flag, msg, 0, 0, 0, 0
# localMinPositions must be = 2, number of local minimum positions found
# IF MORE ARE FOUND WE MOST PROBABLY HAVE SIDELOBES
numberOfMinPositions = len(localMinPositions)
# Find the index or insertion point for the peakPosition in the
# locMinPositions list. i.e. where we most likely expect the
# peak to be located.
peakInd = bisect.bisect(localMinPositions, peakPosition)
scanLen = len(x)
# basic parameter info
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*30)
msg_wrapper("debug", log.debug,"Drift scan basic info: ")
msg_wrapper("debug", log.debug,"-"*30)
msg_wrapper("debug", log.debug,f"Found local minimums at = {localMinPositions}")
msg_wrapper("debug", log.debug,f"Found local maximums at = {localMaxPositions}")
msg_wrapper("debug", log.debug, f"no. of local min positions = {numberOfMinPositions}")
msg_wrapper('debug', log.debug, f"index of peak in local mins = {peakInd}") # i.e. if peak were inserted between the local mins, what would its index be
# check location of peak
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Peak locations: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"Peak pos from Gauss fit: {peakPosition}")
msg_wrapper("debug", log.debug, f"Peak pos from Spline fit: {peakSpline[0]}")
msg_wrapper("debug", log.debug, f"Peak pos if at mid of scan: {int(scanLen/2)}")
# Ensure peak falls within expected range, beyond 25% of the beginning of the drift scan and
# below 75% of the end of the drift scan.
peakLocMinLimit = int(scanLen*.25)
peakLocMaxLimit = int(scanLen*.75)
# If the code has struggled to locate the local min/max,
if ( peakPosition < peakLocMinLimit or peakPosition > peakLocMaxLimit ):
msg = "Peak not within expected range."
msg_wrapper("info", log.info, msg)
flag=20
msg_wrapper("info", log.info, f"Peak limit left: {peakLocMinLimit}")
msg_wrapper("info", log.info, f"Peak position: {peakPosition}")
msg_wrapper("info", log.info, f"Peak limit right: {peakLocMaxLimit}")
# sys.exit()
return [],[],[],1,flag,msg,[],[],0,0
else:
# locate/setup fnbw location on left and right of beam
# if we can't locate base locations program defaults
# to fnbw locations
try:
leftFNBWPoint = (np.where(x >= (peakCenterGuess-hfnbw))[0])[0]
except:
leftFNBWPoint = 0
try:
rightFNBWPoint = (np.where(x >= (peakCenterGuess+hfnbw))[0])[0]
except:
rightFNBWPoint = len(x)
if rightFNBWPoint == len(x) or leftFNBWPoint == 0:
flag=27
msg="Failed to locate FNBW locations"
msg_wrapper("info", log.info, msg)
# print(msg)
# sys.exit()
return [], [], [], 1, flag, msg, 0, 0, 0, 0
# msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Locate/setup location of FNBW: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"FNBW: {hfnbw*2}")
msg_wrapper("debug", log.debug, f"HFNBW: {hfnbw}")
msg_wrapper("debug", log.debug, f"left fnbw loc: {leftFNBWPoint}")
msg_wrapper("debug", log.debug, f"right fnbw loc: {rightFNBWPoint}")
# SEARCH FOR MINIMUMS
# We only need two points, one on left of peak and one on right of peak
# if we have more points this means there are side lobes and we
# need to adjust the baseline accordingly
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Finding locations of local minimum from scan: ")
msg_wrapper("debug", log.debug, "-"*20)
# set fault parameters
rf = 0 # right is faulty rf=1
lf = 0 # left is faulty lf=1
if peakInd == 0 and len(localMinPositions) == 1:
# there is only one peak index position found
msg_wrapper("debug", log.debug, f'peak ind: {peakInd}, local min pos: {len(localMinPositions)}')
if localMinPositions[peakInd] < peakPosition:
# the position found is to the left of the assumed peak location
minPosOnLeftOfPeak = localMinPositions
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,
"leftMinLoc found: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug, "setting rightMinLoc: {}\n".format(
minPosOnRightOfPeak))
if localMinPositions[peakInd] > peakPosition:
# the position found is to the right of the assumed peak location
minPosOnLeftOfPeak = leftFNBWPoint
minPosOnRightOfPeak = localMinPositions
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,"setting leftMinLoc: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug,"rightMinLoc found: {}\n".format(
minPosOnRightOfPeak))
elif peakInd < len(localMinPositions):
msg_wrapper("debug", log.debug, f'peak ind < local min pos: {peakInd} < {len(localMinPositions)}; i.e. one peak, 2 or more local mins' )
if (localMinPositions[peakInd] < peakPosition) or (localMinPositions[peakInd] > peakPosition):
# grab all the local min positions to the left and right of the peak
minPosOnLeftOfPeak = localMinPositions[:peakInd]
minPosOnRightOfPeak = localMinPositions[peakInd:]
msg_wrapper("debug", log.debug,f"all leftMinLoc: {minPosOnLeftOfPeak}")
msg_wrapper("debug", log.debug,f"all rightMinLoc: {minPosOnRightOfPeak}")
if len(minPosOnLeftOfPeak) == 0:
# there are no left min positons, set left min to fnbw point
minPosOnLeftOfPeak = leftFNBWPoint
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info, msg)
if len(minPosOnRightOfPeak) == 0:
# there are no right min positions, set right min to fnbw point
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
msg_wrapper("info", log.info, msg)
else:
if localMinPositions[peakInd-1] < peakPosition:
# can't find local mins beyond peak
minPosOnLeftOfPeak = localMinPositions[:peakInd]
minPosOnRightOfPeak = rightFNBWPoint
flag = 6
rf = 1
msg = "Failed to locate right min pos"
print(msg)
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,"leftMinLoc: {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug,"rightMinLoc: {}\n".format(
minPosOnRightOfPeak))
if localMinPositions[0] > peakPosition:
# all local mins are beyond peak
minPosOnLeftOfPeak = leftFNBWPoint
minPosOnRightOfPeak = localMinPositions
flag = 7
lf = 1
msg = "Failed to locate left min pos"
msg_wrapper("info", log.info,msg)
msg_wrapper("debug", log.debug,
"setting leftMinLocs : {}\n".format(minPosOnLeftOfPeak))
msg_wrapper("debug", log.debug, "rightMinLoc beyond peak: {}\n".format(
minPosOnRightOfPeak))
# Determine if data has large sidelobes by evaluating the region
# around the center peak
maxPosInMaxs = localMaxPositions[np.abs(localMaxPositions-peakPosition).argmin()]
indOfmaxPosInMaxs = np.where(localMaxPositions == maxPosInMaxs)[0]
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, "Searching for sidelobes")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug, f"local max positions: {localMaxPositions}")
msg_wrapper("debug", log.debug, f"maxPosInMaxs: {maxPosInMaxs}")
msg_wrapper("debug", log.debug, f"IndexOfMaxPos: {indOfmaxPosInMaxs}")
msg_wrapper("debug", log.debug, f"lenmaxpos: {len(localMaxPositions)} ")
sidelobes = 0 # data contains no sidelobes
if len(localMaxPositions) > 1:
# theres more than one peak
if len(localMaxPositions) == indOfmaxPosInMaxs[0]:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]]
else:
if indOfmaxPosInMaxs == len(localMaxPositions)-1:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = []
else:
leftSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]-1]
rightSidelobe = localMaxPositions[indOfmaxPosInMaxs[0]+1]
maxPeakInSpline = yspl[maxPosInMaxs]
halfOfMaxPeakInSpline = 0.5*maxPeakInSpline
# check if sidelobes are larger than half the peak
if (yspl[leftSidelobe] >= halfOfMaxPeakInSpline) or (yspl[rightSidelobe] >= halfOfMaxPeakInSpline):
# large sidelobes detected
msg_wrapper("info", log.info, "Large sidelobes detected: {} < {} < {}".format(
yspl[leftSidelobe], maxPeakInSpline, yspl[rightSidelobe]))
sidelobes = 1
msg = "Large sidelobes detected"
flag = 9
else:
msg_wrapper("info", log.info, "No sidelobes detected, moving on")
elif len(localMaxPositions) == 1:
msg_wrapper("info", log.info, "Passed sidelobe check: No sidelobes detected\n")
else:
msg_wrapper("info", log.info, "\nFailed to locate a maximum")
sys.exit()
maxloc = np.where(yspl==max(yspl))[0]
#print(maxloc,maxloc[0],len(yspl),yspl[0],yspl[-1])
# Make sure peak lies within FNBW window or points
if maxloc[0] < leftFNBWPoint or maxloc[0]> rightFNBWPoint:
# found peak in noise
msg_wrapper("info", log.info,"peak found in baselocs")
# sys.exit()
msg = "peak found in baselocs"
flag = 16
# we have sidelobes
# find local min positions ----------------------------------------
locminpos = (np.diff(np.sign(np.diff(yspl))) > 0).nonzero()[0] + 1
baseLine, leftBaselineBlock, rightBaselineBlock, minPosOnLeftOfPeak, minPosOnRightOfPeak, lp, rp = get_base(
locminpos,int((scanLen*.05)/2), len(x))
#locminpos, int((scanLen*.05)/2), len(x)), # why 5% ???
# ----------------------------------------------------------------
#print(lp, rp)
# pl.plot(x,y)
# pl.plot(x,yspl)
# pl.plot(x[leftBaselineBlock], y[leftBaselineBlock], 'b.')
# pl.plot(x[rightBaselineBlock],y[rightBaselineBlock],'m.')
# pl.plot(x[lp], y[lp], 'k.')
# pl.plot(x[rp],y[rp],'k*')
# pl.plot(x[minPosOnLeftOfPeak],yspl[minPosOnLeftOfPeak],"y*")
# pl.plot(x[minPosOnRightOfPeak], yspl[minPosOnRightOfPeak], "r*")
# pl.show()
# pl.close()
# sys.exit()
sidelobes=1
#sys.exit()
return baseLine, leftBaselineBlock, rightBaselineBlock, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak,lp,rp
#return [],[],[],1,flag,msg,[],[],0,0
else:
# Check if you returned anything: sanity check
try:
type(minPosOnRightOfPeak)
# check you are returned something, dummy check
except:
flag=6
msg="Failed to locate right min pos"
msg_wrapper("info", log.info, msg)
return [], [], [], np.nan, flag, msg, np.nan, np.nan, [], []
try:
# check you are returned something, dummy check
type(minPosOnLeftOfPeak)
except:
flag=7
msg="Failed to locate left min pos"
msg_wrapper("info", log.info, msg)
return [], [], [], np.nan, flag, msg, np.nan, np.nan, [], []
# print(type(minPosOnLeftOfPeak).__name__)
# Get the locations of the local minimum positions, if not found, return the position to be within
# a hundred points from both extreme ends of the data array.
if(type(minPosOnRightOfPeak).__name__) != 'ndarray':
if(type(minPosOnRightOfPeak).__name__) == 'list':
pass
else:
try:
minPosOnRightOfPeak=[minPosOnRightOfPeak]
except:
minPosOnRightOfPeak = [len(x)-100]
if(type(minPosOnLeftOfPeak).__name__) != 'ndarray':
if(type(minPosOnRightOfPeak).__name__) == 'list':
ls=[]
# check all values are integers
try:
for val in minPosOnLeftOfPeak:
# print(type(val).__name__, val)
if "float" in (type(val).__name__):
pass
else:
ls.append(val)
minPosOnLeftOfPeak=ls
except:
pass
else:
try:
minPosOnLeftOfPeak=[minPosOnLeftOfPeak]
except:
minPosOnLeftOfPeak = [100]
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("info", log.info,"Getting center of baseline blocks on left and right of peak: ")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("info", log.info,f"min pos left: {x[minPosOnLeftOfPeak]} @ loc/s {minPosOnLeftOfPeak}")
msg_wrapper("info", log.info,f"min pos right: {x[minPosOnRightOfPeak]} @ loc/s {minPosOnRightOfPeak}")
msg_wrapper("info", log.info,f"scan length: {scanLen}")
# Plot possible locations to fit your baseline
saveTo=f'{saveLoc}_baselocs.png'
plotBaselineEstimate(x, y, yspl, minPosOnLeftOfPeak, minPosOnRightOfPeak, \
"left local minumums", "right local minimums", "Baseline points selection", saveTo)
# Locate the baselines
# the number of points to use for baseline on each side of beam
# limited to 5% of length of the scan, this works well for
# situations where there are large sidelobes e.g. Jupiter, so
# we apply it to all scans, if fit is bad you can always use the GUI
# to fit by hand.
maxPointsInBaselineBlock = int(scanLen*.05)
hMaxPointsInBaselineBlock = int(maxPointsInBaselineBlock/2)
nums = np.arange(1, scanLen, 1) # array of numbers from 1 to len(list)
# set left and right baseline points placeholders
lb=[]
rb=[]
# Get baseline block on left of beam
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Get baseline block on left of beam")
msg_wrapper("debug", log.debug, "-"*20)
if type(minPosOnLeftOfPeak).__name__ == "int64":
if lf == 1:
# left is faulty, set baseline points to half of the 5% limit
leftBaselineBlock = np.arange(0, minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, 1)
lb = lb+[0, minPosOnLeftOfPeak-hMaxPointsInBaselineBlock]
else:
leftBaselineBlock = np.arange(
minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak+hMaxPointsInBaselineBlock, 1)
lb = lb+[minPosOnLeftOfPeak-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak+hMaxPointsInBaselineBlock]
if minPosOnLeftOfPeak < len(leftBaselineBlock):
# min pos left is too close to the edge, adjust accordingly
leftBaselineBlock = np.arange(0, len(leftBaselineBlock), 1)
lb = lb+[0, len(leftBaselineBlock)]
msg_wrapper("debug", log.debug,f"mix: {leftBaselineBlock}")
sys.exit()
else:
leftBaselineBlock = []
slots = []
if lf == 0: # left not faulty
for i in range(len(minPosOnLeftOfPeak)):
slots = (np.arange(
minPosOnLeftOfPeak[i]-hMaxPointsInBaselineBlock, minPosOnLeftOfPeak[i]+hMaxPointsInBaselineBlock, 1))
lb = lb+[minPosOnLeftOfPeak[i]-hMaxPointsInBaselineBlock,
minPosOnLeftOfPeak[i]+hMaxPointsInBaselineBlock]
for j in range(len(slots)):
leftBaselineBlock.append(slots[j])
else:
# struggled to find left base so use all data to fnbw point
if len(minPosOnLeftOfPeak)==1:
leftBaselineBlock = np.arange(0,minPosOnLeftOfPeak[0],1)
lb = lb+[0,minPosOnLeftOfPeak[0]]
flag=7
msg="-- Failed to locate left min pos"
msg_wrapper("debug", log.debug,msg)
# TODO: Think about this a little bit more,
msg_wrapper("debug", log.debug, f"lb: {lb}")
# msg_wrapper("debug", log.debug, f"leftBaselineBlock: {leftBaselineBlock}")
# Get baseline block on right of beam
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Get baseline block on right of beam")
msg_wrapper("debug", log.debug, "-"*20)
if type(minPosOnRightOfPeak).__name__ == "int64":
if rf == 1:
flag=6
msg="Failed to locate right min pos"
rightBaselineBlock = np.arange(minPosOnRightOfPeak+hMaxPointsInBaselineBlock, scanLen, 1)
rb= rb+[minPosOnRightOfPeak +
hMaxPointsInBaselineBlock, scanLen]
msg_wrapper("debug", log.debug,msg)
else:
rightBaselineBlock = np.arange(minPosOnRightOfPeak-hMaxPointsInBaselineBlock, minPosOnRightOfPeak+hMaxPointsInBaselineBlock, 1)
rb=rb+[minPosOnRightOfPeak-hMaxPointsInBaselineBlock,
minPosOnRightOfPeak+hMaxPointsInBaselineBlock]
if (scanLen - len(rightBaselineBlock)) < minPosOnRightOfPeak:
# min pos right is too close to the edge, adjust accordingly
rightBaselineBlock = np.arange(scanLen - len(rightBaselineBlock), scanLen, 1)
rb= rb+[scanLen - len(rightBaselineBlock), scanLen]
else:
rightBaselineBlock=[]
slots = []
#rf=[]
if rf == 0:
for i in range(len(minPosOnRightOfPeak)):
end=minPosOnRightOfPeak[i]+hMaxPointsInBaselineBlock
if end>len(x):
end = len(x)#-1
else:
pass
slots=(np.arange(
minPosOnRightOfPeak[i]-hMaxPointsInBaselineBlock, end-1, 1))
rb = rb+[minPosOnRightOfPeak[i]-hMaxPointsInBaselineBlock, end-1]
for j in range(len(slots)):
rightBaselineBlock.append(slots[j])
else:
rightBaselineBlock = np.arange(
scanLen-maxPointsInBaselineBlock, scanLen, 1)
rb = rb+[scanLen-maxPointsInBaselineBlock, scanLen-1]
flag = 6
msg = "Failed to locate right min pos"
msg_wrapper("debug", log.debug,msg)
msg_wrapper("debug", log.debug, f"rb: {rb}")
# ensure data makes sense
msg_wrapper("debug", log.debug, "\n")
msg_wrapper("debug", log.debug, "-"*20)
msg_wrapper("debug", log.debug,"Ensure data makes sense: more sanity checks")
msg_wrapper("debug", log.debug, "-"*20)
try:
# find if data contains negatives and move/shift points forward
zeroIndex = leftBaselineBlock.index(0)
#print("\nzeroIndex found at index: {}".format(zeroIndex))
# find the difference between adjacent numbers and identify
# where to allocate shift
res = np.array([leftBaselineBlock[i + 1] - leftBaselineBlock[i] for i in range(len(leftBaselineBlock)-1)])
if len(res)>0:
# if more than one location for baseline is selected,
# determine shift and adjust accordingly
l = np.where(res!=1)[0]
#("shift parameter: ",l)
#print("value at shift parametera: ",leftBaselineBlock[l[0]],"\n")
left = leftBaselineBlock[zeroIndex:l[0]+1]
right = leftBaselineBlock[l[0]+1:]
s = np.arange(
leftBaselineBlock[l[0]]+1, leftBaselineBlock[l[0]]+1+zeroIndex, 1)
shiftedBlock = left+list(s)+right #np.arange(0, shift,1)
#print("shiftedleftbaselineblock: ", shiftedBlock)
leftBaselineBlock = shiftedBlock
msg_wrapper("debug", log.debug, "Shifted baseline block")
else:
pass
except Exception:
pass
try:
# find if data contains goes beyond max of scan
# and move/shift points backwards
maxIndex = rightBaselineBlock.index(scanLen)
#print("maxIndex: {}".format(maxIndex))
# find the difference between adjacent numbers and identify
# where to allocate shift
res = np.array([rightBaselineBlock[i + 1] - rightBaselineBlock[i] for i in range(len(rightBaselineBlock)-1)])
#print(res)
if len(res)>0:
# if more than one location for baseline is selected,
# determine shift and adjust accordingly
l = np.where(res != 1)[0]
#print("shift parameter: ",l)
if len(l) == 0:
msg_wrapper("debug", log.debug, "Shifting entire block")
last = rightBaselineBlock[-1]
shift =abs(scanLen-last)
shiftedBlock = np.arange(
rightBaselineBlock[0]-shift, scanLen, 1)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
else:
#print("value at shift parameterb: ",
# rightBaselineBlock[l[0]], "\n")
msg_wrapper("debug", log.debug, "Shifting block")
if len(l)==1:
left = rightBaselineBlock[:l[0]+1]
right = rightBaselineBlock[l[0]+1:]
shift = abs(rightBaselineBlock[-1]-scanLen)
s = np.arange(
rightBaselineBlock[l[0]+1]-shift, rightBaselineBlock[-1]-shift, 1)
shiftedBlock = left+list(s)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
elif len(l)==2:
#print("Length l > 1")
# find value closest to max
nearest = min(l, key=lambda t: abs(t-maxIndex))
index=[np.where(l==nearest)[0]][0]
#print(nearest,index[0])
#print("value at shift parameter: ",
# rightBaselineBlock[nearest], "\n")
# shift backwards
left = rightBaselineBlock[:nearest+1]
right = rightBaselineBlock[nearest+1:]
#print("left: ", left)
#print("right: ", right)
shift = abs(rightBaselineBlock[-1]-scanLen)
#print("shifting back by: ", shift)
s = np.arange(
rightBaselineBlock[nearest+1]-shift, rightBaselineBlock[-1]-shift, 1)
shiftedBlock = left+list(s)
#print("shiftedrightbaselineblock: ", shiftedBlock)
rightBaselineBlock = shiftedBlock
else:
#print("l > 2")
msg = "Too many shift parameters."
flag=18
msg_wrapper("debug", log.debug, f"Too many shift parameters: Peak limit left: {peakLocMinLimit}, Peak position: {peakPosition}, Peak limit right: {peakLocMaxLimit}")
sys.exit()
return [],[],[],1,flag,msg,np.nan
except Exception:
pass
# msg_wrapper("debug", log.debug, f"lb: {lb}")
# msg_wrapper("debug", log.debug, f"rb: {rb}")
# print(len(x),len(y),len(yspl))
# print(leftBaselineBlock)
# print(rightBaselineBlock)
saveTo=f'{saveLoc}_baselocsOutline.png'
plotBaselineEstimate(x, y, yspl, lb, rb,'baselocs at left of beam','baselocs at rigth of beam',\
"Plot with baseline blocks outlined", saveTo, leftBaselineBlock, rightBaselineBlock)
# create a single baseline block
baseLine = list(leftBaselineBlock) + list(rightBaselineBlock)
baseLine = np.array(baseLine, int)
return baseLine, leftBaselineBlock, rightBaselineBlock, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak,lb,rb
[docs]
def correct_drift(xBase, yBase, x, log, order=1):
'''
Correct for a drifting baseline in the scan by fitting
a first order polynomial to a region with no
signal.
Parameters
-----------
xBase : x data of the baseline
yBase : y data of the baseline
x : x data of the entire drift scan
'''
# fit the baseline and get best fit coeffecients
driftModel, driftRes, driftRms, driftCoeffs = calc_residual_and_rms(
xBase, yBase,log, order)
dataModel = np.polyval(driftCoeffs, x)
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the baseline")
msg_wrapper("info", log.info,"-"*30)
msg = "Fit = {:.3}x + ({:.3}), rms error = {:.3}".format(driftCoeffs[0], driftCoeffs[1], driftRms)
msg_wrapper("info", log.info, msg)
return dataModel, driftModel, driftRes, driftRms, driftCoeffs
[docs]
def calc_residual_and_rms(x, y, log, order=1):
"""Calculate the residual and rms from data
Args:
x (array): 1d array
data representing the x-axis
y (array): 1d array
data representing the y-axis
deg(int): degree of the polynomial
Return:
model (array): model of
res (array):
rms (float):
coeff():
"""
#TODO: remove redundant code
# fit the baseline and get best fit coeffecients
coeffs = poly_coeff(x, y, order)
msg_wrapper("debug",log.debug,f'coeffs: {coeffs}')
# get a model for the fit using the best fit coeffecients
model = np.polyval(coeffs, x)
# Calculate the residual and rms
res, rms = calc_residual(y, model)
return model, res, rms, coeffs
[docs]
def calc_residual_and_rms_fit(x,y,order=1):
"""Calculate the residual and rms from data
Args:
x (array): 1d array
data representing the x-axis
y (array): 1d array
data representing the y-axis
deg(int): degree of the polynomial
Return:
model (array): model of
res (array):
rms (float):
coeff():
"""
#TODO: remove redundant code
nx=[]
ny=[]
# remove nan values
for i in range(len(x)):
#print(i,y[i],type(y[i]))
if y[i] == np.nan or str(y[i]) =="nan":
#print(i,y[i])
pass
else:
nx.append(x[i])
ny.append(y[i])
# fit the baseline and get best fit coeffecients
coeffs = poly_coeff(nx, ny, order)
print('coeffs: ',coeffs)
# get a model for the fit using the best fit coeffecients
model = np.polyval(coeffs, nx)
#print('model: ', model)
# Calculate the residual and rms
res, rms = calc_residual(ny, model)
return nx, model, res, rms, coeffs
[docs]
def poly_coeff(x, y, deg):
'''
Calculate the polynomial coeffecients depending
on the degree/order of the polynomial
Args:
x (array): 1d array
data representing the x-axis
y (array): 1d array
data representing the y-axis
deg(int): degree of the polynomial
Return:
array of polynomial fitted data
'''
return np.polyfit(x, y, deg)
[docs]
def fit_beam(x, y, p, fnbw, force, log, saveTag, fitTheoretical, autoFit=None):
"""
Fit single beam data.
Parameters:
x (array): 1D array of data representing the x-axis
y (array): 1D array of data representing the y-axis
p (list): list of initial fit parameters
fnbw (float): source first null beam width from file
dec (float): source declination
data (dict): dictionary of source parameters
scanNum (int): Value representing the current scan
force (str): String to determine whether to force a fit or not
log (object): loffing object
"""
# Setup parameters
scanLen = len(x) # length of the scan
hhpbw = p[2]/2 # half of the hpbw
hfnbw = fnbw/2 # half of the fnbw
flag = 0 # default flag set to zero
msg = "" # flag message to go on plot
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
coeff, fit, flag = test_gauss_fit(x, y, p,log)
if len(coeff)==0:
if force=="y":
pass
else:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
if autoFit !=None and len(autoFit)>1:
# If given fitting points
#------------------------
#print(autoFit, autoFit['baselocs'])
#print(float(autoFit['baselocs']))
# fit baseline
# 2) correct the drift in the data and get the residual and rms
baseLocs=[]#np.where()[0]
# print(autoFit)
# sys.exit()
for i in range(len(autoFit['baselocs'])):
if i%2==0:
#print(autoFit['baselocs'][i],autoFit['baselocs'][i+1])
if i==0:
baseLocsl=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
else:
baseLocsr=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
d=np.where((x>=autoFit['baselocs'][i]) & (x<=autoFit['baselocs'][i+1]))[0]
baseLocs=baseLocs+list(d)
#print(baseLocs)
#baseLocsl=x[b1]
#baseLocsr=x[b2]
lb=[baseLocsl[0],baseLocsl[-1]]
rb=[baseLocsr[0],baseLocsr[-1]]
#print(lb,rb)
#sys.exit()
# fit baseline blocks
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(
x[baseLocs], y[baseLocs], x,log)
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
plotCorrectedData(x,yCorrected,baseLocsl,baseLocsr,'Corrected data','blocks','Plot of baseline corrected data',f'{saveTo}corrected.png',xlabel="",ylabel="")
plot_overlap(x,yCorrected,x,s,'Plot of splined data','corrected','spline fit',f'{saveTo}splined.png',xlabel="",ylabel="")
# pl.title("Baseline corrected data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# #pl.plot(x,y)
# pl.plot(x, yCorrected, 'k',label="baseline corrected data")
# #pl.plot(x[main_beam], yCorrected[main_beam])
# pl.plot(x[baseLocs], yCorrected[baseLocs],".")
# pl.plot(x[lb], yCorrected[lb],".")
# #pl.plot(x,s)
# pl.plot(x,np.zeros_like(x),'k')
# #pl.grid()
# pl.legend(loc="best")
# try:
# pl.savefig(saveFolder+"baseline_corrected_data.png")
# except:
# pass
# #pl.show()
# pl.close()
# #sys.exit()
# fit a polynomial to peak data
msg_wrapper("info", log.info, "Fit the peak")
print("*"*60)
hmain_beam=np.where((x>=autoFit['peaklocs'][0]) & (x<=autoFit['peaklocs'][1]))[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
# pl.title("Plot of final peak fitted data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# pl.plot(x, yCorrected, "k", label="corrected data")
# #pl.plot(x[main_beam],yCorrected[main_beam])
# pl.plot(x[hmain_beam], yCorrected[hmain_beam])
# #pl.plot(x,fit)
# pl.plot(x[hmain_beam],ypeak,"r",label="Ta[K] = %.3f +- %.3f" %(max(ypeak),err_peak))
# pl.plot(x,np.zeros(scanLen),"k")
# #pl.grid()
# pl.legend(loc="best")
# try:
# pl.savefig(saveFolder+"peak_fit_data.png")
# except:
# pass
# #pl.show()
# pl.close()
#sys.exit()
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, coeff[1],
# flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
ret={
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb
}
return ret
sys.exit()
else:
# 1. Locate baseline blocks
# These are the blocks that will be used to correct the drift in the data
if force=="y" and flag==3:
baseLocsl=[]
baseLocsr=[]
else:
if fitTheoretical=="y":
#baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(
#x, y, coeff[1], hfnbw,log)
baseLocs, baseLocsl, baseLocsr, sidelobes = locate_baseline_blocks_fnbw(
x, hfnbw,)
#print(baseLocsl,baseLocsr)
flag = 26
msg = "Fitting left and right theoretical (FNBW) points"
minPosOnLeftOfPeak = baseLocsl[int(len(baseLocsl)/2)]
minPosOnRightOfPeak = baseLocsr[int(len(baseLocsr)/2)]
lb = [baseLocsl[0],baseLocsl[-1]]
rb = [baseLocsr[0],baseLocsr[-1]]
#sys.exit()
else:
msg_wrapper("info",log.info,"AUTO fitting started.")
baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(
x, y, coeff[1], hfnbw,log, saveTag)
if 'int' in type(minPosOnRightOfPeak).__name__ or 'float' in type(minPosOnRightOfPeak).__name__ or len(minPosOnRightOfPeak) == 0:
print("Failed to locate right min pos")
flag=6
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, 6, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
elif 'int' in type(minPosOnLeftOfPeak).__name__ or len(minPosOnLeftOfPeak) == 0:
print("Failed to locate left min pos")
flag=7
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, 7, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# one side of the basline has could not be located possibly due to rfi
msg = "*failed to locate base locs"
msg_wrapper("warning", log.warning, msg)
sys.exit()
flag=30
# Force the fit
if force=="y":
print("Forcing the fit")
# set baseline points and fit edges or fnbw points
#print()
# 1. try fitting the data with the centre at 0
baseLocs, baseLocsl, baseLocsr, sidelobes, flag, msg, minPosOnLeftOfPeak, minPosOnRightOfPeak, lb, rb = locate_baseline_blocks_auto(x, y, 0, hfnbw,log)
msg="force fitted local mins"
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# 2. try fitting the hfnbw points
baseLocsl = (np.where(x <= (-hfnbw))[0])
baseLocsr = (np.where(x >= (hfnbw))[0])
baseLocs = list(baseLocsl)+list(baseLocsr)
msg="force fitted fnbw locs"
sidelobes=2 #error fitting the data
# if both methods failed, exit
if len(baseLocsl) == 0 or len(baseLocsr) == 0:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
'''pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, y, 'k',label="baseline corrected data")
#pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocs], y[baseLocs],".")
pl.show()
pl.close()
sys.exit()'''
# fit the baseline
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(x[baseLocs], y[baseLocs], x,log)
# get main beam
try:
main_beam = np.where(np.logical_and(x >= coeff[1]-hfnbw, x <= coeff[1]+hfnbw))[0]
except:
print("Failed to get coeff[1], setting beam to fnbw window")
main_beam = np.where(np.logical_and(
x >= hfnbw, x <= hfnbw))[0]
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
#pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
pl.grid()
pl.legend(loc="best")
try:
pl.savefig(saveFolder+"baseline_corrected_data.png")
except:
pass
#pl.show()
pl.close()
#sys.exit()
# fit the peak
# 3) GET LOCATIONS OF DATA FOR THE MAIN BEAM ONLY
# this is limited by the fnbw
# check peak is at centre
main_beam = np.where(np.logical_and(
x >= -hfnbw, x <= hfnbw))[0]
# 4) apply a polynomial fit to the baseline data
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1))
# 5) Subtract the polynomial fitted to the baseline to get corrected beam
yCorrected = y - lin_first_null(x)
# 6) get sline of corrected data
s = spline(x, yCorrected)
# 7. Fit the peak
# find max location
msg_wrapper("info", log.info, "Fit the peak")
print("*"*60)
maxp = max(s)
maxloc = np.where(s == maxp)[0]
xmaxloc = x[maxloc[0]]
# max is not at center, found at extreme ends of scan
if (max(s) == s[0]) or (max(s) == s[-1]):
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 7) Get indices where the x values are within the main beam
# Get top 30% of beam
xMainBeam = x[main_beam]
yMainBeam = s[main_beam]
maxs = max(s[main_beam])
loc = np.where(s[main_beam] == maxs)[0]
midxMainBeam = xMainBeam[loc[0]]
# Get main beam for peak fitting
try:
hmain_beamn = np.where(np.logical_and(
xMainBeam >= coeff[1]-hhpbw, xMainBeam <= coeff[1]+hhpbw))[0]
except:
hmain_beamn = np.where(np.logical_and(
xMainBeam >= -hhpbw, xMainBeam <= hhpbw))[0]
# Fit top 50% or 30% depending on sidelobe confirmation
if sidelobes == 1:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
flag = 9
msg = "large sidelobes detected"
msg_wrapper("warning", log.warning, msg)
else:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.7*maxp)[0]
flag = 0
if (len(hmain_beamp) == 0) or len(hmain_beamn) ==0:
msg = "couldn't find peak main beam data"
flag=4
msg_wrapper("warning", log.warning, msg)
# find peak of spline
sloc = np.where(s == max(s))[0]
print("sloc: ", sloc, ", len: ", len(
yCorrected), ", mid: ", len(yCorrected)/2)
#print(data)
pl.title("Baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
pl.plot(x[main_beam], yCorrected[main_beam])
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
#pl.grid()
pl.legend(loc="best")
#pl.savefig(saveFolder+"baseline_corrected_data.png")
pl.show()
pl.close()
sys.exit()
# peak shifted left
if sloc < len(yCorrected)/2 and sloc != 0:
print("Peak is shifted to left, but not first element")
print(x[sloc], hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(
yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes,
# np.nan, flag, baseLocsl, baseLocsr, baseLocs, peakLoc
ret={"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
return ret
else:
# couldn't locate peak
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
# peak shifted right
print("Peak is shifted to left, but not first element")
print(x[sloc], hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(
yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
#sys.exit()
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs, peakLoc
return {"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
if hmain_beam[0] > baseLocsr[-1] or hmain_beam[-1] < baseLocsl[0]:
# peak loctaion is beyond fnbw locations
msg = "peak location is beyond fnbw locations"
msg_wrapper("warning", log.warning, msg)
flag = 24
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# fit a polynomial to peak data
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# check if peak was fit correctly
if max(ypeak) == ypeak[0] or max(ypeak) == ypeak[-1]:
# peak is concave, try fitting wider range
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
if (max(ypeak) == ypeak[0]) or (max(ypeak) == ypeak[-1]):
# still struggling to fit ?, stop fitting
msg = "failed to accurately establish peak fit location, 1"
msg_wrapper("warning", log.warning, msg)
flag = 5
pl.title("Plot of baseline corrected data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
#pl.plot(x,y)
pl.plot(x, yCorrected, 'k',label="baseline corrected data")
pl.plot(x,s)
pl.plot(x[hmain_beamp], yCorrected[hmain_beamp],label=msg)
pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
#pl.plot(x,s)
pl.plot(x,np.zeros_like(x),'k')
pl.grid()
pl.legend(loc="best")
#pl.savefig(saveFolder+"baseline_corrected_data.png")
#pl.show()
pl.close()
#sys.exit()
# find peak of spline
sloc = np.where(s==max(s))[0]
print("sloc: ",sloc,", len: ", len(yCorrected),", mid: ",len(yCorrected)/2)
# peak shifted left
if sloc < len(yCorrected)/2 and sloc!=0:
print("Peak is shifted to left, but not first element")
print(x[sloc],hhpbw)
# look for peak around new max loc
beam = np.where(np.logical_and(
x >= x[sloc]-hhpbw, x <= x[sloc]+hhpbw))[0]
# fit peak
if len(beam) > 0:
ypeak = np.polyval(np.polyfit(
x[beam], yCorrected[beam], 2), x[beam])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, beam, msg, driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs,peakLoc
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[],
}
else:
# couldn't locate peak
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, np.nan,
# flag, baseLocsl, baseLocsr, baseLocs,peakLoc
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":np.nan,
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":[],"baseRight":[]
}
else:
flag = 37
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 2) correct the drift in the data and get the residual and rms
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(
x[baseLocs], y[baseLocs], x,log)
# 3) GET LOCATIONS OF DATA FOR THE MAIN BEAM ONLY
# this is limited by the fnbw
# check peak is at centre
main_beam = np.where(np.logical_and(
x >= coeff[1]-hfnbw, x <= coeff[1]+hfnbw))[0]
lin_first_null = np.poly1d(np.polyfit(x[baseLocs], y[baseLocs], 1)) # 4) apply a polynomial fit to the baseline data
yCorrected = y - lin_first_null(x) # 5) Subtract the polynomial fitted to the baseline to get corrected beam
s = spline(x, yCorrected) # 6) get spline of corrected data
saveTo=f'{saveTag}corrected.png'
plotCorrectedData(x,yCorrected,baseLocsl,baseLocsr,"baseline corrected data",'baseLocs','Plot of corrected data',saveTo)
# 7. Fit the peak
# find max location
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peak")
msg_wrapper("info", log.info,"-"*30)
maxp = max(s)
maxloc = np.where(s == maxp)[0]
# xmaxloc = x[maxloc[0]]
# max is not at center, found at extreme ends of scan
if (max(s) == s[0]) or (max(s) == s[-1]):
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# 7) Get indices where the x values are within the main beam
# Get top 30% of beam
xMainBeam = x[main_beam]
yMainBeam = s[main_beam]
try:
maxs = max(s[main_beam])
except:
msg = "max of peak is not at center, failed peak fit "
msg_wrapper("warning", log.warning, msg)
flag = 10
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
loc = np.where(s[main_beam] == maxs)[0]
midxMainBeam = xMainBeam[loc[0]]
# Try to fit a gaussian to confirm center peak location after correction of scan
try:
p0=[maxs, midxMainBeam, p[2], 0, 0]
coeff, covarMatrix = curve_fit(gauss_lin, x,yCorrected, p0)
fit = gauss_lin(x, *coeff)
# coeff, fit = fit_gauss_lin(
# x, yCorrected, [maxs, midxMainBeam, p[2], 0, 0])
except Exception:
msg = "couldnt find max in main beam"
msg_wrapper("warning", log.warning, msg)
flag = 11
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# Get main beam for peak fitting
hmain_beamn = np.where(np.logical_and(
xMainBeam >= coeff[1]-hhpbw, xMainBeam <= coeff[1]+hhpbw))[0]
# Fit top 50% or 30% depending on sidelobe confirmation
if sidelobes == 1:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
flag = 9
msg_wrapper("warning", log.warning, "large sidelobes detected")
else:
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.7*maxp)[0]
flag = 0
if (len(hmain_beamp) == 0) or len(hmain_beamn) ==0:
msg = "couldn't find peak main beam data"
flag=4
msg_wrapper("warning", log.warning, msg)
# sys.exit()
#print(data)
# pl.title("Baseline corrected data")
# pl.xlabel("Scandist [Deg]")
# pl.ylabel("Ta [K]")
# pl.plot(x,y)
# #pl.plot(x, yCorrected, 'k',label="baseline corrected data")
# pl.plot(x[main_beam], yCorrected[main_beam])
# pl.plot(x[baseLocsl], yCorrected[baseLocsl],".")
# pl.plot(x[baseLocsr], yCorrected[baseLocsr],".")
# #pl.plot(x,s)
# pl.plot(x,np.zeros_like(x),'k')
# #pl.grid()
# pl.legend(loc="best")
# #pl.savefig(saveFolder+"baseline_corrected_data.png")
# #pl.show()
# pl.close()
# #sys.exit()
# #sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [], np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
if (len(hmain_beamp) < 0.5*len(hmain_beamn)):
msg = "peak main beam data too noisy"
flag = 23
msg_wrapper("warning", log.warning, msg)
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
if hmain_beam[0] > baseLocsr[-1] or hmain_beam[-1] < baseLocsl[0]:
# peak loctaion is beyond fnbw locations
msg = "peak location is beyond fnbw locations"
msg_wrapper("warning", log.warning, msg)
flag = 24
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# fit a polynomial to peak data
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
# # check if peak was fit correctly
msg=f'Test fit makes sense - left: {ypeak[0]:.3f}, center: {max(ypeak):.3f}, right: {ypeak[-1]:.3f}'
msg_wrapper("debug", log.debug, msg)
# print(max(ypeak),ypeak[0],ypeak[-1])
if max(ypeak) == ypeak[0] or max(ypeak) == ypeak[-1]:
# peak is concave, try fitting wider range
hmain_beamp = np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beam],
yCorrected[hmain_beam], 2), x[hmain_beam])
if (max(ypeak) == ypeak[0]) or (max(ypeak) == ypeak[-1]):
# still struggling to fit ?, try this then stop fitting
try:
# LOCATE LOCAL MINUMUM/MAXIMUM POSITIONS
# these values are used to figure out where to
# select our baseline points/locations
localMinPositions = (np.diff(np.sign(np.diff(s))) > 0).nonzero()[
0] + 1 # local min positions / first nulls
localMaxPositions = (np.diff(np.sign(np.diff(s))) < 0).nonzero()[
0] + 1 # local max positions / peak
msg=f'mins: {localMinPositions}, maxs: {localMaxPositions}'
msg_wrapper("debug", log.debug, msg)
# find peak closest to zero
locs=x[localMaxPositions]
k = min(locs, key=lambda x:abs(x-0))
locs=list(locs)
#print(f'locs: {locs}, k: {k}')
msg=f'pos of possible center peak: {locs.index(k)}'
msg_wrapper("debug", log.debug, msg)
# use this peak as the main peak
kpos=np.where(x==k)[0]
msg=f'peak is at loc: {kpos}'
msg_wrapper("debug", log.debug, msg)
ymax=s[kpos] #max(s)
yhalf= ymax/2
# find surrounding mins
l=min(localMinPositions, key=lambda x:abs(x-kpos))
ppos=list(localMinPositions).index(l)
# print(l,ppos,kpos)
# print(localMinPositions)
if l>kpos:
v=[localMinPositions[ppos-1],localMinPositions[ppos]]
print('window - ',localMinPositions[ppos-1],kpos,localMinPositions[ppos])
left=localMinPositions[ppos-1]
right=localMinPositions[ppos]
elif l < kpos:
print(localMinPositions[ppos],kpos,localMinPositions[ppos+1])
left=localMinPositions[ppos]
right=localMinPositions[ppos+1]
print('l < kpos')
#sys.exit()
else:
print(localMinPositions[ppos],kpos,localMinPositions[ppos+1])
left=localMinPositions[ppos]
right=localMinPositions[ppos+1]
print('l ? kpos')
sys.exit()
top = np.where((s >= yhalf)&(x>x[left])&(x<x[right]))
# peak is concave, try fitting wider range
hmain_beamp = top#np.where(yMainBeam[hmain_beamn] >= 0.5*maxp)[0]
#hmain_beam = hmain_beamp + hmain_beamn[0]+main_beam[0]
ypeak = np.polyval(np.polyfit(x[hmain_beamp],
yCorrected[hmain_beamp], 2), x[hmain_beamp])
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beamp], ypeak)
# find final peak loc
#ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[kpos])#[ploc[0]]
msg = "attempted to accurately establish peak fit location"
msg_wrapper("warning", log.warning, msg)
flag = 37
sys.exit()
pl.title("Plot of final peak fitted data")
pl.xlabel("Scandist [Deg]")
pl.ylabel("Ta [K]")
pl.plot(x, yCorrected, "k", label="corrected data")
#pl.plot(x[main_beam],yCorrected[main_beam])
pl.plot(x[hmain_beamp], yCorrected[hmain_beamp])
#pl.plot(x,fit)
pl.plot(x[hmain_beamp],ypeak,"r",label="Ta[K] = %.3f +- %.3f" %(max(ypeak),err_peak))
pl.plot(x,np.zeros(scanLen),"k")
#pl.grid()
pl.legend(loc="best")
try:
pl.savefig(saveFolder+"peak_fit_data.png")
except:
pass
#pl.show()
pl.close()
#sys.exit()
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beamp,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb,
}
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beamp, "", driftRes, driftRms, driftCoeffs, fitRes,
# coeff[1], flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
except:
msg = "failed to accurately establish peak fit location"
msg_wrapper("warning", log.warning, msg)
flag = 5
print(msg)
# sys.exit()
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
# print(max(ypeak),ypeak[0],ypeak[-1],abs(min(yCorrected)) > max(yCorrected))
if (abs(min(yCorrected)) > max(yCorrected)):
# still struggling to fit ?, stop fitting
msg = "min > max"
msg_wrapper("warning", log.warning, msg)
flag = 25
# sys.exit()
if force=="y":
pass
else:
# return np.nan, [], np.nan, [], [], msg, [], [], [], [], np.nan, flag, [], [], [],np.nan,0,0
return {"peakFit":np.nan,"peakModel":[],"peakRms":np.nan,"correctedData":[],"peakPts":[],"msg":msg,
"driftRes":[],"driftRms":[],"driftCoeffs":[],"fitRes":[],"midXValue":np.nan,"flag":flag,
"baseLocsLeft":[],"baseLocsRight":[],"baseLocsCombined":[],"peakLoc":np.nan,"baseLeft":[],
"baseRight":[]}
else:
pass
# get residual and rms of peak fit
fitRes, err_peak = calc_residual(yCorrected[hmain_beam], ypeak)
saveTo=f'{saveTag}_peak_fit.png'
plotPeakFit("Plot of final peak fitted data",x,yCorrected,ypeak,err_peak,hmain_beam,saveTo)
# find final peak loc
ploc = np.where(ypeak == max(ypeak))[0]
peakLoc = (x[hmain_beam])[ploc[0]]
# print('out')
return {
"peakFit":max(ypeak),"peakModel":ypeak, "peakRms":err_peak,"correctedData":yCorrected,"peakPts":hmain_beam,
"msg":"","driftRes":driftRes,"driftRms":driftRms,"driftCoeffs":driftCoeffs,"fitRes":fitRes,"midXValue":coeff[1],
"flag":flag,"baseLocsLeft":baseLocsl,"baseLocsRight":baseLocsr,"baseLocsCombined":baseLocs,"peakLoc":peakLoc,
"baseLeft":lb,"baseRight":rb,
}
# return max(ypeak), ypeak, err_peak, yCorrected, hmain_beam, "", driftRes, driftRms, driftCoeffs, fitRes, coeff[1], flag, baseLocsl, baseLocsr, baseLocs, peakLoc,lb,rb
[docs]
def fit_dual_beam(x, y, hpbw, fnbw, factor,saveTo,log): #, npts, dec, srcType, data, scanNum, force, log):
# Setup parameters
scanLen = len(x) # length of the scan
midLeftLoc = int(scanLen/4) # estimated location of peak on left beam
midRightLoc = midLeftLoc * 3 # estimated location of peak on right beam
hhpbw = hpbw/2 # half of the hpbw
hfnbw = fnbw/2 # half of the fnbw
factoredfnbw = (fnbw*factor) # fnbw multiplied by factor
flag=0 # default flag set to zero
msg="" # flag message to go on plot image
# saveFolder="currentScanPlots/"
fl = 0 # failed left gaussian fit
fr = 0 # failed right gaussian fit
# LOCATE BASELINE BLOCKS
msg_wrapper("debug", log.debug, "-- Locate baseline blocks")
# we don't worry about sidelobes here so baseline
# blocks are set to edges or fnbw points
ptLimit = int(scanLen*0.04) # Point limit, number of allowed points per base block
baseLocsLeft = np.arange(0,ptLimit,1)
baseLocsRight = np.arange(scanLen-ptLimit,scanLen,1)
baseLocs = list(baseLocsLeft)+list(baseLocsRight)
if len(baseLocsLeft) == 0 or len(baseLocsRight) == 0:
msg = "failed to locate base locs"
flag = 30
msg_wrapper("warning", log.warning, msg)
# sys.exit()
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
msg_wrapper("debug", log.debug,
f'BaseLocsLeft: {baseLocsLeft[0]} to {baseLocsLeft[-1]} = {len(baseLocsLeft)} pts')
msg_wrapper("debug", log.debug,
f'BaseLocsLeft: {baseLocsRight[0]} to {baseLocsRight[-1]} = {len(baseLocsRight)} pts')
dataModel, driftModel, driftRes, driftRms, driftCoeffs = correct_drift(x[baseLocs], y[baseLocs], x,log)
# Correct the data and get global max/min
yCorrected = y-dataModel
maxyloc = np.argmax(yCorrected)
minyloc = np.argmin(yCorrected)
maxy = yCorrected[maxyloc]
miny = yCorrected[minyloc]
# Spline the data and get global max/min
yspl = spline(x, yCorrected)
ysplmaxloc = np.argmax(yspl)
ysplminloc = np.argmin(yspl)
ysplmax = yspl[ysplmaxloc]
ysplmin = yspl[ysplminloc]
# make plots
plotCorrectedData(x,yCorrected,baseLocsLeft,baseLocsRight,'Corrected data','Baseline blocks','Plot of baseline corrected data',f'{saveTo}corrected.png',xlabel="",ylabel="")
plot_overlap(x,yCorrected,x,yspl,'Plot of splined data','corrected','spline fit',f'{saveTo}splined.png',xlabel="",ylabel="")
# sys.exit()
msg_wrapper("debug", log.debug, "maxyloc: {}, maxy: {:.3f}, minyloc: {}, miny: {:.3f}".format(maxyloc, maxy, minyloc, miny))
msg_wrapper("debug", log.debug, "maxyloc: {}, maxy: {:.3f}, minyloc: {}, miny: {:.3f}\n".format(ysplmaxloc, ysplmax, ysplminloc, ysplmin))
msg_wrapper("debug", log.debug, "A beam peakloc: {} \n# B beam peakloc: {}".format(ysplmaxloc, ysplminloc))
# A/B BEAM DATA PROCESSING
AbeamScan = np.where(np.logical_and(x > -factoredfnbw, x < 0))[0]
BbeamScan = np.where(np.logical_and(x > 0, x < factoredfnbw))[0]
if len(AbeamScan) == 0 or len(BbeamScan) == 0:
msg="A/B beam scan data == 0, no data"
flag = 8
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
msg_wrapper("debug", log.debug, "- Beam seperation:")
msg_wrapper("debug", log.debug, "Left beam indeces: {} to {}".format(AbeamScan[0], AbeamScan[-1]))
msg_wrapper("debug", log.debug, "Right beam indeces: {} to {}\n".format(
BbeamScan[0], BbeamScan[-1]))
msg_wrapper("debug", log.debug, "- Data Limits")
msg_wrapper("debug", log.debug, "base left: {}, drift A left: {}, peak A: {}, drift A right: {}".format(baseLocsLeft[-1], AbeamScan[0], ysplminloc, AbeamScan[-1]))
msg_wrapper("debug", log.debug, "drift B left: {}, peak B: {}, drift B right: {}, base right: {}\n".format(
BbeamScan[0], ysplmaxloc, BbeamScan[-1], baseLocsRight[0]))
# figure out orientation of beam. With some scans the beams
# are flipped e.g, pks2326-502, A beam is positive, whereas
# j0450-81 A beam is negative.
# find value closest to zero and use the other value to determine
# which side to fit positive/negative peak
lstA=[min(yspl[AbeamScan]), max(yspl[AbeamScan])]
minA = min(lstA,key=abs)
fa=""
if minA==min(yspl[AbeamScan]):
# fit A beam positive B beam negative
#print("Fitting positive")
fa="p"
else:
# fit A beam negative B beam positive
#print("Fitting negative")
fa="n"
# TODO: should make this an option
# Decided to change this because the software now treats both
# target sources and calibrators the same way
beamCut = 0.6 # fitting top 40%, 0.7 (30% for cals) or 0.5 (50% for targets)
# Ensure peak is within accepted limits
if fa=="p":
if ysplmaxloc > AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
flag = 31
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ysplminloc < BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
# print(msg)
flag=32
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# Try to fit a gaussian to determine peak parameters
try:
# set initial parameters for data fitting
p = [max(y), x[midLeftLoc], hpbw, .01, .0]
# Try to fit a gaussian to determine peak location
# this also works as a bad scan filter
coeffl, fitl, flagl = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffl, covar_matrixl, fitLeft = fit_gauss_lin(
# x[AbeamScan], y[AbeamScan], p)
except Exception:
fl=1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitLeft = y[AbeamScan]
flag = 3
try:
# set initial parameters for data fitting
p = [min(y), x[midRightLoc], hpbw, .01, .0]
coeffr, fitr, flagr = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffr, covar_matrixr, fitRight = fit_gauss_lin(
# x[BbeamScan], y[BbeamScan], p)
except Exception:
fr = 1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitRight = y[BbeamScan]
flag = 3
# Determine peak fitting location
BbeamLeftLimit = x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = x[ysplminloc]+hhpbw
AbeamLeftLimit = x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
x >= AbeamLeftLimit, x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
x >= BbeamLeftLimit, x <= BbeamRightLimit))[0]
# select part of beam to fit
if ysplmax < 0.1:
flag = 36
msg = "fit entire left beam, max yspl < 0.1"
msg_wrapper("warning", log.warning, msg)
leftMainBeamLocs = leftlocs
else:
topCut = np.where(yspl[leftlocs] >= ysplmax*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(topCut)
if ysplmin > -0.1:
flag = 35
msg = "fit entire right beam, min yspl > -0.1"
msg_wrapper("warning", log.warning, msg)
rightMainBeamLocs = rightlocs
else:
bottomCut = np.where(yspl[rightlocs] <= ysplmin*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(bottomCut)
if fa=="n":
# A beam is min
if ysplmaxloc < AbeamScan[-1]:# or ysplminloc > beamScan[0]:
msg = "Max is beyond left baseline block"
flag = 31
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ysplminloc > BbeamScan[0]:#baseLocsRight[0]: # ysplminloc < BbeamScan[0] or
msg="Min is beyond right baseline block"
# print(msg)
flag=32
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# Try to fit a gaussian to determine peak parameters
try:
# set initial parameters for data fitting
p = [min(y), x[midLeftLoc], hpbw, .01, .0]
coeffl, fitl, flag = test_gauss_fit(x[AbeamScan], y[AbeamScan], p,log)
# coeffl, covar_matrixl, fitLeft = fit_gauss_lin(
# x[AbeamScan], y[AbeamScan], p)
except Exception:
fl=1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitLeft = y[AbeamScan]
flag = 3
try:
# set initial parameters for data fitting
p = [max(y), x[midRightLoc], hpbw, .01, .0]
coeffr, fitr, flagr = test_gauss_fit( x[BbeamScan], y[BbeamScan], p,log)
# coeffr, covar_matrixr, fitRight = fit_gauss_lin(
# x[BbeamScan], y[BbeamScan], p)
except Exception:
fr = 1
msg = "gaussian curve_fit algorithm failed"
msg_wrapper("debug", log.debug, msg)
fitRight = y[BbeamScan]
flag = 3
# Determine peak fitting location
AbeamLeftLimit = x[ysplminloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
AbeamRightLimit = x[ysplminloc]+hhpbw
BbeamLeftLimit = x[ysplmaxloc]-hhpbw #coeffl[1] - hhpbw # *2*.6
BbeamRightLimit = x[ysplmaxloc]+hhpbw
leftlocs = np.where(np.logical_and(
x >= AbeamLeftLimit, x <= AbeamRightLimit))[0]
rightlocs = np.where(np.logical_and(
x >= BbeamLeftLimit, x <= BbeamRightLimit))[0]
#select part of beam to fit
if ysplmax < 0.1:
flag = 36
msg = "fit entire left beam, max yspl < 0.1"
msg_wrapper("warning", log.warning, msg)
rightMainBeamLocs = rightlocs
else:
topCut = np.where(yspl[rightlocs] >= ysplmax*beamCut)[0]
rightMainBeamLocs = rightlocs[0]+np.array(topCut)
if ysplmin > -0.1:
flag = 35
msg = "fit entire right beam, min yspl > -0.1"
msg_wrapper("warning", log.warning, msg)
leftMainBeamLocs = leftlocs
else:
bottomCut = np.where(yspl[leftlocs] <= ysplmin*beamCut)[0]
leftMainBeamLocs = leftlocs[0]+np.array(bottomCut)
# fit left peak
ypeakl = np.polyval(np.polyfit(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(yCorrected[leftMainBeamLocs], ypeakl)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(yCorrected[rightMainBeamLocs], ypeakr)
if fa=="p":
ymin = min(ypeakr)
ymax = max(ypeakl)
else:
ymin = min(ypeakl)
ymax = max(ypeakr)
msg_wrapper("debug", log.debug, "A/B beam peak")
if fa=="p":
msg_wrapper("debug", log.debug, "left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1]))
msg_wrapper("debug", log.debug, "left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1]))
ypeakrdata = x[rightMainBeamLocs]
ypeakldata = x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
msg_wrapper("debug", log.debug, msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
msg_wrapper("debug", log.debug, msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 33
msg_wrapper("warning", log.warning, msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
yCorrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakl[0] >= ymax or ypeakl[-1] >= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 36
msg = "fit entire left beam"
msg_wrapper("debug", log.debug, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 34
msg_wrapper("warning", log.warning, msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
msg_wrapper("debug", log.debug, msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
yCorrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 35
msg = "fit entire right beam"
msg_wrapper("debug", log.debug, msg)
ypeakrdata = x[rightMainBeamLocs]
#pl.plot(x, yCorrected)
'''#pl.plot(x[leftlocs],yCorrected[leftlocs])
#pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
#pl.plot(x[rightlocs],yCorrected[rightlocs])
pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
#pl.plot(x[leftMainBeamLocs], ypeakl)
pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 28
msg = "left beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 29
msg = "right beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# import matplotlib.pyplot as pl
# pl.plot(x, yCorrected, label="corrected data")
# pl.plot(x[leftlocs], yCorrected[leftlocs], label="peakl data")
# pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
# pl.plot(x[rightlocs], yCorrected[rightlocs], label="peakr data")
# pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
# pl.plot(x[leftMainBeamLocs],
# ypeakl, label="peak fitl: {:.3f} +- {:.3f}".format(ymax, err_peakl))
# pl.plot(x[rightMainBeamLocs], ypeakr,
# label="peak fitr: {:.3f} +- {:.3f}".format(ymin, err_peakr))
# pl.plot(x[baseLocs], yCorrected[baseLocs], ".", label="baselocs")
# pl.plot(x, np.zeros(scanLen))
# pl.legend(loc="best")
# pl.grid()
# #pl.show()
# try:
# pl.savefig(saveFolder+"fitted.png")
# except:
# pass
# pl.close()
# sys.exit()
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peaks")
msg_wrapper("info", log.info,"-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymax, err_peakl, ymin, err_peakr)
msg_wrapper("info", log.info, msg)
# find final peak loc
ploca = np.where(ypeakl == ymax)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymin)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (x[rightMainBeamLocs])[plocb[0]]
return {"correctedData":yCorrected,"driftRes":driftRes,"driftRms":driftRms,
"driftCoeffs":driftCoeffs, "baseLocsCombined":baseLocs,
"baseLocsLeft":leftMainBeamLocs,"baseLocsRight":rightMainBeamLocs,
"leftPeakData":ypeakldata,"leftPeakModelData":ypeakl,
"leftPeakFit":ymax, "leftPeakFitErr":err_peakl,"leftPeakFitRes":fitResl,
"rightPeakData":ypeakrdata,"rightPeakModelData":ypeakr,
"rightPeakFit":ymin, "rightPeakFitErr":err_peakr,"rightPeakFitRes":fitResr,
"msg":"","midXValueLeft":peakLoca,"midXValueRight":peakLocb,
"flag":flag
}
else:
msg_wrapper("debug", log.debug, "left: {:.3f}, min: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymin, ypeakl[-1]))
msg_wrapper("debug", log.debug, "left: {:.3f}, max: {:.3f}, right{:.3f}".format(
ypeakr[0], ymax, ypeakr[-1]))
ypeakrdata = x[rightMainBeamLocs]
ypeakldata = x[leftMainBeamLocs]
# check data doesn't overlap
overlapRight = set(baseLocsRight) & set(rightMainBeamLocs)
overlapLeft = set(baseLocsLeft) & set(leftMainBeamLocs)
# overlapbeams = set(leftMainBeamLocs) & set(rightMainBeamLocs)
msg=("checking for overlapping beams: ")
msg_wrapper("debug", log.debug, msg)
if len(overlapLeft) != 0:
msg = "beams don't overlap on left"
msg_wrapper("debug", log.debug, msg)
if leftMainBeamLocs[0] > baseLocsLeft[int(
len(baseLocsLeft)*.8)]:
pass
else:
overlap = next(iter(overlapLeft))
shift = list(leftMainBeamLocs).index(int(overlap))
msg = "Overlap found on A beam"
flag = 33
msg_wrapper("warning", log.warning, msg)
# move beam to the left
f = abs(len(leftMainBeamLocs)-shift)
leftMainBeamLocs = abs(leftMainBeamLocs+f)
# fit left peak
ypeakl = np.polyval(np.polyfit(
x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 2), x[leftMainBeamLocs])
fitResl, err_peakl = calc_residual(
yCorrected[leftMainBeamLocs], ypeakl)
ymax = max(ypeakl)
msg="left: {:.3f}, max: {:.3f}, right: {:.3f}".format(
ypeakl[0], ymax, ypeakl[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakl[0] <= ymax or ypeakl[-1] <= ymax):
ymax = np.nan
err_peakl = np.nan
else:
flag = 36
msg = "fit entire left beam"
msg_wrapper("debug", log.debug, msg)
ypeakldata = x[leftMainBeamLocs]
'''pl.plot(x, yCorrected)
pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs], 'b')
#pl.plot(x[rightlocs],yCorrected[rightlocs])
#pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
pl.plot(x[leftMainBeamLocs], ypeakl, 'r')
#pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if len(overlapRight) != 0:
overlap = next(iter(overlapRight))
shift = list(rightMainBeamLocs).index(int(overlap))
msg = "Overlap found on B beam"
flag = 34
msg_wrapper("warning", log.warning, msg)
# move beam to the RIGHT
f = abs(len(rightMainBeamLocs)-shift)
rightMainBeamLocs = abs(rightMainBeamLocs-f)
msg="beam shifted to left by {} points".format(f)
msg_wrapper("debug", log.debug, msg)
# fit right peak
ypeakr = np.polyval(np.polyfit(
x[rightMainBeamLocs], yCorrected[rightMainBeamLocs], 2), x[rightMainBeamLocs])
fitResr, err_peakr = calc_residual(
yCorrected[rightMainBeamLocs], ypeakr)
ymin = min(ypeakr)
msg="left: {:.3f}, min: {:.3f}, right{:.3f}".format(
ypeakr[0], ymin, ypeakr[-1])
msg_wrapper("debug", log.debug, msg)
if(ypeakr[0] <= ymin or ypeakr[-1] <= ymin):
ymin = np.nan
err_peakr = np.nan
else:
flag = 35
msg = "fit entire right beam"
msg_wrapper("debug", log.debug, msg)
ypeakrdata = x[rightMainBeamLocs]
#pl.plot(x, yCorrected)
'''#pl.plot(x[leftlocs],yCorrected[leftlocs])
#pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
#pl.plot(x[rightlocs],yCorrected[rightlocs])
pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
#pl.plot(x[leftMainBeamLocs], ypeakl)
pl.plot(x[rightMainBeamLocs], ypeakr)
pl.plot(x, np.zeros(scanLen))
pl.plot(x[baseLocs], yCorrected[baseLocs], ".")
#pl.show()
pl.close()
#sys.exit()'''
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[leftMainBeamLocs])[-1] > 0):
flag = 28
msg = "left beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
if ((x[rightMainBeamLocs])[-1] < 0):
flag = 29
msg = "right beam data goes beyond midpoint"
msg_wrapper("warning", log.warning, msg)
# return [], [], [], np.nan, [], \
# [], [], np.nan, np.nan, [], \
# [], [], np.nan, np.nan, [],\
# msg, flag, np.nan, np.nan
return {"correctedData":[],"driftRes":[],"driftRms":np.nan,
"driftCoeffs":[], "baseLocsCombined":[],
"baseLocsLeft":[],"baseLocsRight":[],
"leftPeakData":[],"leftPeakModelData":[],
"leftPeakFit":np.nan, "leftPeakFitErr":np.nan,"leftPeakFitRes":[],
"rightPeakData":[],"rightPeakModelData":[],
"rightPeakFit":np.nan, "rightPeakFitErr":[],"rightPeakFitRes":[],
"msg":"","midXValueLeft":[],"midXValueRight":[],
"flag":flag
}
# import matplotlib.pyplot as pl
# pl.plot(x, yCorrected, label="corrected data")
# pl.plot(x[leftlocs], yCorrected[leftlocs], label="peakl data")
# pl.plot(x[leftMainBeamLocs], yCorrected[leftMainBeamLocs])
# pl.plot(x[rightlocs], yCorrected[rightlocs], label="peakr data")
# pl.plot(x[rightMainBeamLocs], yCorrected[rightMainBeamLocs])
# pl.plot(x[leftMainBeamLocs],
# ypeakl, label="peak fitl: {:.3f} +- {:.3f}".format(ymin, err_peakl))
# pl.plot(x[rightMainBeamLocs], ypeakr,
# label="peak fitr: {:.3f} +- {:.3f}".format(ymax, err_peakr))
# pl.plot(x[baseLocs], yCorrected[baseLocs], ".", label="baselocs")
# pl.plot(x, np.zeros(scanLen))
# pl.legend(loc="best")
# pl.show()
# try:
# pl.savefig(saveFolder+"fitted.png")
# except:
# pass
# pl.close()
# sys.exit()
msg_wrapper("info", log.info, "\n")
msg_wrapper("info", log.info, "-"*30)
msg_wrapper("info", log.info, "Fit the peaks.")
msg_wrapper("info", log.info,"-"*30)
msg="\npeak left: {:.3f} +- {:.3f} K\npeak right: {:.3f} +- {:.3f} K\n".format(
ymin, err_peakl, ymax, err_peakr)
msg_wrapper("info", log.info, msg)
# find final peak loc
ploca = np.where(ypeakl == ymin)[0]
if len(ploca)==0:
peakLoca=np.nan
else:
peakLoca = (x[leftMainBeamLocs])[ploca[0]]
# find final peak loc
plocb = np.where(ypeakr == ymax)[0]
if len(plocb)==0:
peakLocb=np.nan
else:
peakLocb = (x[rightMainBeamLocs])[plocb[0]]
return {"correctedData":yCorrected,"driftRes":driftRes,"driftRms":driftRms,
"driftCoeffs":driftCoeffs, "baseLocsCombined":baseLocs,
"baseLocsLeft":leftMainBeamLocs,"baseLocsRight":rightMainBeamLocs,
"leftPeakData":ypeakldata,"leftPeakModelData":ypeakl,
"leftPeakFit":ymax, "leftPeakFitErr":err_peakl,"leftPeakFitRes":fitResl,
"rightPeakData":ypeakrdata,"rightPeakModelData":ypeakr,
"rightPeakFit":ymin, "rightPeakFitErr":err_peakr,"rightPeakFitRes":fitResr,
"msg":"","midXValueLeft":peakLoca,"midXValueRight":peakLocb,
"flag":flag
}
# return yCorrected, driftRes, driftRms, driftCoeffs[0], baseLocs, \
# ypeakldata, ypeakl, ymin, err_peakl, fitResl, \
# ypeakrdata, ypeakr, ymax, err_peakr, fitResr,\
# msg, flag, peakLoca, peakLocb
[docs]
def get_base(localMinPositions, block_width, scan_len):
""" get the baseline block/s from data with large sidelobes. """
base=[] # entire basline block points
localMinPositions=sorted(localMinPositions)
basel=[] # left baseline blocks
baser=[] # right baseline blocks
lp=[]
rp=[]
basell=[]
baserr=[]
ind=4
for i in range(len(localMinPositions)):
#print(localMinPositions[i], block_width)
if localMinPositions[i]<=block_width:
# get everything before and after
# print("0",localMinPositions[i], block_width+localMinPositions[i])
base=base+list(np.arange(0,block_width+localMinPositions[i]+1))
if localMinPositions[i] <= scan_len/2:
basel = basel+list(np.arange(0, block_width+localMinPositions[i]+1))
lp=lp+[localMinPositions[i]]
#print("1- ",localMinPositions[i])
basell.append(0)
basell.append(block_width+localMinPositions[i]+1)
elif localMinPositions[i] > scan_len/2:
baser = baser + \
list(np.arange(0, block_width+localMinPositions[i]+1))
rp=rp+[localMinPositions[i]]
#print("2+ ", localMinPositions[i])
baserr.append(0)
baserr.append(block_width+localMinPositions[i]+1)
elif localMinPositions[i]>=scan_len-block_width:
# get everything before and after
#print(localMinPositions[i]-block_width, localMinPositions[i], scan_len)
base=base+list(np.arange(localMinPositions[i]-block_width,scan_len))
if localMinPositions[i] <= scan_len/2:
basel = basel+list(
np.arange(localMinPositions[i]-block_width, scan_len))
lp=lp+[localMinPositions[i]]
#print("3- ", localMinPositions[i])
basell.append(localMinPositions[i]-block_width)
basell.append(scan_len)
elif localMinPositions[i] > scan_len/2:
baser = baser+list(
np.arange(localMinPositions[i]-block_width, scan_len-1))
baserr.append(localMinPositions[i]-block_width)
baserr.append(scan_len-1)
rp = rp+[localMinPositions[i]]
#print("4+ ", localMinPositions[i])
else:
#print(localMinPositions[i]-block_width, localMinPositions[i], block_width+localMinPositions[i])
base=base+list(np.arange(localMinPositions[i]-block_width,block_width+localMinPositions[i]))
if localMinPositions[i] <= scan_len/2:
#print("5- ", localMinPositions[i])
basel=basel+list(
np.arange(localMinPositions[i]-block_width, block_width+localMinPositions[i]))
#print("6- ",lp,localMinPositions[i])
lp = lp+[localMinPositions[i]]
basell.append(localMinPositions[i]-block_width)
basell.append(block_width+localMinPositions[i])
elif localMinPositions[i] > scan_len/2:
baser= baser+list(
np.arange(localMinPositions[i]-block_width, block_width+localMinPositions[i]))
rp = rp+[localMinPositions[i]]
end = block_width+localMinPositions[i]
baserr.append(localMinPositions[i]-block_width)
if end >=scan_len:
baserr.append(scan_len)
return base, basel,baser,basell,baserr,lp,rp
# GUI operation
[docs]
def get_base_pts(x, y, base_index_list):
""" Get baseline points from a list of
indexes.
"""
# Get data between the indexes selected
ind_list = sorted(base_index_list)
# get location of all points
xb_data = []
yb_data = []
if len(ind_list) == 2:
ind_1 = ind_list[0]
ind_2 = ind_list[1]
#print(i, i+1, ind_1, ind_2, len(ind_list))
xb_data = xb_data + list(x[ind_1:ind_2])
yb_data = yb_data + list(y[ind_1:ind_2])
else:
for i in range(len(ind_list)):
if i % 2 == 0 and i != 1:
ind_1 = ind_list[i]
ind_2 = ind_list[i+1]
#print(i, i+1, ind_1, ind_2, len(ind_list))
xb_data = xb_data + list(x[ind_1:ind_2])
yb_data = yb_data + list(y[ind_1:ind_2])
return xb_data, yb_data
[docs]
def filter_scans(x, window_len=10, window='flat'):
"""
smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
Args:
x: the input signal
window_len: the dimension of the smoothing window; should be an odd integer
window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett',
'blackman' flat window will produce a moving average smoothing.
Returns:
the smoothed signal
"""
if x.ndim != 1:
raise (ValueError, "smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise (ValueError, "Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise (
ValueError, "Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s = np.r_[x[window_len-1:0:-1], x, x[-2:-window_len-1:-1]]
if window == 'flat': # moving average
w = np.ones(window_len, 'd')
else:
w = eval('np.'+window+'(window_len)')
y = np.convolve(w/w.sum(), s, mode='valid')
return y
[docs]
def fit_poly_peak(xp, yp, order,log):
"""
Fit the peak and estimate the errors.
"""
peakCoeffs = poly_coeff(xp, yp, order)
peakModel = np.polyval(peakCoeffs, xp)
# Calculate the residual and rms of the peak fit
peakRes, peakRms = calc_residual(peakModel, yp,log)
return peakRes, peakRms, peakModel