# Source code for csengine.utility

"""
    @Name:                  Cam Engine Utilities
    @Description:           This file includes all the Utility Classes. These are used by
                            the Core application as well as the Plugins.
    @Created on:            Feb-2021
    @Created by:            Vinimay Kaul
    @Last Modified:         17-March_2021
    @Last Modified by:      Vinimay Kaul
"""

import os, sys, inspect, base64
import subprocess, pkgutil
import importlib, importlib.util
import traceback
import csv

import inspect
from sys import platform
import os.path
from os import path
from os.path import expanduser
from datetime import datetime

from pathlib import Path
from subprocess import Popen

import logging
from logging.handlers import RotatingFileHandler
from logging.handlers import TimedRotatingFileHandler

import AppStrings
from csengine.globals import CEEnvironment
from csengine import globals

from io import StringIO  # Python 3
import sys

# Class for Debug Utility
class DebugUtility:
    """Logging and debugging helpers shared by the core application and plugins.

    The class is used purely as a namespace: every member is class-level and
    every method is static.  When ``globals.CS_LOGGER`` is set (see
    ``initLogging``) messages are routed to that logger; otherwise they fall
    back to ``print``.
    """

    logfile = None
    exc_hook = None
    stderr_buf = None
    LOG_LIFE_TIME = 3  # Lifetime of log files in days
    debugFile = None
    debugToFile = None
    log_path = "./"  # Initial value. If not changed, logs will appear in the app folder
    logdir = None  # Directory cleaned by purgeOldLogs(); assigned by initLogging()
    log_level = 0  # 0 for Logging Off | 1 for Errors Only | 2 for Errors and Log | 3 for Verbose logging
    frozen = False
    printActive = False
    FORMATTER = logging.Formatter("%(asctime)s%(name)s%(levelname)s%(funcName)s:%(lineno)d%(message)s")
    LOG_FILE = "C:\\CSLOGS\\logs.txt"

    @staticmethod
    def get_console_handler():
        """Return a stdout ``StreamHandler`` using ``DebugUtility.FORMATTER``."""
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(DebugUtility.FORMATTER)
        return console_handler

    @staticmethod
    def get_file_handler(logfile):
        """Return a size-based rotating file handler writing to ``logfile``.

        Rotation happens at 8,000,000 bytes with two backups kept.  A
        ``TimedRotatingFileHandler`` could be used instead if logs should be
        rolled at a scheduled time of day.
        """
        file_handler = RotatingFileHandler(logfile, maxBytes=8000000, backupCount=2)
        file_handler.setFormatter(DebugUtility.FORMATTER)
        return file_handler

    @staticmethod
    def get_logger(logger_name, level, logfile):
        """Return the named logger configured for ``level`` and ``logfile``.

        ``level`` follows the ``log_level`` convention documented on the
        class: 3 or higher enables DEBUG, 2 enables INFO, anything lower
        leaves only warnings and errors.

        Handlers are attached only the first time a given logger name is
        requested, so repeated calls do not produce duplicated log lines.
        """
        logger = logging.getLogger(logger_name)
        if level >= 3:
            logger.setLevel(logging.DEBUG)
        elif level >= 2:
            logger.setLevel(logging.INFO)
        else:
            logger.setLevel(logging.WARN)
        if not logger.handlers:
            logger.addHandler(DebugUtility.get_console_handler())
            logger.addHandler(DebugUtility.get_file_handler(logfile))
        logger.propagate = False
        return logger

    @staticmethod
    def purgeOldLogs():
        """Delete files in ``DebugUtility.logdir`` older than ``LOG_LIFE_TIME`` days.

        Does nothing when ``logdir`` has not been set yet or does not exist.
        Files that cannot be inspected or removed are skipped.
        """
        logdir = DebugUtility.logdir
        if not logdir or not os.path.isdir(logdir):
            return
        now = datetime.now()
        for entry in os.listdir(logdir):
            candidate = str(Path(logdir) / entry)
            if not os.path.isfile(candidate):
                continue
            try:
                age = now - datetime.fromtimestamp(os.path.getmtime(candidate))
                if age.days > DebugUtility.LOG_LIFE_TIME:
                    os.remove(candidate)
            except OSError:
                # The file vanished or is still held open; leave it for the next purge.
                continue

    @staticmethod
    def initLogging():
        """Configure Kivy file logging (once) and purge outdated log files.

        The Kivy logger is installed as ``globals.CS_LOGGER`` only when no
        logger is set yet.  The log directory is then recorded in
        ``DebugUtility.logdir`` and cleaned, and ``DebugUtility.frozen`` is
        set when running from a frozen bundle.

        NOTE(review): the original layout was lost; this assumes the purge and
        the frozen check run on every call, not only on first initialization.
        """
        logger_missing = not globals.CS_LOGGER
        if OSUtility.GetPlatformID() == 'MAC' and logger_missing:
            logdir = "/Users/Shared/CamSkool/Logging/"
        else:
            # Windows and every other case share the application data folder.
            logdir = str(CEEnvironment.GetAppDataFolder().absolute() / "Logging")

        if logger_missing:
            from kivy.config import Config
            # Logging is switched off while the settings are changed and
            # switched back on afterwards.
            Config.set('kivy', 'log_enable', '0')
            Config.set('kivy', 'log_level', 'info')
            Config.set('kivy', 'log_name', 'camskool_%y-%m-%d_%_.txt')
            Config.set('kivy', 'log_dir', logdir)
            Config.set('kivy', 'log_maxfiles', '20')
            Config.set('kivy', 'log_enable', '1')
            from kivy.logger import Logger
            globals.CS_LOGGER = Logger
            globals.CS_LOGGER.debug("################## LOGGING INITIALIZED ###################")

        DebugUtility.logdir = logdir
        DebugUtility.purgeOldLogs()
        if getattr(sys, 'frozen', False):
            DebugUtility.frozen = True
        return

    @staticmethod
    def activatePrint(activate=True):
        """Enable or disable echoing of ``Log`` messages to the console."""
        DebugUtility.printActive = activate

    @staticmethod
    def closeLogging():
        """Flush and close ``DebugUtility.debugFile`` if one is open."""
        if DebugUtility.debugFile:
            DebugUtility.debugFile.flush()
            DebugUtility.debugFile.close()
            DebugUtility.debugFile = None

    # USE THE Log() METHOD ONLY FOR ANNOUNCING BEGIN OR END OF ACTIONS
    @staticmethod
    def Log(data):
        """Log ``data`` at INFO level, prefixed with a timestamp.

        Without a configured logger the message is printed instead.  With a
        logger, the message is additionally printed when ``printActive`` is
        set.  The message is printed at most once.
        """
        stamp = datetime.now().strftime(" %Y-%m-%d %H-%M-%S : ")
        if globals.CS_LOGGER:
            globals.CS_LOGGER.info(stamp + str(data))
            if DebugUtility.printActive:
                print(data)
        else:
            print(data)
        return

    # USE THE Debug() METHOD IN MOST SITUATIONS
    @staticmethod
    def Debug(data):
        """Log ``data`` at DEBUG level, or print it when no logger is configured."""
        stamp = datetime.now().strftime(" %Y-%m-%d %H-%M-%S : ")
        if globals.CS_LOGGER:
            globals.CS_LOGGER.debug(stamp + str(data))
        else:
            print(data)
        return

    # USE THE Print() METHOD ONLY FOR ANNOUNCING ON CONSOLE OR TEMPORARY DEBUGGING
    @staticmethod
    def Print(data):
        """Print ``data`` on the console regardless of the log level."""
        print(str(data))

    # USE THE LogToFile() METHOD WHEN YOU NEED TO LOG TO A DIFFERENT FILE THAN THE LOGS.LOG FILE
    @staticmethod  # DEPRECATED
    def LogToFile(data, filename):
        """Append a timestamped line to ``filename`` inside ``log_path``.

        Nothing is written (and no file is created) unless ``log_level`` is 2
        or higher.  ``log_path`` may be a string or a ``Path``; the file path
        is built with ``pathlib`` so it works on every platform.
        """
        if DebugUtility.log_level <= 1:
            return
        now = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        target = Path(str(DebugUtility.log_path)) / filename
        with open(target, "a") as handle:
            handle.write(now + " |--| " + str(data) + "\n")

    # USE THE Err() METHOD ONLY FOR ANNOUNCING ERRORS. MOSTLY WITHIN EXCEPTIONS
    @staticmethod
    def Err(data, i, console=""):
        """Log ``data`` at ERROR level, or print it when no logger is configured.

        Args:
            data: Message or object to report.
            i: Frame info from ``InspectFrame()``; accepted for call-site
                compatibility but not used here.
            console: Optional extra text printed on the console.

        When not running frozen, the message and the current traceback are
        also written to stdout as a best-effort aid for development.
        """
        stamp = datetime.now().strftime(" %Y-%m-%d %H-%M-%S : ")
        if globals.CS_LOGGER:
            globals.CS_LOGGER.error(stamp + str(data))
        else:
            print(data)
        if not console == "":
            print(console)
        if not DebugUtility.frozen:
            try:
                sys.stdout.write(str(data) + "\n")
                sys.stdout.write(traceback.format_exc())
                sys.stdout.flush()
            except (OSError, ValueError, AttributeError):
                # stdout may be closed, missing or unable to encode the text;
                # error reporting must never raise on its own.
                pass
        return

    # USE InspectFrame() BEFORE CALLING ANY OF THE LOGGING METHODS TO GET LINE NUMBER, FILE NAME ETC.
    # USE ONLY FOR ERRORS NOT LOGS
    @staticmethod
    def InspectFrame():
        """Return ``inspect`` frame info (file, line, function) of the caller."""
        caller = inspect.currentframe().f_back
        return inspect.getframeinfo(caller)

    # Exception Hook that will be called if Main Thread Crashes
    @staticmethod
    def csExceptionHook(type, value, err_traceback):
        """Report an uncaught exception through ``Err``.

        The signature matches ``sys.excepthook``.  Nothing is reported when
        one of the first ten traceback entries contains the Kivy
        "Leaving application in progress" marker, which filters out a
        recursion exception Kivy sometimes raises while shutting down.
        """
        tb_lines = traceback.extract_tb(err_traceback).format()
        marker = "Base: Leaving application in progress...')"
        if any(marker in line for line in tb_lines[:10]):
            return
        report = "\n \n" + str(datetime.now().strftime("%H:%M:%S.%f")[:-3]) + "\n"
        report += "".join(tb_lines)
        DebugUtility.Err(report, None)
        DebugUtility.Err(type, None)
        DebugUtility.Err(value, None)
# Class for all File related Utilities
class FileUtility:
    """Static helpers for reading and writing small files.

    The save methods return ``True``/``False``.  The read methods return a
    list of lines, or the sentinel ``[0]`` when the file is missing or cannot
    be read.
    """

    @staticmethod
    def _save(filepath, filename, filedata, mode):
        """Write ``filedata`` to ``filepath / filename`` opened with ``mode``."""
        filepath = Path(filepath)
        fullfilepath = filepath / Path(filename)
        DebugUtility.Log("Saving to File: " + str(fullfilepath))
        if not os.path.exists(filepath):
            # NOTE: despite the wording of the message, no directory is created here.
            DebugUtility.Err("Specified Path Does Not Exist. Creating Directory.", DebugUtility.InspectFrame())
            return False
        try:
            # The context manager closes the handle even when write() fails.
            with open(fullfilepath, mode) as handle:
                handle.write(filedata)
        except Exception:
            DebugUtility.Err("Could not Save to File.", DebugUtility.InspectFrame())
            return False
        DebugUtility.Log("Successfully Saved to File: " + str(fullfilepath))
        return True

    @staticmethod
    def SaveToFile(filepath, filename, filedata, append=False):
        """Write text ``filedata`` to ``filename`` inside ``filepath``.

        Args:
            filepath: Existing directory to write into.
            filename: Name of the file inside ``filepath``.
            filedata: Text to write.
            append: Append to the file instead of overwriting it.

        Returns:
            True when the data was written, False when the directory does not
            exist or writing failed.
        """
        return FileUtility._save(filepath, filename, filedata, "a" if append else "w")

    @staticmethod
    def SaveBinaryToFile(filepath, filename, filedata):
        """Write bytes ``filedata`` to ``filename`` inside ``filepath``.

        Returns:
            True when the data was written, False when the directory does not
            exist or writing failed.
        """
        return FileUtility._save(filepath, filename, filedata, "w+b")

    @staticmethod
    def ReadAllFromFile(filepath, filename):
        """Return every line of the file with trailing newlines removed.

        Returns ``[0]`` when the file does not exist or cannot be read.
        """
        fullfilepath = Path(filepath) / Path(filename)
        DebugUtility.Log("Reading All from File: " + str(fullfilepath))
        if not os.path.exists(fullfilepath):
            DebugUtility.Log("File Not Found : " + str(fullfilepath))
            return [0]
        try:
            with open(fullfilepath, "r") as handle:
                return [line.rstrip("\n") for line in handle]
        except OSError as err:
            # Covers FileNotFoundError and IOError; the text comes from the
            # raised instance, not from the exception class.
            DebugUtility.Err("Error reading file at FileUtility.ReadAllFromFile : " + str(err.strerror),
                             DebugUtility.InspectFrame())
            return [0]
        except Exception:
            DebugUtility.Err("Error reading file at FileUtility.ReadAllFromFile : UNKNOWN",
                             DebugUtility.InspectFrame())
            return [0]

    @staticmethod
    def ReadLinesFromFile(filepath, filename, amount):
        """Read the first ``amount`` lines and return the non-empty ones.

        Trailing newlines are removed.  Returns ``[0]`` when the file does not
        exist or cannot be opened.  If an unexpected error interrupts reading,
        the lines collected so far are returned when there are any.
        """
        fullfilepath = Path(filepath) / Path(filename)
        DebugUtility.Log("Reading " + str(amount) + " lines from File: " + str(fullfilepath))
        if not os.path.exists(fullfilepath):
            DebugUtility.Err("File Not Found : " + str(fullfilepath), DebugUtility.InspectFrame())
            return [0]
        readData = []
        try:
            with open(fullfilepath, "r") as handle:
                i = 0
                while i < amount:
                    rdString = handle.readline().rstrip("\n")
                    if rdString != '':
                        readData.append(rdString)
                    i = i + 1
        except OSError as err:
            DebugUtility.Err("Error reading file at FileUtility.ReadLineFromFile : " + str(err.strerror),
                             DebugUtility.InspectFrame())
            return [0]
        except Exception:
            DebugUtility.Err("Error reading file at FileUtility.ReadLineFromFile : UNKNOWN" + str(len(readData)),
                             DebugUtility.InspectFrame())
            if len(readData) > 0:
                return readData
            return [0]
        return readData

    @staticmethod
    def ReadLineNumberFromFile(filepath, filename, index):
        """Return a one-element list holding the line at zero-based ``index``.

        The list is empty when the file has fewer lines than requested, and
        ``[0]`` when the file does not exist or cannot be read.
        """
        fullfilepath = Path(filepath) / Path(filename)
        DebugUtility.Log("Reading value at line: " + str(index) + " in File: " + str(fullfilepath))
        if not os.path.exists(fullfilepath):
            DebugUtility.Log("File Not Found : " + str(fullfilepath))
            return [0]
        readData = []
        try:
            with open(fullfilepath, "r") as handle:
                for i, line in enumerate(handle):
                    if i == index:
                        # Use the line produced by the iteration itself; an
                        # extra readline() here would skip ahead in the file.
                        readData.append(line.rstrip("\n"))
                        break
        except OSError as err:
            DebugUtility.Err("Error reading file at FileUtility.ReadLineFromFile : " + str(err.strerror),
                             DebugUtility.InspectFrame())
            return [0]
        except Exception:
            DebugUtility.Err("Error reading file at FileUtility.ReadLineFromFile : UNKNOWN",
                             DebugUtility.InspectFrame())
            return [0]
        return readData
################################################################ ################################################################ ################################################################ # Class for all OS related Methods
class OSUtility:
    """Static helpers for platform detection, paths and launching processes."""

    @staticmethod
    def Restart(testMode=False):
        """Schedule a relaunch of CamSkool and stop the running Kivy app.

        The relaunch command is started in the background with a short delay
        so the current process can shut down first.

        Args:
            testMode: Pass the test-mode flag on to the relaunched instance.
        """
        testM = ' --test-mode' if testMode else ""
        is_frozen = getattr(sys, 'frozen', False)
        if OSUtility.GetPlatformID() == "WIN":
            if is_frozen:
                application_path = os.path.dirname(sys.executable)
                OSUtility.ExecuteNonBlocking('timeout /t 1 && "' + application_path + '\\CamSkool.exe"' + testM)
            else:
                OSUtility.ExecuteNonBlocking('timeout /t 1 && python "' + os.getcwd() + '\\CamSkool.py"' + testM)
        else:
            # get the right path even if loaded as a bundle
            application_path = OSUtility.GetMacBundlePath()
            DebugUtility.Log(application_path)
            if is_frozen:
                if testMode:
                    testM = '--args --test-mode'
                OSUtility.ExecuteNonBlocking('sleep 2; open "' + application_path + '/../../../CamSkool.app" ' + testM)
            else:
                OSUtility.ExecuteNonBlocking('sleep 1; python "' + application_path + '/CamSkool.py"' + testM)
        from kivy.app import App
        App.get_running_app().stop()

    @staticmethod
    def GetPlatformID():
        """Return 'WIN' on Windows, 'MAC' on macOS and 'NON' otherwise."""
        if platform == 'win32':
            return 'WIN'
        if platform == 'darwin':
            return 'MAC'
        return 'NON'  # anything else for now

    @staticmethod
    def ExpandMacPath(user, m_path):
        """Expand ``user`` (e.g. "~"), append ``m_path`` and create the folder.

        Example: ``ExpandMacPath("~", ".rewardpanel")``.  macOS only.

        Returns:
            The resulting folder path as a string.
        """
        assert ('darwin' in sys.platform)
        DebugUtility.Log("Expanding Mac Path : " + user + " and " + m_path)
        folder_path = expanduser(user) + m_path
        os.makedirs(folder_path, exist_ok=True)
        return str(folder_path)

    @staticmethod
    def MakeMacPath(folder_path):
        """Create ``folder_path`` with mode 0o755 unless it already exists.

        macOS only.

        Returns:
            True when the folder exists afterwards, False when it could not
            be created.
        """
        assert ('darwin' in sys.platform)
        if path.exists(folder_path):
            DebugUtility.Log("Path :" + str(folder_path) + "already exists. No need to create")
            return True
        try:
            # drwxr-xr-x
            DebugUtility.Log("Creating Folder For Mac")
            os.mkdir(folder_path, 0o755)
        except OSError as err:
            # Report the actual error instance, not the exception class.
            DebugUtility.Err(err, DebugUtility.InspectFrame())
            return False
        DebugUtility.Log("Successfully created folder for Mac.")
        return True

    @staticmethod
    def _bundlePath():
        """Return the executable's folder when frozen, else the working directory."""
        if getattr(sys, 'frozen', False):
            # If the application is run as a bundle, the PyInstaller bootloader
            # extends the sys module by a flag frozen=True
            return os.path.dirname(sys.executable)
        return os.getcwd()

    @staticmethod
    def GetMacBundlePath():
        """Return the application folder on macOS, also when bundled via PyInstaller."""
        assert ('darwin' in sys.platform)
        return OSUtility._bundlePath()

    @staticmethod
    def GetWindowsBundlePath():
        """Return the application folder on Windows, also when bundled via PyInstaller.

        No platform assertion is made here: the lookup itself is
        platform-independent, and asserting macOS would make this
        Windows helper unusable on Windows.
        """
        return OSUtility._bundlePath()

    @staticmethod
    def InstallMacCamLib(application_path, plugin_path, pkg_path, audio_plugin_path):
        """Run the bundled plugin installer with administrator privileges.

        ``application_path`` is written to ``/tmp/rp_file_location.txt`` and
        the installer is started through ``osascript``.  Installation is
        always performed; ``plugin_path``, ``pkg_path`` and
        ``audio_plugin_path`` are currently not consulted.  macOS only.

        Returns:
            Always True.
        """
        assert ('darwin' in sys.platform)
        FileUtility.SaveToFile("/tmp/", "rp_file_location.txt", application_path)
        # install using osascript
        applescript = ('do shell script "cd $(cat /tmp/rp_file_location.txt | sed \'s/%//g\') ; pwd >> /tmp/install_plugins.log; ../Resources/installPlugins >> /tmp/install_plugins.log"'
                       'with administrator privileges')
        subprocess.call(['osascript', '-e', applescript])
        DebugUtility.Log("Installing Mac Library from Package")
        return True

    @staticmethod
    def ExecuteNonBlocking(command):
        """Start ``command`` through the shell without waiting for it.

        Returns:
            True when the process could be started, False otherwise.
        """
        try:
            process = subprocess.Popen(command, shell=True)
        except Exception:
            DebugUtility.Log("Execution of command: " + str(command) + "FAILED")
            return False
        # The process is still running, so there is no return code to report yet.
        DebugUtility.Log("Execution of command: " + str(command) + " STARTED - with pid: " + str(process.pid))
        return True

    @staticmethod
    def _readBundleVersion(info_path):
        """Return ``CFBundleShortVersionString`` of ``info_path`` via ``defaults read``.

        Newlines and spaces are removed; an empty string is returned when the
        value cannot be read.
        """
        try:
            # Argument list (no shell) so paths containing spaces are handled.
            result = subprocess.run(["defaults", "read", info_path, "CFBundleShortVersionString"],
                                    stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                                    universal_newlines=True)
        except OSError:
            return ""
        return result.stdout.replace("\n", "").replace(" ", "")

    @staticmethod
    def isPluginsInstalled(application_path, plugin_path, pkg_path, audio_plugin_path):
        """Tell whether the installed audio plugin is present and up to date.

        The installed driver version is compared with the bundled one, and the
        version recorded in ``version.txt`` in the app data folder is compared
        with ``globals.APPLICATION_VERSION``.  When the recorded version is
        missing or different, it is rewritten and a re-install is requested.
        macOS only.

        Returns:
            True when nothing needs to be installed, False otherwise.
        """
        assert ('darwin' in sys.platform)
        print(pkg_path)
        installedAudioVersion = OSUtility._readBundleVersion(
            "/Library/Audio/Plug-Ins/HAL/RewardPanelAudio.driver/Contents/Info")
        localAudioVersion = OSUtility._readBundleVersion(
            str(os.getcwd()) + "/../Resources/RewardPanelAudio.driver/Contents/Info")
        DebugUtility.Log("Plugin Versions: " + str(installedAudioVersion) + " " + str(localAudioVersion))

        audioPluginMatches = installedAudioVersion == localAudioVersion
        if audioPluginMatches:
            DebugUtility.Log("Audio Driver Version matches")

        pkg_version = globals.APPLICATION_VERSION
        version_file = CEEnvironment.GetAppDataFolder() / "version.txt"
        try:
            with open(version_file, "r") as f:
                recorded_version = f.read()
        except (OSError, ValueError):
            recorded_version = None  # missing or unreadable counts as a mismatch
        if recorded_version != pkg_version:
            audioPluginMatches = False
            DebugUtility.Log("Camskool Version does not match - re-install")
            with open(version_file, "w") as f:
                f.write(pkg_version)

        if not path.exists(audio_plugin_path) or not audioPluginMatches:
            return False
        DebugUtility.Log("Mac Library already found. No need to reinstall.")
        return True

    @staticmethod
    def Execute(application_path, path):
        """Run the executable at ``application_path + path`` and wait for it.

        macOS only.

        Returns:
            True when the process ran to completion, False when it could not
            be started.
        """
        assert ('darwin' in sys.platform)
        target = application_path + path
        try:
            process = subprocess.Popen(target, shell=False, stdout=subprocess.PIPE)
            # communicate() drains stdout while waiting, so a child that
            # prints a lot cannot block forever on a full pipe.
            process.communicate()
            DebugUtility.Log("Execution of file: " + str(target) + "SUCCESSFULL - with returncode: " + str(process.returncode))
            return True
        except Exception:
            DebugUtility.Log("Execution of file: " + str(target) + "FAILED")
            return False

    # Create User Directories on Windows
    @staticmethod
    def CreateWindowsPath(user, pth):
        """Ensure the folder ``expanduser(user + pth)`` exists.

        The folder is also stored in ``DebugUtility.log_path``, whether it
        already existed or was just created.

        Returns:
            The folder as a ``Path`` (also when creation failed; the failure
            is logged).
        """
        app_path = Path(os.path.expanduser(user + pth))
        if os.path.exists(app_path):
            DebugUtility.Log("FileUtility::Path " + str(app_path) + " already exists")
        else:
            try:
                os.makedirs(app_path)
            except OSError as e:
                DebugUtility.Log("FileUtility:: Error Creating Path ::" + str(e))
            else:
                DebugUtility.Log("FileUtility::Created Path: " + str(app_path))
        DebugUtility.log_path = app_path  # Set the Logging folder path for Windows
        return app_path

    # Create User Directories on Mac
    @staticmethod
    def CreateMacPath(user, pth):
        """Ensure the folder ``expanduser(user) / pth`` exists on macOS.

        The folder is also stored in ``DebugUtility.log_path``.

        Returns:
            The folder as a ``Path``, or ``Path("./")`` when it could not be
            created.
        """
        app_path = Path(os.path.expanduser(user)) / Path(pth)
        DebugUtility.log_path = app_path  # Set the Logging folder path for Mac
        DebugUtility.Print("creating macpath" + str(app_path))
        if OSUtility.MakeMacPath(app_path):
            return app_path
        return Path("./")
# Function to create a File choosing Dialog # @staticmethod # def GetFileFromSystem(): # return fd.askopenfilename() ################################################################ ################################################################ ################################################################
class PluginUtility:
    """Registry of discovered plugins plus search and favorites helpers."""

    _plugins_list = []  # list of modules to be imported (string names)
    Plg_Anim_List = []  # All the items in this array will be added to Animation section
    Plg_Tool_List = []  # All the items in this array will be added to Tool section
    Plg_Img_List = []  # All the items in this array will be added to Image section
    Plg_Sound_List = []  # All the items in this array will be added to Sound section
    Plg_SEffect_List = []  # All the items in this array will be added as Stackable Effects section
    Plg_Favorites = []  # All plugins that have been favorited.
    Plg_SearchResults = []  # This will only store current search results. Will be emptied when no search is there
    Plg_SearchWeights = []  # This will be a list of the weights of the search result items

    @staticmethod
    def BrowsePlugins(ns_pkg):
        """Iterate over the modules found in the namespace package ``ns_pkg``.

        Specifying the prefix makes the returned names absolute instead of
        relative, so ``import_module`` works without further modification of
        the name.
        """
        return pkgutil.iter_modules(ns_pkg.__path__, ns_pkg.__name__ + ".")

    @staticmethod
    def GetAllPlugins(ns_pkg):
        """Record the module names found in ``ns_pkg`` and return the full list.

        Names already recorded are not added again, so calling this more than
        once does not cause a plugin to be activated twice.
        """
        DebugUtility.Log("##################################################")
        DebugUtility.Log("Getting All Plugins")
        for _, name, _ in PluginUtility.BrowsePlugins(ns_pkg):
            if name not in PluginUtility._plugins_list:
                PluginUtility._plugins_list.append(name)
        return PluginUtility._plugins_list

    @staticmethod
    def validatePlugin(name):
        """Check that the plugin folder ``name`` may be loaded by the current user.

        The plugin passes when its signature validates and the user's plan is
        not lower than the number stored in the plugin's ``plans.sec`` file.
        In test mode every plugin passes.

        Returns:
            True when the plugin may be loaded, False otherwise.
        """
        # Security stuff - make sure the user is allowed to load that plugin!
        ce = globals.GLOBAL_CE
        plan = globals.USER_PLAN
        if CEEnvironment.TEST_MODE_ACTIVE:
            return True

        plugin_dir = CEEnvironment.GetAppDataFolder() / "PluginsCE" / name
        # Maybe this is required for windows as well???
        if OSUtility.GetPlatformID() == 'MAC':
            cache_dir = str(plugin_dir / "__pycache__")
            if os.path.exists(cache_dir):
                # Remove __pycache__ folder which can destroy the checksum calculation.
                # Argument list instead of a shell string so folders with
                # spaces in their name are removed correctly.
                Popen(["rm", "-rf", cache_dir]).communicate()

        if not ce.validatePlugin(plugin_dir):
            DebugUtility.Log("Error Loading Plugin - Signature Invalid!")
            return False
        try:
            with open(plugin_dir / "plans.sec", "r") as planfile:
                supportedPlans = int(planfile.read())
            if plan < supportedPlans:
                DebugUtility.Log(
                    "Current Subscription " + str(plan) + " does not support this plugin. Supported plans: " + str(
                        supportedPlans))
                return False
        except Exception as e:
            DebugUtility.Err("Error while getting plugin plan", e)
            return False
        # plugin successfully verified
        return True

    @staticmethod
    def ActivatePlugins():
        """Import every recorded plugin module that passes ``validatePlugin``.

        Each plugin registers itself on import.  A plugin that fails
        validation is skipped; one that raises while importing is logged and
        does not stop the remaining plugins.
        """
        DebugUtility.Log("##################################################")
        DebugUtility.Log("Activating Plugins Init")
        plan = globals.USER_PLAN
        valid = globals.USER_VALID
        DebugUtility.Log("USERINFO: Plan is valid (" + str(valid) + ") with planID: " + str(plan))
        for modname in PluginUtility._plugins_list:
            DebugUtility.Log("Importing " + str(modname))
            try:
                # Security - check if package contains a valid checksum!
                # The last dotted component is the plugin's folder name.
                if not PluginUtility.validatePlugin(str(modname.split(".")[-1])):
                    continue
                # register plugin
                DebugUtility.Log("Importing path " + str(os.getcwd()))
                importlib.import_module(modname)
            except Exception as e:
                DebugUtility.Err("EXCEPTION LOADING PLUGIN", DebugUtility.InspectFrame())
                DebugUtility.Log(modname)
                DebugUtility.Log(str(e))

    @staticmethod
    def searchUtility(keywords, searchType="all"):
        """Search animation, tool and image plugins for ``keywords``.

        ``keywords`` is split on whitespace.  Only ``searchType == "all"``
        performs a search; any other value yields empty results.

        Returns:
            A ``(results, weights)`` pair of parallel lists.
        """
        searchText = keywords.split()
        PluginUtility.Plg_SearchResults = []
        PluginUtility.Plg_SearchWeights = []
        if searchType == "all":
            PluginUtility.searchAnim(searchText)
            PluginUtility.searchTool(searchText)
            PluginUtility.searchImage(searchText)
        return PluginUtility.Plg_SearchResults, PluginUtility.Plg_SearchWeights

    @staticmethod
    def _searchList(plugins, searchText):
        """Score ``plugins`` against ``searchText`` and record the matches.

        Each word adds 2 for a hit in the name, 1 in the description, 2 in the
        plugin name and 2 when any tag contains it.  All comparisons ignore
        case.  Plugins with a non-zero total are appended to the search
        results unless already present.

        NOTE(review): the original layout was lost; this assumes the weight is
        totalled over all words before the plugin is recorded.
        """
        for plugin in plugins:
            weight = 0
            for word in searchText:
                needle = word.lower()
                if needle in plugin.name.lower():
                    weight += 2
                if needle in plugin.description.lower():
                    weight += 1
                if needle in plugin.plugin_name.lower():
                    weight += 2
                if any(needle in tag.strip().lower() for tag in plugin.tags):
                    weight += 2
            if weight != 0 and plugin not in PluginUtility.Plg_SearchResults:
                PluginUtility.Plg_SearchResults.append(plugin)
                PluginUtility.Plg_SearchWeights.append(weight)

    @staticmethod
    def searchAnim(searchText):
        """Add matching animation plugins to the current search results."""
        PluginUtility._searchList(PluginUtility.Plg_Anim_List, searchText)

    @staticmethod
    def searchTool(searchText):
        """Add matching tool plugins to the current search results."""
        PluginUtility._searchList(PluginUtility.Plg_Tool_List, searchText)

    @staticmethod
    def searchImage(searchText):
        """Add matching image plugins to the current search results."""
        PluginUtility._searchList(PluginUtility.Plg_Img_List, searchText)

    @staticmethod
    def SaveFavoritesToFile():
        """Write the favorited plugin names, comma separated, to ``favorites.csk``."""
        favs = ",".join(item.plugin_name for item in PluginUtility.Plg_Favorites)
        FileUtility.SaveToFile(str(CEEnvironment.GetAppDataFolder()), "favorites.csk", favs, False)

    @staticmethod
    def LoadFavoritesFromFile(pluginObject):
        """Mark ``pluginObject`` as favorite when ``favorites.csk`` lists its name."""
        favs = FileUtility.ReadLinesFromFile(str(CEEnvironment.GetAppDataFolder()), "favorites.csk", 1)
        # An empty list or the [0] failure sentinel means there is nothing to load.
        if not favs or not isinstance(favs[0], str):
            return
        if (pluginObject.plugin_name in favs[0].split(",")
                and pluginObject not in PluginUtility.Plg_Favorites):
            PluginUtility.Plg_Favorites.append(pluginObject)
################################################################ ################################################################ ################################################################
class MiscUtility:
    """Miscellaneous helpers: Base64 conversion and app data folder setup."""

    # Encrypt to Base64
    @staticmethod
    def Encrypt(data):
        """Return the Base64 encoding (bytes) of the UTF-8 text ``data``.

        Note that Base64 is an encoding, not encryption.
        """
        return base64.b64encode(data.encode("utf-8"))

    # Decrypt from Base64 to Text
    @staticmethod
    def Decrypt(data):
        """Return the UTF-8 text decoded from the Base64 value ``data``."""
        return base64.b64decode(data).decode("utf-8")

    @staticmethod
    def SetupAppFolder():
        """Create the per-platform app data folder and register it.

        The folder is stored through ``CEEnvironment.SetAppDataFolder``.  On
        platforms other than Windows and macOS the current directory is used;
        it is returned as a ``Path`` like on the other platforms so callers
        can treat the result uniformly.

        Returns:
            The app data folder as a ``Path``.
        """
        platform_id = OSUtility.GetPlatformID()
        if platform_id == 'WIN':
            appfolder = OSUtility.CreateWindowsPath('~', AppStrings.STR_APPDATA_PATH_WIN)
        elif platform_id == 'MAC':
            appfolder = OSUtility.CreateMacPath('/', AppStrings.STR_APPDATA_PATH_MAC)
        else:
            appfolder = Path("./")
        CEEnvironment.SetAppDataFolder(appfolder)
        return appfolder
class FireBaseUtility:
    """Uploads local log files to Firebase storage under ``Logs/``."""

    def __init__(self):
        """Set up the storage handle.

        The Firebase backend is currently disabled (``firebase`` is None), so
        ``storage`` is None as well instead of failing on construction.
        """
        self.firebase = None  # FirebaseAuth()
        self.storage = getattr(self.firebase, "fbstorage", None)
        self.cloudPath = "Logs/"

    def uploadFilefilePath(self, cloudFileName, localFileName):
        """Upload ``localFileName`` to ``cloudPath + cloudFileName``.

        A missing local file is logged and skipped.  Any failure during the
        upload (including a disabled backend) is reported through
        ``DebugUtility.Err`` and not raised.
        """
        if not os.path.exists(localFileName):
            DebugUtility.Log(str(localFileName) + " not found. Not uploading")
            return
        try:
            DebugUtility.Log("Uploading Log : " + str(localFileName))
            print(self.cloudPath + cloudFileName)
            print(localFileName)
            self.storage.child(self.cloudPath + cloudFileName).put(localFileName)
        except Exception as e:
            DebugUtility.Err(e, DebugUtility.InspectFrame(),
                             console="Could Not Upload Logs to Firebase. Check Logs")
class NotificationUtility:
    """Holds the application-wide notification object and forwards messages to it."""

    # Target that receives notify() calls; stays None until registered.
    NotificationObject = None

    @staticmethod
    def setNotificationObject(object):
        """Register ``object`` as the receiver of all later notifications."""
        setattr(NotificationUtility, "NotificationObject", object)

    @staticmethod
    def notify(message, auto_close=True):
        """Forward ``message`` to the registered object's ``notify`` method.

        Args:
            message: Text handed over as the ``text`` keyword.
            auto_close: Handed over unchanged as the ``auto_close`` keyword.
        """
        receiver = NotificationUtility.NotificationObject
        options = {"text": message, "auto_close": auto_close}
        receiver.notify(**options)