Source code for src.common.sqlite_db

# =========================================================================== #
# File    : sqlite_db.py                                                      #
# Author  : Pfesesani V. van Zyl                                              #
# =========================================================================== #

# Standard library imports
# --------------------------------------------------------------------------- #
import os
import sys
import sqlite3
import numpy as np
import time
import subprocess

from dataclasses import dataclass
# from .
try:
    from .msgConfiguration import msg_wrapper
except:
    from msgConfiguration import msg_wrapper

try:
    from .miscellaneousFunctions import set_table_name
except:
    from common.miscellaneousFunctions import set_table_name
# Local imports
# --------------------------------------------------------------------------- #
[docs] @dataclass class SQLiteDB: dbPath: str # name of database log: object # logger # __dict__: dict
[docs] def create_db(self): """ Create a new database. """ msg_wrapper('debug',self.log.debug,'Opened new Database') self.conn = sqlite3.connect(self.dbPath) self.c = self.conn.cursor()
[docs] def close_db(self): """ Close the database connection.""" msg_wrapper('debug',self.log.debug,'Closed new Database') if self.c: self.conn.close()
[docs] def commit_changes(self): """ Commit/save changes you have implemented to the database. """ self.conn.commit() print('>>>> Commited changes')
[docs] def create_table_stmt(self, data, tableName): """ Create table from dictionary. """ sqlStmt = "" for key, value in data.items(): # print(key,value, type(value).__name__) if type(value).__name__ == 'list': # pass data[key]=';'.join(value) # print(type(value)) # Make the filename a foreign key if key == "FILENAME": sqlStmt +=f'CREATE TABLE IF NOT EXISTS {tableName} (' idKey = sqlStmt + "id INTEGER PRIMARY KEY AUTOINCREMENT" + ", " sqlStmt = idKey + key + " " + "TEXT" + " UNIQUE , " elif isinstance(value, float): sqlStmt = sqlStmt + key + " " + "REAL" + " , " elif type(value).__name__ == "float64": sqlStmt = sqlStmt + key + " " + "REAL" + " , " elif isinstance(value, int): sqlStmt = sqlStmt + key + " " + "INTEGER" + " , " elif isinstance(value, str): sqlStmt = sqlStmt + key + " " + "TEXT" + " , " elif isinstance(value, list): sqlStmt = sqlStmt + key + " " + "TEXT" + " , " return sqlStmt[:-2] + ")"
[docs] def create_table(self, data, tableName): """ Create an sql statement to create a table.""" tableName=set_table_name(tableName,self.log).upper() sqlStmt = self.create_table_stmt(data, tableName) # print(sqlStmt) # sys.exit() # self.c.execute(sqlStmt) try: # sqlStmt = self.create_table_stmt(data, tableName) self.c.execute(sqlStmt) # print('added') except: # tableName=f'_{tableName}' # sqlStmt = self.create_table_stmt(data, tableName) # self.c.execute(sqlStmt) # print(data) print('Already exists') # print('added') return tableName
[docs] def insert_into_table_stmt_with_pk(self, data, tableName): """ Insert values into table and create a primary key.""" sqlStmt = "" dataListKey = list(data.keys()) dataListKeyString = "" dataListValues = list(data.values()) dataListValueString = "" # print(dataListKey) for i in range(len(data)): if i == 0: dataListKeyString = dataListKeyString + dataListKey[i] else: dataListKeyString = dataListKeyString + ", " + dataListKey[i] placeHolders = "?,"*len(data) sqlStmt = "INSERT INTO " + tableName + \ " (" + dataListKeyString + ") VALUES (" + placeHolders[:-1] + ")" return sqlStmt, dataListValues
[docs] def populate_table(self, data, tableName, key=""): """ populate a database table with values. """ try: self.create_db() except Exception as e: # print(e) pass #print('data: ',data) sqlStmt, values = self.insert_into_table_stmt_with_pk(data, tableName) # for i in range(len(values)): # self.c.execute(sqlStmt, values) # sys.exit() # print(data) # print('\n') # print('*'*50) fp=data['FILEPATH'] cf=int(data['CENTFREQ']) sp=fp.split('/') # print(f'FILEPATH: {fp}') # print(sp[-2]) # print(f'CENTFREQ: {cf}') # print(str(cf) in fp) x=fp.replace(str(sp[-2]),str(cf)) # print(x) # print('*'*50) # print('\n') # sys.exit() # print(sqlStmt) # print(values) # self.c.execute(sqlStmt, values) # print('here') # sys.exit() try: self.c.close() self.create_db() self.c.execute(sqlStmt, values) self.commit_changes() self.c.close() except sqlite3.IntegrityError as e: print('Already in file') sys.exit() except sqlite3.OperationalError as e: print('SQLite operational error, skipping until I find a solution on how to handle this error') # time.sleep(10) # cmd=['fuser', 'HART26DATA.db'] # print(f'fuser HART26DATA.db') # output = subprocess.Popen( cmd, stdout=subprocess.PIPE ).communicate()[0] # output=subprocess.run("fuser HART26DATA.db", capture_output=True).stdout # print('out: ',output) # s=os.system(f'fuser HART26DATA.db') pass except Exception as e: # TODO: fix this exception print(e) time.sleep(10) self.c.close() self.create_db() self.c.execute(sqlStmt, values) self.commit_changes() self.c.close() msg_wrapper('debug',self.log.debug,f"issue: {e}") if 'UNIQUE constraint failed:' in str(e): print('--',str(e)) if (str(cf) in fp)==False: print('hello') # try: # os.system(f'mv {fp} {x}') # print(f'moved {v1} to {newpath}') # print('Moving on\n') # except: # print(f'Can not move {v1} to {newpath}') # print('Closing program') # sys.exit() elif 'unable to open database file' in str(e): assert os.path.exists(self.dbPath), "The file doesn't exist" print('PAth: ',self.dbPath) self.c.close() self.create_db() self.c.execute(sqlStmt, values) print('catch me if you can') print(e) else: print(e, 'out') sys.exit() s=sqlStmt.split(' ') v1=values[1] v=values[1].upper() s=(s[2]).replace('_','/') # v=v[] # print(s,v) # print(s in v) # print('/'.join((v1.split('/'))[:-3])+'/'+s) newpath='/'.join((v1.split('/'))[:-3])+'/'+s if s not in v: try: os.makedirs(newpath) print('Crated new directory: ',newpath) except: print(f"Cant create {newpath}, already exists") try: os.system(f'mv {v1} {newpath}') print(f'moved {v1} to {newpath}') print('Moving on\n') except: print(f'Can not move {v1} to {newpath}') print('Closing program') sys.exit() # # if s in v: # # print('\n',s, v,'\n') else: print(f'File {s} already exists in the database') # sys.exit() sys.exit()
[docs] def set_database_name(self, databaseName): """ Set the name of the database. """ msg_wrapper("debug", self.log.debug, "Setup database name") if ".db" in databaseName: self.databaseName = databaseName else: self.databaseName = databaseName+".db"
[docs] def get_table_names(self,db): """Get table names from the database. Args: db (str): The name of the database Returns: table_names (list): List of table names """ msg_wrapper('debug',self.log.debug,f"Getting tables from: {db}") self.set_database_name(db) table_names = [] sql_stmt = "SELECT name FROM sqlite_master WHERE type = 'table';" # print(sql_stmt) # tables = self.c.execute(sql_stmt).fetchall() try: tables = self.c.execute(sql_stmt).fetchall() except sqlite3.OperationalError: print("Failed to fetch data from the server") sys.exit() # print('Tables: ',tables) for i in range(len(tables)): if tables[i][0].startswith("data"): table_names.append(tables[i][0]) for i in range(len(tables)): if tables[i][0].startswith("sqlite_sequence"): pass else: table_names.append(tables[i][0]) return table_names
[docs] def get_rows(self, tbname): """ Get the rows in the database table. Parameters ---------- tbname : str table name Returns ------- rows: str table row list """ # print(tbname) # open the database # self.create_db() # read from selected table stmt = f"SELECT * FROM '{tbname}' ORDER BY FILENAME ASC;" # print(stmt) self.c.execute(stmt) data = self.c.fetchall() # get filenames and return them rows = [] for row in data: rows.append(row) return rows
[docs] def get_all_table_coloumns(self, table_name): """ Get coloumns of table return index, coloumn name and coloumn type """ col_ind = [] col_name = [] col_type = [] res = self.c.execute("PRAGMA table_info('%s') " % table_name).fetchall() # print(res) # sys.exit() for i in range(len(res)): col_ind.append(res[i][0]) col_name.append(res[i][1]) col_type.append(res[i][2]) return col_ind, col_name, col_type
[docs] def get_rows_of_cols(self, tbname,cols): """ Get the rows in the database table. Parameters ---------- tbname : str table name Returns ------- rows: str table row list """ print('Getting rows of cols for table '+tbname) colNames="" for i in range(len(cols)): if i == len(cols)-1: colNames = colNames + cols[i]+" " else: colNames = colNames + cols[i]+", " # read from selected table stmt = "SELECT "+colNames[:-1]+" FROM '"+tbname+"' ORDER BY FILENAME ASC;" # print(stmt) # print('\nExecuting: ',stmt,'\n') self.c.execute(stmt) data = self.c.fetchall() # get filenames and return them rows = [] for row in data: rows.append(row) return rows