From e9c3d99c53187cd4adfa6504f4611167118c33da Mon Sep 17 00:00:00 2001 From: Markus Nyman Date: Thu, 12 Jan 2023 14:49:00 +0200 Subject: [PATCH] Naming to snake_case --- TimeToTrakt.py | 421 +++++++++++++++++++++++++------------------------ 1 file changed, 212 insertions(+), 209 deletions(-) diff --git a/TimeToTrakt.py b/TimeToTrakt.py index 2a84304..fb066fd 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -46,18 +46,18 @@ class Config: gdpr_workspace_path: str -def isAuthenticated(): +def is_authenticated(): with open("pytrakt.json") as f: data = json.load(f) - daysBeforeExpiration = ( + days_before_expiration = ( datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() ).days - if daysBeforeExpiration < 1: + if days_before_expiration < 1: return False return True -def getConfiguration() -> Config: +def get_configuration() -> Config: try: with open("config.json") as f: data = json.load(f) @@ -78,15 +78,15 @@ def getConfiguration() -> Config: ) -config = getConfiguration() +config = get_configuration() WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv" FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv" MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv" -def initTraktAuth(): - if isAuthenticated(): +def init_trakt_auth(): + if is_authenticated(): return True # Set the method of authentication trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH @@ -103,18 +103,18 @@ def initTraktAuth(): # the accuracy of Trakt results. -def getYearFromTitle(title): +def get_year_from_title(title): ex = Expando() try: # Use a regex expression to get the value within the brackets e.g The Americans (2017) - yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) - yearValue = yearSearch.group(1) + year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title) + year_value = year_search.group(1) # Then, get the title without the year value included - titleValue = title.split("(")[0].strip() + title_value = title.split("(")[0].strip() # Put this together into an object - ex.titleWithoutYear = titleValue - ex.yearValue = int(yearValue) + ex.titleWithoutYear = title_value + ex.yearValue = int(year_value) return ex except Exception: # If the above failed, then the title doesn't include a year @@ -129,24 +129,24 @@ def getYearFromTitle(title): # It seems to improve automation, and reduce manual selection.... -def checkTitleNameMatch(tvTimeTitle, traktTitle): +def check_title_name_match(tv_time_title, trakt_title): # If the name is a complete match, then don't bother comparing them! - if tvTimeTitle == traktTitle: + if tv_time_title == trakt_title: return True # Split the TvTime title - tvTimeTitleSplit = tvTimeTitle.split() + tv_time_title_split = tv_time_title.split() # Create an array of words which are found in the Trakt title - wordsMatched = [] + words_matched = [] # Go through each word of the TV Time title, and check if it's in the Trakt title - for word in tvTimeTitleSplit: - if word in traktTitle: - wordsMatched.append(word) + for word in tv_time_title_split: + if word in trakt_title: + words_matched.append(word) # Then calculate what percentage of words matched - quotient = len(wordsMatched) / len(traktTitle.split()) + quotient = len(words_matched) / len(trakt_title.split()) percentage = quotient * 100 # If more than 50% of words in the TV Time title exist in the Trakt title, @@ -158,79 +158,78 @@ def checkTitleNameMatch(tvTimeTitle, traktTitle): # in Trakt.TV either by automation, or asking the user to confirm. -def getShowByName(name, seasonNo, episodeNo): +def get_show_by_name(name, season_number, episode_number): # Parse the TV Show's name for year, if one is present in the string - titleObj = getYearFromTitle(name) + title_obj = get_year_from_title(name) # Create a boolean to indicate if the title contains a year, # this is used later on to improve the accuracy of picking # from search results - doesTitleIncludeYear = titleObj.yearValue != -1 + does_title_include_year = title_obj.yearValue != -1 # If the title contains a year, then replace the local variable with the stripped version - if doesTitleIncludeYear: - name = titleObj.titleWithoutYear + if does_title_include_year: + name = title_obj.titleWithoutYear # Request the Trakt API for search results, using the name - tvSearch = TVShow.search(name) + tv_search = TVShow.search(name) # Create an array of shows which have been matched - showsWithSameName = [] + shows_with_same_name = [] # Go through each result from the search - for show in tvSearch: + for show in tv_search: # Check if the title is a match, based on our conditions (e.g over 50% of words match) - if checkTitleNameMatch(name, show.title): + if check_title_name_match(name, show.title): # If the title included the year of broadcast, then we can be more picky in the results # to look for a show with a broadcast year that matches - if doesTitleIncludeYear: + if does_title_include_year: # If the show title is a 1:1 match, with the same broadcast year, then bingo! - if (name == show.title) and (show.year == titleObj.yearValue): + if (name == show.title) and (show.year == title_obj.yearValue): # Clear previous results, and only use this one - showsWithSameName = [] - showsWithSameName.append(show) + shows_with_same_name = [show] break # Otherwise, only add the show if the broadcast year matches - if show.year == titleObj.yearValue: - showsWithSameName.append(show) + if show.year == title_obj.yearValue: + shows_with_same_name.append(show) # If the program doesn't have the broadcast year, then add all the results else: - showsWithSameName.append(show) + shows_with_same_name.append(show) # Sweep through the results once more for 1:1 title name matches, # then if the list contains one entry with a 1:1 match, then clear the array # and only use this one! - completeMatchNames = [] - for nameFromSearch in showsWithSameName: + complete_match_names = [] + for nameFromSearch in shows_with_same_name: if nameFromSearch.title == name: - completeMatchNames.append(nameFromSearch) + complete_match_names.append(nameFromSearch) - if len(completeMatchNames) == 1: - showsWithSameName = completeMatchNames + if len(complete_match_names) == 1: + shows_with_same_name = complete_match_names # If the search contains multiple results, then we need to confirm with the user which show # the script should use, or access the local database to see if the user has already provided # a manual selection - if len(showsWithSameName) > 1: + if len(shows_with_same_name) > 1: # Query the local database for existing selection - userMatchedQuery = Query() - queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name) + user_matched_query = Query() + query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! - if len(queryResult) == 1: + if len(query_result) == 1: # Get the first result from the query - firstMatch = queryResult[0] + first_match = query_result[0] # Get the value contains the selection index - firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) + first_match_selected_index = int(first_match.get("UserSelectedIndex")) # Check if the user previously requested to skip the show - skipShow = firstMatch.get("SkipShow") + skip_show = first_match.get("SkipShow") # If the user did not skip, but provided an index selection, get the # matching show - if not skipShow: - return showsWithSameName[firstMatchSelectedIndex] + if not skip_show: + return shows_with_same_name[first_match_selected_index] # Otherwise, return None, which will trigger the script to skip # and move onto the next show else: @@ -239,27 +238,29 @@ def getShowByName(name, seasonNo, episodeNo): # then prompt the user to make a selection else: print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.\a" + f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number}," + f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a " ) # Output each show for manual selection - for idx, item in enumerate(showsWithSameName): + for idx, item in enumerate(shows_with_same_name): # Display the show's title, broadcast year, amount of seasons and a link to the Trakt page. # This will provide the user with enough information to make a selection. print( - f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}" + f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} " + f"Season(s) - More Info: https://trakt.tv/{item.ext}" ) while True: try: # Get the user's selection, either a numerical input, or a string 'SKIP' value - indexSelected = input( + index_selected = input( "Please make a selection from above (or enter SKIP):" ) - if indexSelected != "SKIP": + if index_selected != "SKIP": # Since the value isn't 'skip', check that the result is numerical - indexSelected = int(indexSelected) - 1 + index_selected = int(index_selected) - 1 # Exit the selection loop break # Otherwise, exit the loop @@ -271,12 +272,12 @@ def getShowByName(name, seasonNo, episodeNo): # Otherwise, the user has entered an invalid value, warn the user to try again except Exception: logging.error( - f"Sorry! Please select a value between 0 to {len(showsWithSameName)}" + f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}" ) # If the user entered 'SKIP', then exit from the loop with no selection, which # will trigger the program to move onto the next episode - if indexSelected == "SKIP": + if index_selected == "SKIP": # Record that the user has skipped the TV Show for import, so that # manual input isn't required everytime userMatchedShowsTable.insert( @@ -286,23 +287,23 @@ def getShowByName(name, seasonNo, episodeNo): return None # Otherwise, return the selection which the user made from the list else: - selectedShow = showsWithSameName[int(indexSelected)] + selected_show = shows_with_same_name[int(index_selected)] userMatchedShowsTable.insert( { "ShowName": name, - "UserSelectedIndex": indexSelected, + "UserSelectedIndex": index_selected, "SkipShow": False, } ) - return selectedShow + return selected_show else: - if len(showsWithSameName) > 0: + if len(shows_with_same_name) > 0: # If the search returned only one result, then awesome! # Return the show, so the import automation can continue. - return showsWithSameName[0] + return shows_with_same_name[0] else: return None @@ -313,76 +314,74 @@ def getShowByName(name, seasonNo, episodeNo): # this will match TV Time's existing value. -def parseSeasonNo(seasonNo, traktShowObj): +def parse_season_number(season_number, trakt_show_obj): # Parse the season number into a numerical value - seasonNo = int(seasonNo) + season_number = int(season_number) # Then get the Season Number from the first item in the array - firstSeasonNo = traktShowObj.seasons[0].number + first_season_no = trakt_show_obj.seasons[0].number # If the season number is 0, then the Trakt show contains a "special" season - if firstSeasonNo == 0: + if first_season_no == 0: # No need to modify the value, as the TV Time value will match Trakt - return seasonNo + return season_number # Otherwise, if the Trakt seasons start with no specials, then return the seasonNo, # but subtracted by one (e.g Season 1 in TV Time, will be 0) else: # Only subtract if the TV Time season number is greater than 0. - if seasonNo != 0: - return seasonNo - 1 + if season_number != 0: + return season_number - 1 # Otherwise, the TV Time season is a special! Then you don't need to change the starting position else: - return seasonNo + return season_number -def processWatchedShows(): - # Total amount of rows which have been processed in the CSV file - rowsCount = 0 +def process_watched_shows(): # Total amount of rows in the CSV file - errorStreak = 0 + error_streak = 0 # Open the CSV file within the GDPR exported data with open(WATCHED_SHOWS_PATH, newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' - showsReader = csv.DictReader(csvfile, delimiter=",") + shows_reader = csv.DictReader(csvfile, delimiter=",") # Get the total amount of rows in the CSV file, - rowsTotal = len(list(showsReader)) + rows_total = len(list(shows_reader)) # Move position to the beginning of the file csvfile.seek(0, 0) # Loop through each line/record of the CSV file # Ignore the header row - next(showsReader, None) - for rowsCount, row in enumerate(showsReader): + next(shows_reader, None) + for rowsCount, row in enumerate(shows_reader): # Get the name of the TV show - tvShowName = row["tv_show_name"] - # Get the TV Time Episode Id - tvShowEpisodeId = row["episode_id"] + tv_show_name = row["tv_show_name"] + # Get the TV Time Episode id + tv_show_episode_id = row["episode_id"] # Get the TV Time Season Number - tvShowSeasonNo = row["episode_season_number"] + tv_show_season_no = row["episode_season_number"] # Get the TV Time Episode Number - tvShowEpisodeNo = row["episode_number"] + tv_show_episode_no = row["episode_number"] # Get the date which the show was marked 'watched' in TV Time - tvShowDateWatched = row["updated_at"] + tv_show_date_watched = row["updated_at"] # Parse the watched date value into a Python type - tvShowDateWatchedConverted = datetime.strptime( - tvShowDateWatched, "%Y-%m-%d %H:%M:%S" + tv_show_date_watched_converted = datetime.strptime( + tv_show_date_watched, "%Y-%m-%d %H:%M:%S" ) # Query the local database for previous entries indicating that # the episode has already been imported in the past. Which will # ease pressure on TV Time's API server during a retry of the import # process, and just save time overall without needing to create network requests - episodeCompletedQuery = Query() - queryResult = syncedEpisodesTable.search( - episodeCompletedQuery.episodeId == tvShowEpisodeId + episode_completed_query = Query() + query_result = syncedEpisodesTable.search( + episode_completed_query.episodeId == tv_show_episode_id ) # If the query returned no results, then continue to import it into Trakt - if len(queryResult) == 0: + if len(query_result) == 0: # Create a repeating loop, which will break on success, but repeats on failures while True: # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. - if errorStreak > 10: + if error_streak > 10: logging.warning( "An error occurred 10 times in a row... skipping episode..." ) @@ -393,46 +392,50 @@ def processWatchedShows(): # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) # Search Trakt for the TV show matching TV Time's title value - traktShowObj = getShowByName( - tvShowName, tvShowSeasonNo, tvShowEpisodeNo + trakt_show_obj = get_show_by_name( + tv_show_name, tv_show_season_no, tv_show_episode_no ) # If the method returned 'None', then this is an indication to skip the episode, and # move onto the next one - if traktShowObj is None: + if trakt_show_obj is None: break # Show the progress of the import on-screen logging.info( - f"({rowsCount + 1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}" + f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_no} /" + f"Episode {tv_show_episode_no}" ) # Get the season from the Trakt API - season = traktShowObj.seasons[ - parseSeasonNo(tvShowSeasonNo, traktShowObj) + season = trakt_show_obj.seasons[ + parse_season_number(tv_show_season_no, trakt_show_obj) ] # Get the episode from the season - episode = season.episodes[int(tvShowEpisodeNo) - 1] + episode = season.episodes[int(tv_show_episode_no) - 1] # Mark the episode as watched! - episode.mark_as_seen(tvShowDateWatchedConverted) + episode.mark_as_seen(tv_show_date_watched_converted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated - syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId}) + syncedEpisodesTable.insert({"episodeId": tv_show_episode_id}) # Clear the error streak on completing the method without errors - errorStreak = 0 + error_streak = 0 break # Catch errors which occur because of an incorrect array index. This occurs when # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - tvShowSlug = traktShowObj.to_json()["shows"][0]["ids"]["ids"][ + tv_show_slug = trakt_show_obj.to_json()["shows"][0]["ids"]["ids"][ "slug" ] logging.warning( - f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist in Trakt! (https://trakt.tv/shows/{tvShowSlug}/seasons/{tvShowSeasonNo}/episodes/{tvShowEpisodeNo})" + f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, " + f"Episode {tv_show_episode_no} does not exist in Trakt! " + f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_no}/episodes/{tv_show_episode_no})" ) break # Catch any errors which are raised because a show could not be found in Trakt except trakt.errors.NotFoundException: logging.warning( - f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!" + f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, " + f"Episode {tv_show_episode_no} does not exist (search) in Trakt!" ) break # Catch errors because of the program breaching the Trakt API rate limit @@ -445,12 +448,12 @@ def processWatchedShows(): time.sleep(60) # Mark the exception in the error streak - errorStreak += 1 + error_streak += 1 # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON except json.decoder.JSONDecodeError: logging.warning( - f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {tvShowName} " - + f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced " + f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} " + + f"Season {tv_show_season_no}, Episode {tv_show_episode_no}! This might occur when the server is down and has produced " + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -458,14 +461,14 @@ def processWatchedShows(): time.sleep(60) # Mark the exception in the error streak - errorStreak += 1 + error_streak += 1 # Catch a CTRL + C keyboard input, and exits the program except KeyboardInterrupt: sys.exit("Cancel requested...") # Skip the episode else: logging.info( - f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}." + f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_no} / Episode {tv_show_episode_no}." ) @@ -473,79 +476,78 @@ def processWatchedShows(): # in Trakt.TV either by automation, or asking the user to confirm. -def getMovieByName(name): +def get_movie_by_name(name): # Parse the Movie's name for year, if one is present in the string - titleObj = getYearFromTitle(name) + title_obj = get_year_from_title(name) # Create a boolean to indicate if the title contains a year, # this is used later on to improve the accuracy of picking # from search results - doesTitleIncludeYear = titleObj.yearValue != -1 + does_title_include_year = title_obj.yearValue != -1 # If the title contains a year, then replace the local variable with the stripped version - if doesTitleIncludeYear: - name = titleObj.titleWithoutYear + if does_title_include_year: + name = title_obj.titleWithoutYear # Request the Trakt API for search results, using the name - movieSearch = Movie.search(name) + movie_search = Movie.search(name) # Create an array of movies which have been matched - moviesWithSameName = [] + movies_with_same_name = [] # Go through each result from the search - for movie in movieSearch: + for movie in movie_search: # Check if the title is a match, based on our conditions (e.g over 50% of words match) - if checkTitleNameMatch(name, movie.title): + if check_title_name_match(name, movie.title): # If the title included the year of broadcast, then we can be more picky in the results # to look for a movie with a broadcast year that matches - if doesTitleIncludeYear: + if does_title_include_year: # If the movie title is a 1:1 match, with the same broadcast year, then bingo! - if (name == movie.title) and (movie.year == titleObj.yearValue): + if (name == movie.title) and (movie.year == title_obj.yearValue): # Clear previous results, and only use this one - moviesWithSameName = [] - moviesWithSameName.append(movie) + movies_with_same_name = [movie] break # Otherwise, only add the movie if the broadcast year matches - if movie.year == titleObj.yearValue: - moviesWithSameName.append(movie) + if movie.year == title_obj.yearValue: + movies_with_same_name.append(movie) # If the program doesn't have the broadcast year, then add all the results else: - moviesWithSameName.append(movie) + movies_with_same_name.append(movie) # Sweep through the results once more for 1:1 title name matches, # then if the list contains one entry with a 1:1 match, then clear the array # and only use this one! - completeMatchNames = [] - for nameFromSearch in moviesWithSameName: + complete_match_names = [] + for nameFromSearch in movies_with_same_name: if nameFromSearch.title == name: - completeMatchNames.append(nameFromSearch) + complete_match_names.append(nameFromSearch) - if len(completeMatchNames) == 1: - moviesWithSameName = completeMatchNames + if len(complete_match_names) == 1: + movies_with_same_name = complete_match_names # If the search contains multiple results, then we need to confirm with the user which movie # the script should use, or access the local database to see if the user has already provided # a manual selection - if len(moviesWithSameName) > 1: + if len(movies_with_same_name) > 1: # Query the local database for existing selection - userMatchedQuery = Query() - queryResult = userMatchedMoviesTable.search(userMatchedQuery.movie_name == name) + user_matched_query = Query() + query_result = userMatchedMoviesTable.search(user_matched_query.movie_name == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! - if len(queryResult) == 1: + if len(query_result) == 1: # Get the first result from the query - firstMatch = queryResult[0] + first_match = query_result[0] # Get the value contains the selection index - firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) + first_match_selected_index = int(first_match.get("UserSelectedIndex")) # Check if the user previously requested to skip the movie - skipMovie = firstMatch.get("SkipMovie") + skip_movie = first_match.get("SkipMovie") # If the user did not skip, but provided an index selection, get the # matching movie - if not skipMovie: - return moviesWithSameName[firstMatchSelectedIndex] + if not skip_movie: + return movies_with_same_name[first_match_selected_index] # Otherwise, return None, which will trigger the script to skip # and move onto the next movie else: @@ -554,11 +556,12 @@ def getMovieByName(name): # then prompt the user to make a selection else: print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(moviesWithSameName)} matching Trakt movies with the same name.\a" + f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} " + f"matching Trakt movies with the same name.\a" ) # Output each movie for manual selection - for idx, item in enumerate(moviesWithSameName): + for idx, item in enumerate(movies_with_same_name): # Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page. # This will provide the user with enough information to make a selection. print( @@ -568,13 +571,13 @@ def getMovieByName(name): while True: try: # Get the user's selection, either a numerical input, or a string 'SKIP' value - indexSelected = input( + index_selected = input( "Please make a selection from above (or enter SKIP):" ) - if indexSelected != "SKIP": + if index_selected != "SKIP": # Since the value isn't 'skip', check that the result is numerical - indexSelected = int(indexSelected) - 1 + index_selected = int(index_selected) - 1 # Exit the selection loop break # Otherwise, exit the loop @@ -586,12 +589,12 @@ def getMovieByName(name): # Otherwise, the user has entered an invalid value, warn the user to try again except Exception: logging.error( - f"Sorry! Please select a value between 0 to {len(moviesWithSameName)}" + f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}" ) # If the user entered 'SKIP', then exit from the loop with no selection, which # will trigger the program to move onto the next episode - if indexSelected == "SKIP": + if index_selected == "SKIP": # Record that the user has skipped the Movie for import, so that # manual input isn't required everytime userMatchedMoviesTable.insert( @@ -601,90 +604,89 @@ def getMovieByName(name): return None # Otherwise, return the selection which the user made from the list else: - selectedMovie = moviesWithSameName[int(indexSelected)] + selected_movie = movies_with_same_name[int(index_selected)] userMatchedMoviesTable.insert( { "movie_name": name, - "UserSelectedIndex": indexSelected, + "UserSelectedIndex": index_selected, "SkipMovie": False, } ) - return selectedMovie + return selected_movie else: - if len(moviesWithSameName) > 0: + if len(movies_with_same_name) > 0: # If the search returned only one result, then awesome! # Return the movie, so the import automation can continue. - return moviesWithSameName[0] + return movies_with_same_name[0] else: return None -def processMovies(): +def process_movies(): # Total amount of rows which have been processed in the CSV file - rowsCount = 0 # Total amount of rows in the CSV file - errorStreak = 0 + error_streak = 0 # Open the CSV file within the GDPR exported data with open(MOVIES_PATH, newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' - movieReaderTemp = csv.DictReader(csvfile, delimiter=",") - movieReader = filter(lambda p: "" != p["movie_name"], movieReaderTemp) + movie_reader_temp = csv.DictReader(csvfile, delimiter=",") + movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp) # First, list all movies with watched type so that watchlist entry for them is not created - watchedList = [] - for row in movieReader: + watched_list = [] + for row in movie_reader: if row["type"] == "watch": - watchedList.append(row["movie_name"]) + watched_list.append(row["movie_name"]) # Move position to the beginning of the file csvfile.seek(0, 0) # Get the total amount of rows in the CSV file, - rowsTotal = len(list(movieReader)) + rows_total = len(list(movie_reader)) # Move position to the beginning of the file csvfile.seek(0, 0) # Loop through each line/record of the CSV file # Ignore the header row - next(movieReader, None) - for rowsCount, row in enumerate(movieReader): + next(movie_reader, None) + for rows_count, row in enumerate(movie_reader): # Get the name of the Movie - movieName = row["movie_name"] + movie_name = row["movie_name"] # Get the date which the movie was marked 'watched' in TV Time - activityType = row["type"] - movieDateWatched = row["updated_at"] + activity_type = row["type"] + movie_date_watched = row["updated_at"] # Parse the watched date value into a Python type - movieDateWatchedConverted = datetime.strptime( - movieDateWatched, "%Y-%m-%d %H:%M:%S" + movie_date_watched_converted = datetime.strptime( + movie_date_watched, "%Y-%m-%d %H:%M:%S" ) # Query the local database for previous entries indicating that # the episode has already been imported in the past. Which will # ease pressure on TV Time's API server during a retry of the import # process, and just save time overall without needing to create network requests - movieQuery = Query() - queryResult = syncedMoviesTable.search( - (movieQuery.movie_name == movieName) & (movieQuery.type == "watched") + movie_query = Query() + query_result = syncedMoviesTable.search( + (movie_query.movie_name == movie_name) & (movie_query.type == "watched") ) - watchlistQuery = Query() - queryResultWatchlist = syncedMoviesTable.search( - (watchlistQuery.movie_name == movieName) - & (watchlistQuery.type == "watchlist") + watchlist_query = Query() + query_result_watchlist = syncedMoviesTable.search( + (watchlist_query.movie_name == movie_name) + & (watchlist_query.type == "watchlist") ) # If the query returned no results, then continue to import it into Trakt - if len(queryResult) == 0: + if len(query_result) == 0: # Create a repeating loop, which will break on success, but repeats on failures while True: # If movie is watched but this is an entry for watchlist, then skip - if movieName in watchedList and activityType != "watch": + if movie_name in watched_list and activity_type != "watch": logging.info( - f"Skipping '{movieName}' to avoid redundant watchlist entry." + f"Skipping '{movie_name}' to avoid redundant watchlist entry." ) break # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. - if errorStreak > 10: + if error_streak > 10: logging.warning( "An error occurred 10 times in a row... skipping episode..." ) @@ -695,51 +697,52 @@ def processMovies(): # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) # Search Trakt for the Movie matching TV Time's title value - traktMovieObj = getMovieByName(movieName) + trakt_movie_obj = get_movie_by_name(movie_name) # If the method returned 'None', then this is an indication to skip the episode, and # move onto the next one - if traktMovieObj is None: + if trakt_movie_obj is None: break # Show the progress of the import on-screen logging.info( - f"({rowsCount + 1}/{rowsTotal}) - Processing '{movieName}'" + f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'" ) - if activityType == "watch": - traktMovieObj.mark_as_seen(movieDateWatchedConverted) + if activity_type == "watch": + trakt_movie_obj.mark_as_seen(movie_date_watched_converted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watched"} + {"movie_name": movie_name, "type": "watched"} ) logging.info(f"Marked as seen") - elif len(queryResultWatchlist) == 0: - traktMovieObj.add_to_watchlist() + elif len(query_result_watchlist) == 0: + trakt_movie_obj.add_to_watchlist() # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watchlist"} + {"movie_name": movie_name, "type": "watchlist"} ) logging.info(f"Added to watchlist") else: logging.warning(f"Already in watchlist") # Clear the error streak on completing the method without errors - errorStreak = 0 + error_streak = 0 break # Catch errors which occur because of an incorrect array index. This occurs when # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - movieSlug = traktMovieObj.to_json()["movies"][0]["ids"]["ids"][ + movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][ "slug" ] logging.warning( - f"({rowsCount}/{rowsTotal}) - {movieName} does not exist in Trakt! (https://trakt.tv/movies/{movieSlug}/)" + f"({rows_count}/{rows_total}) - {movie_name} " + f"does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" ) break # Catch any errors which are raised because a movie could not be found in Trakt except trakt.errors.NotFoundException: logging.warning( - f"({rowsCount}/{rowsTotal}) - {movieName} does not exist (search) in Trakt!" + f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!" ) break # Catch errors because of the program breaching the Trakt API rate limit @@ -752,11 +755,11 @@ def processMovies(): time.sleep(60) # Mark the exception in the error streak - errorStreak += 1 + error_streak += 1 # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON except json.decoder.JSONDecodeError: logging.warning( - f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {movieName} " + f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} " + f" This might occur when the server is down and has produced " + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -765,7 +768,7 @@ def processMovies(): time.sleep(60) # Mark the exception in the error streak - errorStreak += 1 + error_streak += 1 # Catch a CTRL + C keyboard input, and exits the program except KeyboardInterrupt: sys.exit("Cancel requested...") @@ -773,7 +776,7 @@ def processMovies(): # Skip the episode else: logging.info( - f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{movieName}'." + f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'." ) @@ -787,38 +790,38 @@ def start(): while True: try: - menuSelection = input("Enter your menu selection: ") - menuSelection = 3 if not menuSelection else int(menuSelection) + menu_selection = input("Enter your menu selection: ") + menu_selection = 3 if not menu_selection else int(menu_selection) break except ValueError: logging.warning("Invalid input. Please enter a numerical number.") # Check if the input is valid - if not 1 <= menuSelection <= 4: + if not 1 <= menu_selection <= 4: logging.warning("Sorry - that's an unknown menu selection") exit() # Exit if the 4th option was chosen - if menuSelection == 4: + if menu_selection == 4: logging.info("Exiting as per user's selection.") exit() # Create the initial authentication with Trakt, before starting the process - if initTraktAuth(): + if init_trakt_auth(): # Start the process which is required - if menuSelection == 1: + if menu_selection == 1: # Invoke the method which will import episodes which have been watched # from TV Time into Trakt logging.info("Processing watched shows.") - processWatchedShows() + process_watched_shows() # TODO: Add support for followed shows - elif menuSelection == 2: + elif menu_selection == 2: # Invoke the method which will import movies which have been watched # from TV Time into Trakt logging.info("Processing movies.") - processMovies() - elif menuSelection == 3: + process_movies() + elif menu_selection == 3: # Invoke both the episodes and movies import methods logging.info("Processing both watched shows and movies.") - processWatchedShows() - processMovies() + process_watched_shows() + process_movies() else: logging.error( "ERROR: Unable to complete authentication to Trakt - please try again."