From a1276c6de6611d4b52430faaff62d050cb89486d Mon Sep 17 00:00:00 2001 From: Markus Nyman Date: Thu, 12 Jan 2023 15:58:39 +0200 Subject: [PATCH] Simple refactor --- TimeToTrakt.py | 191 +++++++++++++++++++++---------------------------- 1 file changed, 80 insertions(+), 111 deletions(-) diff --git a/TimeToTrakt.py b/TimeToTrakt.py index 42db17b..657988d 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -43,15 +43,13 @@ class Config: gdpr_workspace_path: str -def is_authenticated(): +def is_authenticated() -> bool: with open("pytrakt.json") as f: data = json.load(f) days_before_expiration = ( datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() ).days - if days_before_expiration < 1: - return False - return True + return days_before_expiration >= 1 def get_configuration() -> Config: @@ -82,7 +80,7 @@ FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv" MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv" -def init_trakt_auth(): +def init_trakt_auth() -> bool: if is_authenticated(): return True # Set the method of authentication @@ -133,7 +131,7 @@ def get_year_from_title(title) -> Title: # It seems to improve automation, and reduce manual selection.... -def check_title_name_match(tv_time_title, trakt_title): +def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool: # If the name is a complete match, then don't bother comparing them! if tv_time_title == trakt_title: return True @@ -162,7 +160,7 @@ def check_title_name_match(tv_time_title, trakt_title): # in Trakt.TV either by automation, or asking the user to confirm. -def get_show_by_name(name, season_number, episode_number): +def get_show_by_name(name: str, season_number: str, episode_number: str): # Parse the TV Show's name for year, if one is present in the string title = get_year_from_title(name) @@ -170,15 +168,12 @@ def get_show_by_name(name, season_number, episode_number): if title.year: name = title.without_year - # Request the Trakt API for search results, using the name - tv_search = TVShow.search(name) - # Create an array of shows which have been matched shows_with_same_name = [] # Go through each result from the search - for show in tv_search: - # Check if the title is a match, based on our conditions (e.g over 50% of words match) + for show in TVShow.search(name): + # Check if the title is a match, based on our conditions (e.g. over 50% of words match) if check_title_name_match(name, show.title): # If the title included the year of broadcast, then we can be more picky in the results # to look for a show with a broadcast year that matches @@ -196,21 +191,17 @@ def get_show_by_name(name, season_number, episode_number): else: shows_with_same_name.append(show) - # Sweep through the results once more for 1:1 title name matches, - # then if the list contains one entry with a 1:1 match, then clear the array - # and only use this one! - complete_match_names = [] - for nameFromSearch in shows_with_same_name: - if nameFromSearch.title == name: - complete_match_names.append(nameFromSearch) - + complete_match_names = [name_from_search for name_from_search in shows_with_same_name if name_from_search.title == name] if len(complete_match_names) == 1: - shows_with_same_name = complete_match_names - - # If the search contains multiple results, then we need to confirm with the user which show - # the script should use, or access the local database to see if the user has already provided - # a manual selection - if len(shows_with_same_name) > 1: + return complete_match_names[0] + elif len(shows_with_same_name) == 1: + return shows_with_same_name[0] + elif len(shows_with_same_name) < 1: + return None + else: + # If the search contains multiple results, then we need to confirm with the user which show + # the script should use, or access the local database to see if the user has already provided + # a manual selection # Query the local database for existing selection user_matched_query = Query() @@ -257,14 +248,14 @@ def get_show_by_name(name, season_number, episode_number): "Please make a selection from above (or enter SKIP):" ) - if index_selected != "SKIP": - # Since the value isn't 'skip', check that the result is numerical - index_selected = int(index_selected) - 1 - # Exit the selection loop - break - # Otherwise, exit the loop - else: + # Exit the loop + if index_selected == "SKIP": break + + # Since the value isn't 'skip', check that the result is numerical + index_selected = int(index_selected) - 1 + # Exit the selection loop + break # Still allow the user to provide the exit input, and kill the program except KeyboardInterrupt: sys.exit("Cancel requested...") @@ -298,17 +289,9 @@ def get_show_by_name(name, season_number, episode_number): return selected_show - else: - if len(shows_with_same_name) > 0: - # If the search returned only one result, then awesome! - # Return the show, so the import automation can continue. - return shows_with_same_name[0] - else: - return None - -# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then -# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes +# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then +# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes # a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since # this will match TV Time's existing value. @@ -335,9 +318,7 @@ def parse_season_number(season_number, trakt_show_obj): return season_number -def process_watched_shows(): - # Total amount of rows in the CSV file - error_streak = 0 +def process_watched_shows() -> None: # Open the CSV file within the GDPR exported data with open(WATCHED_SHOWS_PATH, newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' @@ -355,9 +336,9 @@ def process_watched_shows(): # Get the TV Time Episode id tv_show_episode_id = row["episode_id"] # Get the TV Time Season Number - tv_show_season_no = row["episode_season_number"] + tv_show_season_number = row["episode_season_number"] # Get the TV Time Episode Number - tv_show_episode_no = row["episode_number"] + tv_show_episode_number = row["episode_number"] # Get the date which the show was marked 'watched' in TV Time tv_show_date_watched = row["updated_at"] # Parse the watched date value into a Python type @@ -377,6 +358,7 @@ def process_watched_shows(): # If the query returned no results, then continue to import it into Trakt if len(query_result) == 0: # Create a repeating loop, which will break on success, but repeats on failures + error_streak = 0 while True: # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. @@ -391,24 +373,24 @@ def process_watched_shows(): # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) # Search Trakt for the TV show matching TV Time's title value - trakt_show_obj = get_show_by_name( - tv_show_name, tv_show_season_no, tv_show_episode_no + trakt_show = get_show_by_name( + tv_show_name, tv_show_season_number, tv_show_episode_number ) # If the method returned 'None', then this is an indication to skip the episode, and # move onto the next one - if trakt_show_obj is None: + if not trakt_show: break # Show the progress of the import on-screen logging.info( - f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_no} /" - f"Episode {tv_show_episode_no}" + f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /" + f"Episode {tv_show_episode_number}" ) # Get the season from the Trakt API - season = trakt_show_obj.seasons[ - parse_season_number(tv_show_season_no, trakt_show_obj) + season = trakt_show.seasons[ + parse_season_number(tv_show_season_number, trakt_show) ] # Get the episode from the season - episode = season.episodes[int(tv_show_episode_no) - 1] + episode = season.episodes[int(tv_show_episode_number) - 1] # Mark the episode as watched! episode.mark_as_seen(tv_show_date_watched_converted) # Add the episode to the local database as imported, so it can be skipped, @@ -421,20 +403,20 @@ def process_watched_shows(): # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - tv_show_slug = trakt_show_obj.to_json()["shows"][0]["ids"]["ids"][ + tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][ "slug" ] logging.warning( - f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, " - f"Episode {tv_show_episode_no} does not exist in Trakt! " - f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_no}/episodes/{tv_show_episode_no})" + f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " + f"Episode {tv_show_episode_number} does not exist in Trakt! " + f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})" ) break # Catch any errors which are raised because a show could not be found in Trakt except trakt.errors.NotFoundException: logging.warning( - f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, " - f"Episode {tv_show_episode_no} does not exist (search) in Trakt!" + f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " + f"Episode {tv_show_episode_number} does not exist (search) in Trakt!" ) break # Catch errors because of the program breaching the Trakt API rate limit @@ -452,7 +434,7 @@ def process_watched_shows(): except json.decoder.JSONDecodeError: logging.warning( f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} " - + f"Season {tv_show_season_no}, Episode {tv_show_episode_no}! This might occur when the server is down and has produced " + + f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced " + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -467,7 +449,7 @@ def process_watched_shows(): # Skip the episode else: logging.info( - f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_no} / Episode {tv_show_episode_no}." + f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}." ) @@ -475,7 +457,7 @@ def process_watched_shows(): # in Trakt.TV either by automation, or asking the user to confirm. -def get_movie_by_name(name): +def get_movie_by_name(name: str): # Parse the Movie's name for year, if one is present in the string title = get_year_from_title(name) @@ -483,15 +465,12 @@ def get_movie_by_name(name): if title.year: name = title.without_year - # Request the Trakt API for search results, using the name - movie_search = Movie.search(name) - # Create an array of movies which have been matched movies_with_same_name = [] # Go through each result from the search - for movie in movie_search: - # Check if the title is a match, based on our conditions (e.g over 50% of words match) + for movie in Movie.search(name): + # Check if the title is a match, based on our conditions (e.g. over 50% of words match) if check_title_name_match(name, movie.title): # If the title included the year of broadcast, then we can be more picky in the results # to look for a movie with a broadcast year that matches @@ -509,21 +488,17 @@ def get_movie_by_name(name): else: movies_with_same_name.append(movie) - # Sweep through the results once more for 1:1 title name matches, - # then if the list contains one entry with a 1:1 match, then clear the array - # and only use this one! - complete_match_names = [] - for nameFromSearch in movies_with_same_name: - if nameFromSearch.title == name: - complete_match_names.append(nameFromSearch) - + complete_match_names = [name_from_search for name_from_search in movies_with_same_name if name_from_search.title == name] if len(complete_match_names) == 1: - movies_with_same_name = complete_match_names - - # If the search contains multiple results, then we need to confirm with the user which movie - # the script should use, or access the local database to see if the user has already provided - # a manual selection - if len(movies_with_same_name) > 1: + return complete_match_names[0] + elif len(movies_with_same_name) == 1: + return movies_with_same_name[0] + elif len(movies_with_same_name) < 1: + return None + else: + # If the search contains multiple results, then we need to confirm with the user which movie + # the script should use, or access the local database to see if the user has already provided + # a manual selection # Query the local database for existing selection user_matched_query = Query() @@ -610,14 +585,6 @@ def get_movie_by_name(name): return selected_movie - else: - if len(movies_with_same_name) > 0: - # If the search returned only one result, then awesome! - # Return the movie, so the import automation can continue. - return movies_with_same_name[0] - else: - return None - def process_movies(): # Total amount of rows which have been processed in the CSV file @@ -773,6 +740,7 @@ def process_movies(): f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'." ) + def menu_selection() -> int: # Display a menu selection print(">> What do you want to do?") @@ -802,30 +770,31 @@ def menu_selection() -> int: def start(): selection = menu_selection() + # Create the initial authentication with Trakt, before starting the process - if init_trakt_auth(): - # Start the process which is required - if selection == 1: - # Invoke the method which will import episodes which have been watched - # from TV Time into Trakt - logging.info("Processing watched shows.") - process_watched_shows() - # TODO: Add support for followed shows - elif selection == 2: - # Invoke the method which will import movies which have been watched - # from TV Time into Trakt - logging.info("Processing movies.") - process_movies() - elif selection == 3: - # Invoke both the episodes and movies import methods - logging.info("Processing both watched shows and movies.") - process_watched_shows() - process_movies() - else: + if not init_trakt_auth(): logging.error( "ERROR: Unable to complete authentication to Trakt - please try again." ) + # Start the process which is required + if selection == 1: + # Invoke the method which will import episodes which have been watched + # from TV Time into Trakt + logging.info("Processing watched shows.") + process_watched_shows() + # TODO: Add support for followed shows + elif selection == 2: + # Invoke the method which will import movies which have been watched + # from TV Time into Trakt + logging.info("Processing movies.") + process_movies() + elif selection == 3: + # Invoke both the episodes and movies import methods + logging.info("Processing both watched shows and movies.") + process_watched_shows() + process_movies() + if __name__ == "__main__": # Check that the user has provided the GDPR path