From 5b31823c272d59bb3f2da2ce67e1d1c0d1f5ed80 Mon Sep 17 00:00:00 2001 From: Markus Nyman Date: Thu, 12 Jan 2023 14:28:59 +0200 Subject: [PATCH] Config to dataclass --- TimeToTrakt.py | 99 +++++++++++++++++++++++--------------------------- 1 file changed, 45 insertions(+), 54 deletions(-) diff --git a/TimeToTrakt.py b/TimeToTrakt.py index a3a36ee..b61d465 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -6,14 +6,14 @@ import os import re import sys import time +from dataclasses import dataclass from datetime import datetime -from pathlib import Path import trakt.core from tinydb import Query, TinyDB from trakt import init -from trakt.tv import TVShow from trakt.movies import Movie +from trakt.tv import TVShow # Setup logger logging.basicConfig( @@ -38,48 +38,42 @@ class Expando(object): pass +@dataclass +class Config: + trakt_username: str + client_id: str + client_secret: str + gdpr_workspace_path: str + + def isAuthenticated(): with open("pytrakt.json") as f: data = json.load(f) daysBeforeExpiration = ( - datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() + datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() ).days if daysBeforeExpiration < 1: return False return True -def getConfiguration(): - configEx = Expando() - +def getConfiguration() -> Config: with open("config.json") as f: data = json.load(f) - configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"] - configEx.CLIENT_ID = data["CLIENT_ID"] - configEx.CLIENT_SECRET = data["CLIENT_SECRET"] - configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"] - - CONFIG_SINGLETON = configEx - - return CONFIG_SINGLETON + return Config( + data["TRAKT_USERNAME"], + data["CLIENT_ID"], + data["CLIENT_SECRET"], + data["GDPR_WORKSPACE_PATH"], + ) config = getConfiguration() -# Return the path to the CSV file contain the watched episode and movie data from TV Time - - -def getWatchedShowsPath(): - return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv" - - -def getFollowedShowsPath(): - return config.GDPR_WORKSPACE_PATH + "/followed_tv_show.csv" - - -def getMoviesPath(): - return config.GDPR_WORKSPACE_PATH + "/tracking-prod-records.csv" +WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv" +FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv" +MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv" def initTraktAuth(): @@ -88,10 +82,10 @@ def initTraktAuth(): # Set the method of authentication trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH return init( - config.TRAKT_USERNAME, + config.trakt_username, store=True, - client_id=config.CLIENT_ID, - client_secret=config.CLIENT_SECRET, + client_id=config.client_id, + client_secret=config.client_secret, ) @@ -213,8 +207,7 @@ def getShowByName(name, seasonNo, episodeNo): # Query the local database for existing selection userMatchedQuery = Query() - queryResult = userMatchedShowsTable.search( - userMatchedQuery.ShowName == name) + queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! @@ -339,7 +332,7 @@ def processWatchedShows(): # Total amount of rows in the CSV file errorStreak = 0 # Open the CSV file within the GDPR exported data - with open(getWatchedShowsPath(), newline="") as csvfile: + with open(WATCHED_SHOWS_PATH, newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' showsReader = csv.DictReader(csvfile, delimiter=",") # Get the total amount of rows in the CSV file, @@ -400,7 +393,7 @@ def processWatchedShows(): break # Show the progress of the import on-screen logging.info( - f"({rowsCount+1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}" + f"({rowsCount + 1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}" ) # Get the season from the Trakt API season = traktShowObj.seasons[ @@ -412,8 +405,7 @@ def processWatchedShows(): episode.mark_as_seen(tvShowDateWatchedConverted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated - syncedEpisodesTable.insert( - {"episodeId": tvShowEpisodeId}) + syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId}) # Clear the error streak on completing the method without errors errorStreak = 0 break @@ -530,8 +522,7 @@ def getMovieByName(name): # Query the local database for existing selection userMatchedQuery = Query() - queryResult = userMatchedMoviesTable.search( - userMatchedQuery.movie_name == name) + queryResult = userMatchedMoviesTable.search(userMatchedQuery.movie_name == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! @@ -628,10 +619,10 @@ def processMovies(): # Total amount of rows in the CSV file errorStreak = 0 # Open the CSV file within the GDPR exported data - with open(getMoviesPath(), newline="") as csvfile: + with open(MOVIES_PATH, newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' movieReaderTemp = csv.DictReader(csvfile, delimiter=",") - movieReader = filter(lambda p: '' != p['movie_name'], movieReaderTemp) + movieReader = filter(lambda p: "" != p["movie_name"], movieReaderTemp) # First, list all movies with watched type so that watchlist entry for them is not created watchedList = [] for row in movieReader: @@ -663,14 +654,13 @@ def processMovies(): # process, and just save time overall without needing to create network requests movieQuery = Query() queryResult = syncedMoviesTable.search( - (movieQuery.movie_name == movieName) & - (movieQuery.type == "watched") + (movieQuery.movie_name == movieName) & (movieQuery.type == "watched") ) watchlistQuery = Query() queryResultWatchlist = syncedMoviesTable.search( - (watchlistQuery.movie_name == movieName) & - (watchlistQuery.type == "watchlist") + (watchlistQuery.movie_name == movieName) + & (watchlistQuery.type == "watchlist") ) # If the query returned no results, then continue to import it into Trakt @@ -680,7 +670,8 @@ def processMovies(): # If movie is watched but this is an entry for watchlist, then skip if movieName in watchedList and activityType != "watch": logging.info( - f"Skipping '{movieName}' to avoid redundant watchlist entry.") + f"Skipping '{movieName}' to avoid redundant watchlist entry." + ) break # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. @@ -702,22 +693,23 @@ def processMovies(): break # Show the progress of the import on-screen logging.info( - f"({rowsCount+1}/{rowsTotal}) - Processing '{movieName}'" + f"({rowsCount + 1}/{rowsTotal}) - Processing '{movieName}'" ) if activityType == "watch": - traktMovieObj.mark_as_seen( - movieDateWatchedConverted) + traktMovieObj.mark_as_seen(movieDateWatchedConverted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watched"}) + {"movie_name": movieName, "type": "watched"} + ) logging.info(f"Marked as seen") elif len(queryResultWatchlist) == 0: traktMovieObj.add_to_watchlist() # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watchlist"}) + {"movie_name": movieName, "type": "watchlist"} + ) logging.info(f"Added to watchlist") else: logging.warning(f"Already in watchlist") @@ -790,8 +782,7 @@ def start(): menuSelection = 3 if not menuSelection else int(menuSelection) break except ValueError: - logging.warning( - "Invalid input. Please enter a numerical number.") + logging.warning("Invalid input. Please enter a numerical number.") # Check if the input is valid if not 1 <= menuSelection <= 4: logging.warning("Sorry - that's an unknown menu selection") @@ -829,12 +820,12 @@ if __name__ == "__main__": # Check that the user has created the config file if os.path.exists("config.json"): # Check that the user has provided the GDPR path - if os.path.isdir(config.GDPR_WORKSPACE_PATH): + if os.path.isdir(config.gdpr_workspace_path): start() else: logging.error( "Oops! The TV Time GDPR folder '" - + config.GDPR_WORKSPACE_PATH + + config.gdpr_workspace_path + "' does not exist on the local system. Please check it, and try again." ) else: