# ============================================================================#
# File: _auto.py #
# Author: Pfesesani V. van Zyl #
# =========================================================================== #
# Standard library imports
# --------------------------------------------------------------------------- #
import os, sys
from dataclasses import dataclass, field
import numpy as np
from datetime import datetime
import argparse
from config import __version__, __DBNAME__
import pandas as pd
import sqlite3
# Module imports
# --------------------------------------------------------------------------- #
import common.exceptions as ex
from common.contextManagers import open_file
from common.driftScans import DriftScans
from common.enums import ScanType
from common.miscellaneousFunctions import set_dict_item, create_current_scan_directory, delete_logs, set_table_name,fast_scandir
from common.logConfiguration import configure_logging
from common.msgConfiguration import msg_wrapper, load_prog
from common.sqlite_db import SQLiteDB
# =========================================================================== #
[docs]
def get_freq_band2(freq:int):
if freq >= 1000 and freq<= 2000:
return 'L', '18.0S'
elif freq > 2000 and freq<= 4000: #
return 'S', '13.0S'
elif freq > 4000 and freq<= 6000:
return 'C', '05.0D'
elif freq > 6000 and freq<= 8000:
# TODO: get correct frequency band for masers - ask Fanie vdHeever
return 'M', '04.5S'# for methanol masers
elif freq > 8000 and freq<= 12000: # 8580
return 'X', '03.5D'
elif freq > 12000 and freq<= 18000:
return 'Ku', '02.5S'
elif freq >= 18000 and freq<= 27000:
return 'K', '01.3S'
[docs]
@dataclass
class Observation:
"""
Observation object containing the observation data of the observation.
"""
# -- Observation parameters
FILEPATH: str # path to observing file
theoFit: str # theoretical fit implemented y/n
autoFit:str # automated fit implemented y/n
log:object # logger
dbCols: dict # dictionary of table columns
# -- Observation parameters not initialized when class called
HDULIST: str = field(init=False) # list of HDU objects
HDULENGTH: str = field(init=False) # length of HDULIST
INFOHEADER: str = field(init=False) # summarizes the content of the opened FITS file
def __post_init__(self):
""" Open file and get file info.
"""
# TODO: remember to include the value and description for these items when you're done
# print('\n dbCol: ',self.dbCols)
# sys.exit()
if len(self.dbCols)==0:
# probably gui self processing tool
# print(self.__dict__)
# setup a dictionary with all the parameters pre-set
try:
msg_wrapper("info",self.log.info,f"Opening file {self.FILEPATH}")
with open_file(self.FILEPATH) as f:
self.HDULIST = f
self.HDULENGTH=len(self.HDULIST)
self.INFOHEADER = f.info
# set values to dictionary
msg_wrapper("debug",self.log.debug,f"Setting FILEPATH, HDULIST, HDULENGTH and INFOHEADER to internal dict")
self.__dict__["HDULIST"]={'value':self.HDULIST, 'description':"Header data unit for observing file"}
self.__dict__["HDULENGTH"]={'value':self.HDULENGTH, 'description':"Number of HDUs in observing file"}
self.__dict__["INFOHEADER"]={'value':self.INFOHEADER, 'description':"Information header for observing file"}
self.__dict__["FILEPATH"]={'value':self.FILEPATH, 'description':"Path to file or filename"}
except Exception as e:
# TODO: put in proper file exception handling
# see context manager
return f'{self.FILEPATH} is corrupt'
sys.exit()
else:
msg_wrapper('info',self.log.info,'Using predefined cols')
self.dbCols['keys']=list(self.dbCols.keys())
self.dbCols['values']=list(self.dbCols.values())
self.dbCols['FILEPATH']={'value':self.FILEPATH, 'description':'Path to file'}
self.dbCols['theoFit']={'value':self.theoFit, 'description':'Theoretical fit to be used'}
self.dbCols['autoFit']={'value':self.autoFit, 'description':'Automated fitting to be used'}
self.dbCols['log']=self.log
# print('--++--\n',self.dbCols)
# sys.exit()
try:
msg_wrapper("info",self.log.info,f"Opening file {self.FILEPATH}")
with open_file(self.FILEPATH) as f:
self.HDULIST = f
self.HDULENGTH=len(self.HDULIST)
self.INFOHEADER = f.info
# # set values to dictionary
msg_wrapper("debug",self.log.debug,f"Setting FILEPATH, HDULIST, HDULENGTH and INFOHEADER to internal dict")
self.dbCols["HDULIST"]={'value':self.HDULIST, 'description':"Header data unit for observing file"}
self.dbCols["HDULENGTH"]={'value':self.HDULENGTH, 'description':"Number of HDUs in observing file"}
self.dbCols["INFOHEADER"]={'value':self.INFOHEADER, 'description':"Information header for observing file"}
self.__dict__= self.dbCols
except Exception as e:
# TODO: put in proper file exception handling
print('error: ',e)
# sys.exit()
# see context manager
return f'{self.FILEPATH} is corrupt'
# sys.exit()
for k,v in self.__dict__.items():
if type(v).__name__ =='dict' or k=='log' or k=='keys' or k=='values':
pass
else:
# print(k,v)
self.__dict__[k]={'value':np.nan, 'description':''}
# for k,v in self.__dict__.items():
# print(k,v)
# sys.exit()
[docs]
def set_key_value_pairs(self, key1, desc1, key2, desc2,indexKey,keys):
"""
Set key/value pairs for keys that may be missing from the dictionary
Args:
key1 (_type_): missing key
desc1 (_type_): description of missing key
key2 (_type_): reference key
desc2 (_type_): description of reference key
indexKey (_type_): index key
"""
if key1 in keys:
pass
else:
keys=self.__dict__.keys()
pos = list(keys).index(indexKey)
items = list(self.__dict__.items())
items.insert(pos+1, (key2, {'value':np.nan, 'description': desc2}))
items.insert(pos+1, (key1, {'value':np.nan, 'description': desc1}))
self.__dict__=dict(items)
# print(f'No {key1}')
msg=f'No {key1}'
msg_wrapper("debug",self.log.debug,msg)
return
[docs]
def get_data_only(self,noqv=False):
""" Get data from fits file hdu. This is for the quick file view.
"""
msg_wrapper("debug",self.log.debug,f"Getting data from fits file hdulist")
msg_wrapper("debug",self.log.debug,f"Create dict object to store read parameters")
self.__dict__['CARDS']={} #{'value':[], 'description':"Placeholder for hdu card titles or names"} # holds hdu card titles or names
#
CURDATETIME = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg_wrapper("info",self.log.info,f"Date and time of data processing: {CURDATETIME}")
self.__dict__["CURDATETIME"]={'value':CURDATETIME, 'description':"Current date and time of the data processing"}
msg_wrapper("debug",self.log.debug,f"Looping over each HDU object in hdulist")
try:
hdulen=self.HDULENGTH['value']
except:
msg_wrapper("debug",self.log.debug,f'File is a symlink: {self.FILEPATH}. Stopped processing')
return
for index in range(hdulen):
self.read_data_from_hdu_lists(index)
keys=self.__dict__.keys()
self.set_key_value_pairs('HZPERK1', 'HZPERK1', 'HZKERR1', '[Hz/K] Counter cal error','TCAL2',keys)
self.set_key_value_pairs('HZPERK2', 'HZPERK2', 'HZKERR2', '[Hz/K] Counter cal error','HZKERR1',keys)
self.set_key_value_pairs('TSYS1', 'TSYS1 [K]', 'TSYSERR1', '[K] System temperature','HZKERR2',keys)
self.set_key_value_pairs('TSYS2', 'TSYS2[K]', 'TSYSERR2', '[K] System temperature','TSYSERR1',keys)
# add other important bits
# ----------------------------------
# get drift scans
# use hdu frontend to determine path to data processing
frontend = self.__dict__['FRONTEND']['value']
src = self.__dict__['OBJECT']['value']
freq = self.__dict__['CENTFREQ']['value']
# create_current_scan_directory()
self.create_final_plot_directory(src,freq)
if 'S' in (frontend):
if '13.0S' in frontend or '18.0S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBW.name, 'wide single beam drift scan')
elif '02.5S' in frontend or '04.5S' in frontend or '01.3S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBN.name, 'narrow single beam drift scan')
else:
print(f'Unknown beam type :{frontend} - contact author to have it included\n')
sys.exit()
# get driftscan data from file
driftScans=DriftScans(self.__dict__)
driftScans.process_data_only() # process the data
del driftScans # release from memory
elif 'D' in (frontend):
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.DB.name, 'dual beam drift scan')
# sys.exit()
# get driftscan data from file
driftScans=DriftScans(self.__dict__)
# sys.exit()
driftScans.process_data_only() # process the data
del driftScans # release from memory
[docs]
def get_data(self):
""" Get data from fits file hdu
"""
# print('get data')
# sys.exit()
msg_wrapper("debug",self.log.debug,f"Getting data from fits file hdulist")
msg_wrapper("debug",self.log.debug,f"Create dict object to store read parameters")
self.__dict__['CARDS']={} #{'value':[], 'description':"Placeholder for hdu card titles or names"} # holds hdu card titles or names
CURDATETIME = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg_wrapper("info",self.log.info,f"Date and time of data processing: {CURDATETIME}")
self.__dict__["CURDATETIME"]={'value':CURDATETIME, 'description':"Current date and time of the data processing"}
msg_wrapper("debug",self.log.debug,f"Looping over each HDU object in hdulist")
try:
hdulen=self.HDULENGTH['value']
except:
with open('faultyFiles.txt','a') as f:
f.write(f'{self.FILEPATH}\n')
print(f'\nFile is a symlink: {self.FILEPATH}. Stopped processing')
return
for index in range(hdulen):
self.read_data_from_hdu_lists(index)
keys=self.__dict__.keys()
self.set_key_value_pairs('HZPERK1', 'HZPERK1', 'HZKERR1', '[Hz/K] Counter cal error','TCAL2',keys)
self.set_key_value_pairs('HZPERK2', 'HZPERK2', 'HZKERR2', '[Hz/K] Counter cal error','HZKERR1',keys)
self.set_key_value_pairs('TSYS1', 'TSYS1 [K]', 'TSYSERR1', '[K] System temperature','HZKERR2',keys)
self.set_key_value_pairs('TSYS2', 'TSYS2[K]', 'TSYSERR2', '[K] System temperature','TSYSERR1',keys)
# add other important bits
# ----------------------------------
# get drift scans
# use hdu frontend to determine path to data processing
frontend = self.__dict__['FRONTEND']['value']
src = self.__dict__['OBJECT']['value']
freq = self.__dict__['CENTFREQ']['value']
# print(freq,type(freq))
if str(freq)=="nan":
print('Fronend is nan')
freq='nans'
try :
f=int(freq)
except:
# if freq == np.nan:
print("Couldn't find frequency, creating frequency from path")
f=self.__dict__['FILEPATH']['value'].split('/')[-2]
try:
freq=int(f)
print(freq)
# if freq>= and freq<= :
band,frontend=get_freq_band2(freq)
print(f'Freq: {freq}, band: {band}, frontend: {frontend}')
self.__dict__['FRONTEND']['value']=frontend
# sys.exit()
except:
print('Path not in valid path layout')
sys.exit()
print(f)
# sys.exit()
# create_current_scan_directory()
self.create_final_plot_directory(src,freq)
print(f'For frontend: {frontend}')
# sys.exit()
self.__dict__['CENTFREQ']['value']=freq
if 'S' in (frontend):
if '13.0S' in frontend or '18.0S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBW.name, 'wide single beam drift scan')
elif '02.5S' in frontend or '04.5S' in frontend or '01.3S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBN.name, 'narrow single beam drift scan')
else:
print(f'Unknown beam type :{frontend} - contact author to have it included\n')
sys.exit()
# get driftscan data from file
for k,v in self.__dict__.items():
if 'CENTFREQ' in k:
freq=int(v['value'])
if 'FILEPATH' in k:
fpath=v['value']
# TODO: decide what to do with this
if str(freq) in fpath:
pass
else:
print('Frequency and path dont match')
print(fpath,freq)
# check if source has been processed
# create required table
driftScans=DriftScans(self.__dict__)
driftScans.process_data() # process the data
del driftScans # release from memory
elif 'D' in (frontend):
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.DB.name, 'dual beam drift scan')
# get driftscan data from \file
driftScans=DriftScans(self.__dict__)
driftScans.process_data() # process the data
del driftScans # release from memory
[docs]
def get_data_from_predefined_cols(self):
""" Get data from fits file hdu
"""
msg_wrapper("debug",self.log.debug,f"Getting data from fits file hdulist")
msg_wrapper("debug",self.log.debug,f"Create dict object to store read parameters")
self.__dict__['CARDS']={} #{'value':[], 'description':"Placeholder for hdu card titles or names"} # holds hdu card titles or names
# print(self.__dict__)
# sys.exit()
#
CURDATETIME = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg_wrapper("info",self.log.info,f"Date and time of data processing: {CURDATETIME}")
self.__dict__["CURDATETIME"]={'value':CURDATETIME, 'description':"Current date and time of the data processing"}
msg_wrapper("debug",self.log.debug,f"Looping over each HDU object in hdulist")
try:
hdulen=self.HDULENGTH['value']
except:
with open('faultyFiles.txt','a') as f:
f.write(f'{self.FILEPATH}\n')
print(f'\nFile is a symlink: {self.FILEPATH}. Stopped processing')
return
for index in range(hdulen):
self.read_data_from_hdu_lists(index)
keys=self.__dict__.keys()
self.set_key_value_pairs('HZPERK1', 'HZPERK1', 'HZKERR1', '[Hz/K] Counter cal error','TCAL2',keys)
self.set_key_value_pairs('HZPERK2', 'HZPERK2', 'HZKERR2', '[Hz/K] Counter cal error','HZKERR1',keys)
self.set_key_value_pairs('TSYS1', 'TSYS1 [K]', 'TSYSERR1', '[K] System temperature','HZKERR2',keys)
self.set_key_value_pairs('TSYS2', 'TSYS2[K]', 'TSYSERR2', '[K] System temperature','TSYSERR1',keys)
# add other important bits
# ----------------------------------
# get drift scans
# use hdu frontend to determine path to data processing
# print(self.__dict__.keys())
# sys.exit()
frontend = self.__dict__['FRONTEND']['value']
src = self.__dict__['OBJECT']['value']
freq = self.__dict__['CENTFREQ']['value']
keys=self.__dict__['keys']
values=self.__dict__['values'][:-1]
lenValues=len(values)
lenKeys=len(keys)
# create table with frequency:
# -------------------------------
# fn=
# print()
# sys.exit()
print(freq)
# if freq=='nan':
fp=self.__dict__["FILEPATH"]['value']
fn=fp.split('/')[-1]
# print(fn)
# sys.exit()
# print(freq)
tbFreq=int(freq)
table=f'{src}_{tbFreq}'.upper().replace(' ', '')
# print(table)
sys.exit()
cnx = sqlite3.connect(__DBNAME__)
dbTables= pd.read_sql_query("SELECT name FROM sqlite_schema WHERE type='table'", cnx)
tables=list(dbTables['name'])
print(tables)
if table in tables:
pass
else:
sqlStmt=''
if lenKeys==lenValues:
for i in range(lenValues):
# print('---',key,value)
if keys[i]=='FILENAME':
sqlStmt +=f'CREATE TABLE IF NOT EXISTS {table} ('
idKey = sqlStmt + "id INTEGER PRIMARY KEY AUTOINCREMENT" + ", "
sqlStmt = idKey + keys[i] + " " + "TEXT" + " UNIQUE , "
# print(sqlStmt)
else:
if keys[i]=='log' or keys[i]=='CARDS' or keys[i]=='INFOHEADER' or keys[i]=='HDULIST':
pass
else:
sqlStmt = sqlStmt + keys[i] + f" {values[i]}, "
# print(key,value)
pass
stmt=sqlStmt[:-2]+')'
print(stmt)
cnx.close()
# sys.exit()
# create_current_scan_directory()
self.create_final_plot_directory(src,tbFreq)
# sys.exit()
if 'S' in (frontend):
if '13.0S' in frontend or '18.0S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBW.name, 'wide single beam drift scan')
elif '02.5S' in frontend or '04.5S' in frontend or '01.3S' in frontend:
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.SBN.name, 'narrow single beam drift scan')
else:
print(f'Unknown beam type :{frontend} - contact author to have it included\n')
sys.exit()
print('about to process')
sys.exit()
# get driftscan data from file
driftScans=DriftScans(self.__dict__)
driftScans.process_data() # process the data
del driftScans # release from memory
elif 'D' in (frontend):
set_dict_item(self.__dict__,'BEAMTYPE',ScanType.DB.name, 'dual beam drift scan')
# sys.exit()
# get driftscan data from file
driftScans=DriftScans(self.__dict__)
driftScans.process_data() # process the data
del driftScans # release from memory
# sys.exit()
[docs]
def get_hdu_info(self, hduindex: int) -> None:
"""Get information from individual hdu
Args:
hduindex (int): index of hdu to get info from
Returns:
None
"""
msg_wrapper("debug",self.log.debug,f"Getting hdulist info for index {hduindex}")
return (self.HDULIST['value'])[hduindex].header
[docs]
def read_data_from_hdu_lists(self, hduindex:int):
""" Read data from fits file hdu.
Args:
hduindex (int): index of hdu to read data from
"""
# read data from all hdu lists
hdu=self.get_hdu_info(hduindex)
hduIndexName=self.HDULIST['value'][hduindex].name
self.__dict__['CARDS'][f'{hduindex}'] = hduIndexName
cols=list(hdu) # columns from hdu lists
# print(hduIndexName)
# sys.exit()
msg_wrapper("debug",self.log.debug,f"Getting observing parameters from {hduIndexName} HEADER")
# TODO: Decide on which data you want to save in the database
# go through each column and only save relevant ones
# print(cols)
for column in cols:
# print(column)
if 'COMMENT' in column or 'SIMPLE' in column or 'BITPIX' in column or 'NAXIS' in column\
or 'EXTEND' in column or 'SIMULATE' in column or 'START' in column or 'STOP' in column\
or 'SCANS' in column or 'TTYPE' in column or 'TFORM' in column or 'TUNIT' in column \
or 'TDISP' in column or 'PCOUNT' in column or 'GCOUNT' in column or 'TFIELDS' in column\
or column=='SCAN' or 'XT' in column or 'TCALDAT' in column or 'SCANANGL' in column\
or 'TCALFRQ' in column or 'HZZER' in column or 'SCANTYPE' in column or 'STEPSEQ' in column\
or 'TCALSIG' in column or 'HAPOINTC' in column or 'DEPOINTC' in column:
# self.__dict__[f'{column}_{hduindex}'] = {'value':hdu[column],'description':hdu.comments[column]}
pass
else:
if 'BANDWDTH' in column or 'INSTRUME' in column or 'INSTFLAG' in column\
or 'CENTFREQ' in column or 'SCANDIST' in column or 'SCANTIME' in column:
if '_ZC' in hduIndexName:
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
elif 'FRONTEND' in column:
if '_CAL' in hduIndexName:
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
elif 'TCAL' in column or 'HZPERK' in column or 'HZKERR' in column:
# print('------',column)
try:
msg=f"{self.__dict__['FRONTEND']}, {hduIndexName}"
msg_wrapper("debug",self.log.debug,msg)
except:
self.__dict__['FRONTEND'] = {'value':hdu['FRONTEND'],'description':hdu.comments['FRONTEND']}
if 'D' in str(self.__dict__['FRONTEND']['value']) and '_CAL' in hduIndexName:
if '_CAL' in hduIndexName:
# print(column)#,hdu.columns())
# use low noise diode
try:
msg_wrapper("debug",self.log.debug,f"Using low noise diode for {self.__dict__['CENTFREQ']}")
except:
pass
self.__dict__[column] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
else:
pass
elif 'S' in str(self.__dict__['FRONTEND']['value']):
# print('---',hduIndexName)
if 'Chart' in hduIndexName:
# use high noise diode
try:
msg_wrapper("debug",self.log.debug,f"Using high noise diode for {self.__dict__['CENTFREQ']}")
except:
pass
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
else:
if '04.5' in self.__dict__['FRONTEND']['value'] and '_CAL' in hduIndexName:
# print('******',hduIndexName,self.__dict__['FRONTEND']['value'])
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])} - couldn't find this value in chart so may cause problems down the line")
elif column=='DATE':
date=hdu[column].split('T')
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
self.__dict__['OBSDATE'] = {'value':date[0],'description':'Date of source observation, file creation date'}
self.__dict__['OBSTIME'] = {'value':date[1],'description':'Time of source observation'}
self.__dict__['OBSDATETIME'] = {'value':' '.join(date),'description':'Datetime of source observation'}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
else:
# data.append(f'{hduindex}_{column}')
# print(column,' - ',hdu[column]," : ",hdu.comments[column], hduIndexName)
self.__dict__[f'{column}'] = {'value':hdu[column],'description':hdu.comments[column]}
msg_wrapper("debug",self.log.debug,f"{column}: {str(self.__dict__[f'{column}'])}")
[docs]
def create_final_plot_directory(self, src: str,freq: float):
"""
Create directory where final plots will be saved. The function takes
the source name and the frequency in MHz and creates a directory
with the source name and the frequency in MHz. The directory is
created if it does not already exist.
Args:
src (str): source name
freq (float): frequency in MHz
Returns:
None
"""
print(src, freq)
self.plotDir=(f'plots/{src}/{int(freq)}').replace(' ','')
msg_wrapper("info",self.log.debug,f"Creating directory to store processed plots: {self.plotDir}")
try:
os.makedirs(self.plotDir)
except:
pass