From 5ec8c1323ee9717d338b59d13cac73cf146f7d57 Mon Sep 17 00:00:00 2001 From: SinTan1729 Date: Thu, 6 Oct 2022 00:49:16 -0500 Subject: [PATCH] Merged scripts --- TimeToTrakt.py | 715 ++++++++++++++++++++++--------------------- TimeToTraktMovies.py | 492 ----------------------------- 2 files changed, 373 insertions(+), 834 deletions(-) delete mode 100644 TimeToTraktMovies.py diff --git a/TimeToTrakt.py b/TimeToTrakt.py index 954c365..f58e336 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -6,15 +6,14 @@ import os import re import sys import time -from dataclasses import dataclass from datetime import datetime -from typing import Optional, Callable, TypeVar, Union, List +from pathlib import Path import trakt.core from tinydb import Query, TinyDB from trakt import init -from trakt.movies import Movie from trakt.tv import TVShow +from trakt.movies import Movie # Setup logger logging.basicConfig( @@ -28,68 +27,72 @@ logging.basicConfig( DELAY_BETWEEN_EPISODES_IN_SECONDS = 1 # Create databases to keep track of completed processes -database = TinyDB("localStorage.json") -syncedEpisodesTable = database.table("SyncedEpisodes") -userMatchedShowsTable = database.table("TvTimeTraktUserMatched") -syncedMoviesTable = database.table("SyncedMovies") -userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies") +databaseshows = TinyDB("localStorageShows.json") +syncedEpisodesTable = databaseshows.table("SyncedEpisodes") +userMatchedShowsTable = databaseshows.table("TvTimeTraktUserMatched") +databasemovies = TinyDB("localStorageMovies.json") +syncedMoviesTable = databasemovies.table("SyncedMovies") +userMatchedMoviesTable = databasemovies.table("TvTimeTraktUserMatched") -@dataclass -class Config: - trakt_username: str - client_id: str - client_secret: str - gdpr_workspace_path: str +class Expando(object): + pass -def is_authenticated() -> bool: +def isAuthenticated(): with open("pytrakt.json") as f: data = json.load(f) - days_before_expiration = ( - datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() + daysBeforeExpiration = ( + datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() ).days - return days_before_expiration >= 1 + if daysBeforeExpiration < 1: + return False + return True -def get_configuration() -> Config: - try: - with open("config.json") as f: - data = json.load(f) +def getConfiguration(): + configEx = Expando() - return Config( - data["TRAKT_USERNAME"], - data["CLIENT_ID"], - data["CLIENT_SECRET"], - data["GDPR_WORKSPACE_PATH"], - ) - except FileNotFoundError: - logging.info("config.json not found prompting user for input") - return Config( - input("Enter your Trakt.tv username: "), - input("Enter you Client id: "), - input("Enter your Client secret: "), - input("Enter your GDPR workspace path: ") - ) + with open("config.json") as f: + data = json.load(f) + + configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"] + configEx.CLIENT_ID = data["CLIENT_ID"] + configEx.CLIENT_SECRET = data["CLIENT_SECRET"] + configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"] + + CONFIG_SINGLETON = configEx + + return CONFIG_SINGLETON -config = get_configuration() +config = getConfiguration() -WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv" -FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv" -MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv" +# Return the path to the CSV file contain the watched episode and movie data from TV Time -def init_trakt_auth() -> bool: - if is_authenticated(): +def getWatchedShowsPath(): + return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv" + + +def getFollowedShowsPath(): + return config.GDPR_WORKSPACE_PATH + "/followed_tv_show.csv" + + +def getMoviesPath(): + return config.GDPR_WORKSPACE_PATH + "/tracking-prod-records.csv" + + +def initTraktAuth(): + if isAuthenticated(): return True # Set the method of authentication trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH return init( - config.trakt_username, + config.TRAKT_USERNAME, store=True, - client_id=config.client_id, - client_secret=config.client_secret, + client_id=config.CLIENT_ID, + client_secret=config.CLIENT_SECRET, ) @@ -97,33 +100,26 @@ def init_trakt_auth() -> bool: # and then return this value, with the title and year removed to improve # the accuracy of Trakt results. -@dataclass -class Title: - name: str - without_year: str - year: Optional[int] - def __init__(self, title: str): - try: - # Use a regex expression to get the value within the brackets e.g. The Americans (2017) - year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title) - year_value = year_search.group(1) - # Then, get the title without the year value included - title_value = title.split("(")[0].strip() - # Put this together into an object - self.name = title - self.without_year = title_value - self.year = int(year_value) - except Exception: - # If the above failed, then the title doesn't include a year - # so return the object as is. - self.name = title - self.without_year = title - self.year = None +def getYearFromTitle(title): + ex = Expando() - -def get_year_from_title(title) -> Title: - return Title(title) + try: + # Use a regex expression to get the value within the brackets e.g The Americans (2017) + yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) + yearValue = yearSearch.group(1) + # Then, get the title without the year value included + titleValue = title.split("(")[0].strip() + # Put this together into an object + ex.titleWithoutYear = titleValue + ex.yearValue = int(yearValue) + return ex + except Exception: + # If the above failed, then the title doesn't include a year + # so return the object as is. + ex.titleWithoutYear = title + ex.yearValue = -1 + return ex # Shows in TV Time are often different to Trakt.TV - in order to improve results and automation, @@ -131,24 +127,24 @@ def get_year_from_title(title) -> Title: # It seems to improve automation, and reduce manual selection.... -def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool: +def checkTitleNameMatch(tvTimeTitle, traktTitle): # If the name is a complete match, then don't bother comparing them! - if tv_time_title == trakt_title: + if tvTimeTitle == traktTitle: return True # Split the TvTime title - tv_time_title_split = tv_time_title.split() + tvTimeTitleSplit = tvTimeTitle.split() # Create an array of words which are found in the Trakt title - words_matched = [] + wordsMatched = [] # Go through each word of the TV Time title, and check if it's in the Trakt title - for word in tv_time_title_split: - if word in trakt_title: - words_matched.append(word) + for word in tvTimeTitleSplit: + if word in traktTitle: + wordsMatched.append(word) # Then calculate what percentage of words matched - quotient = len(words_matched) / len(trakt_title.split()) + quotient = len(wordsMatched) / len(traktTitle.split()) percentage = quotient * 100 # If more than 50% of words in the TV Time title exist in the Trakt title, @@ -159,76 +155,81 @@ def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool: # Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show # in Trakt.TV either by automation, or asking the user to confirm. -TraktTVShow = TypeVar("TraktTVShow") -TraktMovie = TypeVar("TraktMovie") -SearchResult = Union[TraktTVShow, TraktMovie] - - -def get_items_with_same_name(title: Title, items: List[SearchResult]) -> List[SearchResult]: - shows_with_same_name = [] - - for item in items: - if check_title_name_match(title.name, item.title): - # If the title included the year of broadcast, then we can be more picky in the results - # to look for an item with a broadcast year that matches - if title.year: - # If the item title is a 1:1 match, with the same broadcast year, then bingo! - if (title.name == item.title) and (item.year == title.year): - # Clear previous results, and only use this one - shows_with_same_name = [item] - break - - # Otherwise, only add the item if the broadcast year matches - if item.year == title.year: - shows_with_same_name.append(item) - # If the item doesn't have the broadcast year, then add all the results - else: - shows_with_same_name.append(item) - - return shows_with_same_name - - -def get_show_by_name(name: str, season_number: str, episode_number: str): +def getShowByName(name, seasonNo, episodeNo): # Parse the TV Show's name for year, if one is present in the string - title = get_year_from_title(name) + titleObj = getYearFromTitle(name) + + # Create a boolean to indicate if the title contains a year, + # this is used later on to improve the accuracy of picking + # from search results + doesTitleIncludeYear = titleObj.yearValue != -1 # If the title contains a year, then replace the local variable with the stripped version - if title.year: - name = title.without_year + if doesTitleIncludeYear: + name = titleObj.titleWithoutYear - shows_with_same_name = get_items_with_same_name(title, TVShow.search(name)) + # Request the Trakt API for search results, using the name + tvSearch = TVShow.search(name) - complete_match_names = [name_from_search for name_from_search in shows_with_same_name if - name_from_search.title == name] - if len(complete_match_names) == 1: - return complete_match_names[0] - elif len(shows_with_same_name) == 1: - return shows_with_same_name[0] - elif len(shows_with_same_name) < 1: - return None - else: - # If the search contains multiple results, then we need to confirm with the user which show - # the script should use, or access the local database to see if the user has already provided - # a manual selection + # Create an array of shows which have been matched + showsWithSameName = [] + + # Go through each result from the search + for show in tvSearch: + # Check if the title is a match, based on our conditions (e.g over 50% of words match) + if checkTitleNameMatch(name, show.title): + # If the title included the year of broadcast, then we can be more picky in the results + # to look for a show with a broadcast year that matches + if doesTitleIncludeYear: + # If the show title is a 1:1 match, with the same broadcast year, then bingo! + if (name == show.title) and (show.year == titleObj.yearValue): + # Clear previous results, and only use this one + showsWithSameName = [] + showsWithSameName.append(show) + break + + # Otherwise, only add the show if the broadcast year matches + if show.year == titleObj.yearValue: + showsWithSameName.append(show) + # If the program doesn't have the broadcast year, then add all the results + else: + showsWithSameName.append(show) + + # Sweep through the results once more for 1:1 title name matches, + # then if the list contains one entry with a 1:1 match, then clear the array + # and only use this one! + completeMatchNames = [] + for nameFromSearch in showsWithSameName: + if nameFromSearch.title == name: + completeMatchNames.append(nameFromSearch) + + if len(completeMatchNames) == 1: + showsWithSameName = completeMatchNames + + # If the search contains multiple results, then we need to confirm with the user which show + # the script should use, or access the local database to see if the user has already provided + # a manual selection + if len(showsWithSameName) > 1: # Query the local database for existing selection - user_matched_query = Query() - query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name) + userMatchedQuery = Query() + queryResult = userMatchedShowsTable.search( + userMatchedQuery.ShowName == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! - if len(query_result) == 1: + if len(queryResult) == 1: # Get the first result from the query - first_match = query_result[0] + firstMatch = queryResult[0] # Get the value contains the selection index - first_match_selected_index = int(first_match.get("UserSelectedIndex")) + firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) # Check if the user previously requested to skip the show - skip_show = first_match.get("SkipShow") + skipShow = firstMatch.get("SkipShow") # If the user did not skip, but provided an index selection, get the # matching show - if not skip_show: - return shows_with_same_name[first_match_selected_index] + if not skipShow: + return showsWithSameName[firstMatchSelectedIndex] # Otherwise, return None, which will trigger the script to skip # and move onto the next show else: @@ -237,46 +238,44 @@ def get_show_by_name(name: str, season_number: str, episode_number: str): # then prompt the user to make a selection else: print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number}," - f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a " + f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name." ) # Output each show for manual selection - for idx, item in enumerate(shows_with_same_name): + for idx, item in enumerate(showsWithSameName): # Display the show's title, broadcast year, amount of seasons and a link to the Trakt page. # This will provide the user with enough information to make a selection. print( - f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} " - f"Season(s) - More Info: https://trakt.tv/{item.ext}" + f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}" ) while True: try: # Get the user's selection, either a numerical input, or a string 'SKIP' value - index_selected = input( + indexSelected = input( "Please make a selection from above (or enter SKIP):" ) - # Exit the loop - if index_selected == "SKIP": + if indexSelected != "SKIP": + # Since the value isn't 'skip', check that the result is numerical + indexSelected = int(indexSelected) - 1 + # Exit the selection loop + break + # Otherwise, exit the loop + else: break - - # Since the value isn't 'skip', check that the result is numerical - index_selected = int(index_selected) - 1 - # Exit the selection loop - break # Still allow the user to provide the exit input, and kill the program except KeyboardInterrupt: sys.exit("Cancel requested...") # Otherwise, the user has entered an invalid value, warn the user to try again except Exception: logging.error( - f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}" + f"Sorry! Please select a value between 0 to {len(showsWithSameName)}" ) # If the user entered 'SKIP', then exit from the loop with no selection, which # will trigger the program to move onto the next episode - if index_selected == "SKIP": + if indexSelected == "SKIP": # Record that the user has skipped the TV Show for import, so that # manual input isn't required everytime userMatchedShowsTable.insert( @@ -286,92 +285,103 @@ def get_show_by_name(name: str, season_number: str, episode_number: str): return None # Otherwise, return the selection which the user made from the list else: - selected_show = shows_with_same_name[int(index_selected)] + selectedShow = showsWithSameName[int(indexSelected)] userMatchedShowsTable.insert( { "ShowName": name, - "UserSelectedIndex": index_selected, + "UserSelectedIndex": indexSelected, "SkipShow": False, } ) - return selected_show + return selectedShow + + else: + if len(showsWithSameName) > 0: + # If the search returned only one result, then awesome! + # Return the show, so the import automation can continue. + return showsWithSameName[0] + else: + return None -# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then -# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes +# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then +# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes # a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since # this will match TV Time's existing value. -def parse_season_number(season_number, trakt_show_obj): +def parseSeasonNo(seasonNo, traktShowObj): # Parse the season number into a numerical value - season_number = int(season_number) + seasonNo = int(seasonNo) # Then get the Season Number from the first item in the array - first_season_no = trakt_show_obj.seasons[0].number + firstSeasonNo = traktShowObj.seasons[0].number # If the season number is 0, then the Trakt show contains a "special" season - if first_season_no == 0: + if firstSeasonNo == 0: # No need to modify the value, as the TV Time value will match Trakt - return season_number + return seasonNo # Otherwise, if the Trakt seasons start with no specials, then return the seasonNo, # but subtracted by one (e.g Season 1 in TV Time, will be 0) else: # Only subtract if the TV Time season number is greater than 0. - if season_number != 0: - return season_number - 1 + if seasonNo != 0: + return seasonNo - 1 # Otherwise, the TV Time season is a special! Then you don't need to change the starting position else: - return season_number + return seasonNo -def process_watched_shows() -> None: +def processWatchedShows(): + # Total amount of rows which have been processed in the CSV file + rowsCount = 0 + # Total amount of rows in the CSV file + errorStreak = 0 # Open the CSV file within the GDPR exported data - with open(WATCHED_SHOWS_PATH, newline="") as csvfile: + with open(getWatchedShowsPath(), newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' - shows_reader = csv.DictReader(csvfile, delimiter=",") + showsReader = csv.DictReader(csvfile, delimiter=",") # Get the total amount of rows in the CSV file, - rows_total = len(list(shows_reader)) + rowsTotal = len(list(showsReader)) # Move position to the beginning of the file csvfile.seek(0, 0) # Loop through each line/record of the CSV file # Ignore the header row - next(shows_reader, None) - for rowsCount, row in enumerate(shows_reader): + next(showsReader, None) + for rowsCount, row in enumerate(showsReader): # Get the name of the TV show - tv_show_name = row["tv_show_name"] - # Get the TV Time Episode id - tv_show_episode_id = row["episode_id"] + tvShowName = row["tv_show_name"] + # Get the TV Time Episode Id + tvShowEpisodeId = row["episode_id"] # Get the TV Time Season Number - tv_show_season_number = row["episode_season_number"] + tvShowSeasonNo = row["episode_season_number"] # Get the TV Time Episode Number - tv_show_episode_number = row["episode_number"] + tvShowEpisodeNo = row["episode_number"] # Get the date which the show was marked 'watched' in TV Time - tv_show_date_watched = row["updated_at"] + tvShowDateWatched = row["updated_at"] # Parse the watched date value into a Python type - tv_show_date_watched_converted = datetime.strptime( - tv_show_date_watched, "%Y-%m-%d %H:%M:%S" + tvShowDateWatchedConverted = datetime.strptime( + tvShowDateWatched, "%Y-%m-%d %H:%M:%S" ) # Query the local database for previous entries indicating that # the episode has already been imported in the past. Which will # ease pressure on TV Time's API server during a retry of the import # process, and just save time overall without needing to create network requests - episode_completed_query = Query() - query_result = syncedEpisodesTable.search( - episode_completed_query.episodeId == tv_show_episode_id + episodeCompletedQuery = Query() + queryResult = syncedEpisodesTable.search( + episodeCompletedQuery.episodeId == tvShowEpisodeId ) # If the query returned no results, then continue to import it into Trakt - if len(query_result) == 0: + if len(queryResult) == 0: # Create a repeating loop, which will break on success, but repeats on failures - error_streak = 0 while True: # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. - if error_streak > 10: + if errorStreak > 10: logging.warning( "An error occurred 10 times in a row... skipping episode..." ) @@ -382,50 +392,47 @@ def process_watched_shows() -> None: # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) # Search Trakt for the TV show matching TV Time's title value - trakt_show = get_show_by_name( - tv_show_name, tv_show_season_number, tv_show_episode_number + traktShowObj = getShowByName( + tvShowName, tvShowSeasonNo, tvShowEpisodeNo ) # If the method returned 'None', then this is an indication to skip the episode, and # move onto the next one - if not trakt_show: + if traktShowObj is None: break # Show the progress of the import on-screen logging.info( - f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /" - f"Episode {tv_show_episode_number}" + f"({rowsCount+1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}" ) # Get the season from the Trakt API - season = trakt_show.seasons[ - parse_season_number(tv_show_season_number, trakt_show) + season = traktShowObj.seasons[ + parseSeasonNo(tvShowSeasonNo, traktShowObj) ] # Get the episode from the season - episode = season.episodes[int(tv_show_episode_number) - 1] + episode = season.episodes[int(tvShowEpisodeNo) - 1] # Mark the episode as watched! - episode.mark_as_seen(tv_show_date_watched_converted) + episode.mark_as_seen(tvShowDateWatchedConverted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated - syncedEpisodesTable.insert({"episodeId": tv_show_episode_id}) + syncedEpisodesTable.insert( + {"episodeId": tvShowEpisodeId}) # Clear the error streak on completing the method without errors - error_streak = 0 + errorStreak = 0 break # Catch errors which occur because of an incorrect array index. This occurs when # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][ + tvShowSlug = traktShowObj.to_json()["shows"][0]["ids"]["ids"][ "slug" ] logging.warning( - f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " - f"Episode {tv_show_episode_number} does not exist in Trakt! " - f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})" + f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist in Trakt! (https://trakt.tv/shows/{tvShowSlug}/seasons/{tvShowSeasonNo}/episodes/{tvShowEpisodeNo})" ) break # Catch any errors which are raised because a show could not be found in Trakt except trakt.errors.NotFoundException: logging.warning( - f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, " - f"Episode {tv_show_episode_number} does not exist (search) in Trakt!" + f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!" ) break # Catch errors because of the program breaching the Trakt API rate limit @@ -438,12 +445,12 @@ def process_watched_shows() -> None: time.sleep(60) # Mark the exception in the error streak - error_streak += 1 + errorStreak += 1 # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON except json.decoder.JSONDecodeError: logging.warning( - f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} " - + f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced " + f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {tvShowName} " + + f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced " + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -451,14 +458,14 @@ def process_watched_shows() -> None: time.sleep(60) # Mark the exception in the error streak - error_streak += 1 + errorStreak += 1 # Catch a CTRL + C keyboard input, and exits the program except KeyboardInterrupt: sys.exit("Cancel requested...") # Skip the episode else: logging.info( - f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}." + f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}." ) @@ -466,46 +473,80 @@ def process_watched_shows() -> None: # in Trakt.TV either by automation, or asking the user to confirm. -def get_movie_by_name(name: str): +def getMovieByName(name): # Parse the Movie's name for year, if one is present in the string - title = get_year_from_title(name) + titleObj = getYearFromTitle(name) + + # Create a boolean to indicate if the title contains a year, + # this is used later on to improve the accuracy of picking + # from search results + doesTitleIncludeYear = titleObj.yearValue != -1 # If the title contains a year, then replace the local variable with the stripped version - if title.year: - name = title.without_year + if doesTitleIncludeYear: + name = titleObj.titleWithoutYear - movies_with_same_name = get_items_with_same_name(title, Movie.search(name)) + # Request the Trakt API for search results, using the name + movieSearch = Movie.search(name) - complete_match_names = [name_from_search for name_from_search in movies_with_same_name if - name_from_search.title == name] - if len(complete_match_names) == 1: - return complete_match_names[0] - elif len(movies_with_same_name) == 1: - return movies_with_same_name[0] - elif len(movies_with_same_name) < 1: - return None - else: - # If the search contains multiple results, then we need to confirm with the user which movie - # the script should use, or access the local database to see if the user has already provided - # a manual selection + # Create an array of movies which have been matched + moviesWithSameName = [] + + # Go through each result from the search + for movie in movieSearch: + # Check if the title is a match, based on our conditions (e.g over 50% of words match) + if checkTitleNameMatch(name, movie.title): + # If the title included the year of broadcast, then we can be more picky in the results + # to look for a movie with a broadcast year that matches + if doesTitleIncludeYear: + # If the movie title is a 1:1 match, with the same broadcast year, then bingo! + if (name == movie.title) and (movie.year == titleObj.yearValue): + # Clear previous results, and only use this one + moviesWithSameName = [] + moviesWithSameName.append(movie) + break + + # Otherwise, only add the movie if the broadcast year matches + if movie.year == titleObj.yearValue: + moviesWithSameName.append(movie) + # If the program doesn't have the broadcast year, then add all the results + else: + moviesWithSameName.append(movie) + + # Sweep through the results once more for 1:1 title name matches, + # then if the list contains one entry with a 1:1 match, then clear the array + # and only use this one! + completeMatchNames = [] + for nameFromSearch in moviesWithSameName: + if nameFromSearch.title == name: + completeMatchNames.append(nameFromSearch) + + if len(completeMatchNames) == 1: + moviesWithSameName = completeMatchNames + + # If the search contains multiple results, then we need to confirm with the user which movie + # the script should use, or access the local database to see if the user has already provided + # a manual selection + if len(moviesWithSameName) > 1: # Query the local database for existing selection - user_matched_query = Query() - query_result = userMatchedMoviesTable.search(user_matched_query.MovieName == name) + userMatchedQuery = Query() + queryResult = userMatchedMoviesTable.search( + userMatchedQuery.movie_name == name) # If the local database already contains an entry for a manual selection # then don't bother prompting the user to select it again! - if len(query_result) == 1: + if len(queryResult) == 1: # Get the first result from the query - first_match = query_result[0] + firstMatch = queryResult[0] # Get the value contains the selection index - first_match_selected_index = int(first_match.get("UserSelectedIndex")) + firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) # Check if the user previously requested to skip the movie - skip_movie = first_match.get("SkipMovie") + skipMovie = firstMatch.get("SkipMovie") # If the user did not skip, but provided an index selection, get the # matching movie - if not skip_movie: - return movies_with_same_name[first_match_selected_index] + if not skipMovie: + return moviesWithSameName[firstMatchSelectedIndex] # Otherwise, return None, which will trigger the script to skip # and move onto the next movie else: @@ -514,12 +555,11 @@ def get_movie_by_name(name: str): # then prompt the user to make a selection else: print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} " - f"matching Trakt movies with the same name.\a" + f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(moviesWithSameName)} matching Trakt movies with the same name." ) # Output each movie for manual selection - for idx, item in enumerate(movies_with_same_name): + for idx, item in enumerate(moviesWithSameName): # Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page. # This will provide the user with enough information to make a selection. print( @@ -529,13 +569,13 @@ def get_movie_by_name(name: str): while True: try: # Get the user's selection, either a numerical input, or a string 'SKIP' value - index_selected = input( + indexSelected = input( "Please make a selection from above (or enter SKIP):" ) - if index_selected != "SKIP": + if indexSelected != "SKIP": # Since the value isn't 'skip', check that the result is numerical - index_selected = int(index_selected) - 1 + indexSelected = int(indexSelected) - 1 # Exit the selection loop break # Otherwise, exit the loop @@ -547,96 +587,105 @@ def get_movie_by_name(name: str): # Otherwise, the user has entered an invalid value, warn the user to try again except Exception: logging.error( - f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}" + f"Sorry! Please select a value between 0 to {len(moviesWithSameName)}" ) # If the user entered 'SKIP', then exit from the loop with no selection, which # will trigger the program to move onto the next episode - if index_selected == "SKIP": + if indexSelected == "SKIP": # Record that the user has skipped the Movie for import, so that # manual input isn't required everytime userMatchedMoviesTable.insert( - {"MovieName": name, "UserSelectedIndex": 0, "SkipMovie": True} + {"movie_name": name, "UserSelectedIndex": 0, "SkipMovie": True} ) return None # Otherwise, return the selection which the user made from the list else: - selected_movie = movies_with_same_name[int(index_selected)] + selectedMovie = moviesWithSameName[int(indexSelected)] userMatchedMoviesTable.insert( { - "MovieName": name, - "UserSelectedIndex": index_selected, + "movie_name": name, + "UserSelectedIndex": indexSelected, "SkipMovie": False, } ) - return selected_movie + return selectedMovie + + else: + if len(moviesWithSameName) > 0: + # If the search returned only one result, then awesome! + # Return the movie, so the import automation can continue. + return moviesWithSameName[0] + else: + return None -def process_movies(): +def processMovies(): # Total amount of rows which have been processed in the CSV file + rowsCount = 0 # Total amount of rows in the CSV file - error_streak = 0 + errorStreak = 0 # Open the CSV file within the GDPR exported data - with open(MOVIES_PATH, newline="") as csvfile: + with open(getMoviesPath(), newline="") as csvfile: # Create the CSV reader, which will break up the fields using the delimiter ',' - movie_reader_temp = csv.DictReader(csvfile, delimiter=",") - movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp) + movieReaderTemp = csv.DictReader(csvfile, delimiter=",") + movieReader = filter(lambda p: '' != p['movie_name'], movieReaderTemp) # First, list all movies with watched type so that watchlist entry for them is not created - watched_list = [] - for row in movie_reader: + watchedList = [] + for row in movieReader: if row["type"] == "watch": - watched_list.append(row["movie_name"]) + watchedList.append(row["movie_name"]) # Move position to the beginning of the file csvfile.seek(0, 0) # Get the total amount of rows in the CSV file, - rows_total = len(list(movie_reader)) + rowsTotal = len(list(movieReader)) # Move position to the beginning of the file csvfile.seek(0, 0) # Loop through each line/record of the CSV file # Ignore the header row - next(movie_reader, None) - for rows_count, row in enumerate(movie_reader): + next(movieReader, None) + for rowsCount, row in enumerate(movieReader): # Get the name of the Movie - movie_name = row["movie_name"] + movieName = row["movie_name"] # Get the date which the movie was marked 'watched' in TV Time - activity_type = row["type"] - movie_date_watched = row["updated_at"] + activityType = row["type"] + movieDateWatched = row["updated_at"] # Parse the watched date value into a Python type - movie_date_watched_converted = datetime.strptime( - movie_date_watched, "%Y-%m-%d %H:%M:%S" + movieDateWatchedConverted = datetime.strptime( + movieDateWatched, "%Y-%m-%d %H:%M:%S" ) # Query the local database for previous entries indicating that # the episode has already been imported in the past. Which will # ease pressure on TV Time's API server during a retry of the import # process, and just save time overall without needing to create network requests - movie_query = Query() - query_result = syncedMoviesTable.search( - (movie_query.movie_name == movie_name) & (movie_query.type == "watched") + movieQuery = Query() + queryResult = syncedMoviesTable.search( + (movieQuery.movie_name == movieName) & + (movieQuery.type == "watched") ) - watchlist_query = Query() - query_result_watchlist = syncedMoviesTable.search( - (watchlist_query.movie_name == movie_name) - & (watchlist_query.type == "watchlist") + watchlistQuery = Query() + queryResultWatchlist = syncedMoviesTable.search( + (watchlistQuery.movie_name == movieName) & + (watchlistQuery.type == "watchlist") ) # If the query returned no results, then continue to import it into Trakt - if len(query_result) == 0: + if len(queryResult) == 0: # Create a repeating loop, which will break on success, but repeats on failures while True: # If movie is watched but this is an entry for watchlist, then skip - if movie_name in watched_list and activity_type != "watch": + if movieName in watchedList and activityType != "watch": logging.info( - f"Skipping '{movie_name}' to avoid redundant watchlist entry." - ) + f"Skipping '{movieName}' to avoid redundant watchlist entry.") break # If more than 10 errors occurred in one streak, whilst trying to import the episode # then give up, and move onto the next episode, but warn the user. - if error_streak > 10: + if errorStreak > 10: logging.warning( "An error occurred 10 times in a row... skipping episode..." ) @@ -647,52 +696,50 @@ def process_movies(): # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) # Search Trakt for the Movie matching TV Time's title value - trakt_movie_obj = get_movie_by_name(movie_name) + traktMovieObj = getMovieByName(movieName) # If the method returned 'None', then this is an indication to skip the episode, and # move onto the next one - if trakt_movie_obj is None: + if traktMovieObj is None: break # Show the progress of the import on-screen logging.info( - f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'" + f"({rowsCount+1}/{rowsTotal}) - Processing '{movieName}'" ) - if activity_type == "watch": - trakt_movie_obj.mark_as_seen(movie_date_watched_converted) + if activityType == "watch": + traktMovieObj.mark_as_seen( + movieDateWatchedConverted) # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movie_name, "type": "watched"} - ) + {"movie_name": movieName, "type": "watched"}) logging.info(f"Marked as seen") - elif len(query_result_watchlist) == 0: - trakt_movie_obj.add_to_watchlist() + elif len(queryResultWatchlist) == 0: + traktMovieObj.add_to_watchlist() # Add the episode to the local database as imported, so it can be skipped, # if the process is repeated syncedMoviesTable.insert( - {"movie_name": movie_name, "type": "watchlist"} - ) + {"movie_name": movieName, "type": "watchlist"}) logging.info(f"Added to watchlist") else: logging.warning(f"Already in watchlist") # Clear the error streak on completing the method without errors - error_streak = 0 + errorStreak = 0 break # Catch errors which occur because of an incorrect array index. This occurs when # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][ + movieSlug = traktMovieObj.to_json()["movies"][0]["ids"]["ids"][ "slug" ] logging.warning( - f"({rows_count}/{rows_total}) - {movie_name} " - f"does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" + f"({rowsCount}/{rowsTotal}) - {movieName} does not exist in Trakt! (https://trakt.tv/movies/{movieSlug}/)" ) break # Catch any errors which are raised because a movie could not be found in Trakt except trakt.errors.NotFoundException: logging.warning( - f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!" + f"({rowsCount}/{rowsTotal}) - {movieName} does not exist (search) in Trakt!" ) break # Catch errors because of the program breaching the Trakt API rate limit @@ -705,11 +752,11 @@ def process_movies(): time.sleep(60) # Mark the exception in the error streak - error_streak += 1 + errorStreak += 1 # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON except json.decoder.JSONDecodeError: logging.warning( - f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} " + f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {movieName} " + f" This might occur when the server is down and has produced " + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -718,7 +765,7 @@ def process_movies(): time.sleep(60) # Mark the exception in the error streak - error_streak += 1 + errorStreak += 1 # Catch a CTRL + C keyboard input, and exits the program except KeyboardInterrupt: sys.exit("Cancel requested...") @@ -726,72 +773,56 @@ def process_movies(): # Skip the episode else: logging.info( - f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'." + f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{movieName}'." ) -def menu_selection() -> int: - # Display a menu selection - print(">> What do you want to do?") - print(" 1) Import Watch History for TV Shows from TV Time") - print(" 2) Import Watch Movies from TV Time") - print(" 3) Do both 1 and 2 (default)") - print(" 4) Exit") - - while True: - try: - selection = input("Enter your menu selection: ") - selection = 3 if not selection else int(selection) - break - except ValueError: - logging.warning("Invalid input. Please enter a numerical number.") - # Check if the input is valid - if not 1 <= selection <= 4: - logging.warning("Sorry - that's an unknown menu selection") - exit() - # Exit if the 4th option was chosen - if selection == 4: - logging.info("Exiting as per user's selection.") - exit() - - return selection - - def start(): - selection = menu_selection() - # Create the initial authentication with Trakt, before starting the process - if not init_trakt_auth(): + if initTraktAuth(): + # Display a menu selection + print(">> What do you want to do?") + print(" 1) Import Watch History for TV Shows from TV Time") + print(" 2) Import Watch Movies from TV Time") + + while True: + try: + menuSelection = input("Enter your menu selection: ") + menuSelection = 1 if not menuSelection else int(menuSelection) + break + except ValueError: + logging.warning( + "Invalid input. Please enter a numerical number.") + # Start the process which is required + if menuSelection == 1: + # Invoke the method which will import episodes which have been watched + # from TV Time into Trakt + processWatchedShows() + elif menuSelection == 2: + # Invoke the method which will import movies which have been watched + # from TV Time into Trakt + processMovies() + else: + logging.warning("Sorry - that's an unknown menu selection") + else: logging.error( "ERROR: Unable to complete authentication to Trakt - please try again." ) - # Start the process which is required - if selection == 1: - # Invoke the method which will import episodes which have been watched - # from TV Time into Trakt - logging.info("Processing watched shows.") - process_watched_shows() - # TODO: Add support for followed shows - elif selection == 2: - # Invoke the method which will import movies which have been watched - # from TV Time into Trakt - logging.info("Processing movies.") - process_movies() - elif selection == 3: - # Invoke both the episodes and movies import methods - logging.info("Processing both watched shows and movies.") - process_watched_shows() - process_movies() - if __name__ == "__main__": - # Check that the user has provided the GDPR path - if os.path.isdir(config.gdpr_workspace_path): - start() + # Check that the user has created the config file + if os.path.exists("config.json"): + # Check that the user has provided the GDPR path + if os.path.isdir(config.GDPR_WORKSPACE_PATH): + start() + else: + logging.error( + "Oops! The TV Time GDPR folder '" + + config.GDPR_WORKSPACE_PATH + + "' does not exist on the local system. Please check it, and try again." + ) else: logging.error( - "Oops! The TV Time GDPR folder '" - + config.gdpr_workspace_path - + "' does not exist on the local system. Please check it, and try again." + "The 'config.json' file cannot be found - have you created it yet?" ) diff --git a/TimeToTraktMovies.py b/TimeToTraktMovies.py deleted file mode 100644 index d4663e0..0000000 --- a/TimeToTraktMovies.py +++ /dev/null @@ -1,492 +0,0 @@ -#!/usr/bin/env python3 -import csv -import json -import logging -import os -import re -import sys -import time -from datetime import datetime -from pathlib import Path - -import trakt.core -from tinydb import Query, TinyDB -from trakt import init -from trakt.movies import Movie - -# Setup logger -logging.basicConfig( - format="%(asctime)s [%(levelname)7s] :: %(message)s", - level=logging.INFO, - datefmt="%Y-%m-%d %H:%M:%S", -) - -# Adjust this value to increase/decrease your requests between episodes. -# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting -DELAY_BETWEEN_EPISODES_IN_SECONDS = 1 - -# Create a database to keep track of completed processes -database = TinyDB("localStorageMovies.json") -syncedMoviesTable = database.table("SyncedMovies") -userMatchedMoviesTable = database.table("TvTimeTraktUserMatched") - - -class Expando(object): - pass - - -def isAuthenticated(): - with open("pytrakt.json") as f: - data = json.load(f) - daysBeforeExpiration = ( - datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now() - ).days - if daysBeforeExpiration < 1: - return False - return True - - -def getConfiguration(): - configEx = Expando() - - with open("config.json") as f: - data = json.load(f) - - configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"] - configEx.CLIENT_ID = data["CLIENT_ID"] - configEx.CLIENT_SECRET = data["CLIENT_SECRET"] - configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"] - - CONFIG_SINGLETON = configEx - - return CONFIG_SINGLETON - - -config = getConfiguration() - -# Return the path to the CSV file contain the watched episode data from TV Time - - -def getMoviesPath(): - return config.GDPR_WORKSPACE_PATH + "/tracking-prod-records.csv" - - -def initTraktAuth(): - if isAuthenticated(): - return True - # Set the method of authentication - trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH - return init( - config.TRAKT_USERNAME, - store=True, - client_id=config.CLIENT_ID, - client_secret=config.CLIENT_SECRET, - ) - - -# With a given title, check if it contains a year (e.g Doctor Who (2005)) -# and then return this value, with the title and year removed to improve -# the accuracy of Trakt results. - - -def getYearFromTitle(title): - ex = Expando() - - try: - # Use a regex expression to get the value within the brackets e.g The Americans (2017) - yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) - yearValue = yearSearch.group(1) - # Then, get the title without the year value included - titleValue = title.split("(")[0].strip() - # Put this together into an object - ex.titleWithoutYear = titleValue - ex.yearValue = int(yearValue) - return ex - except Exception: - # If the above failed, then the title doesn't include a year - # so return the object as is. - ex.titleWithoutYear = title - ex.yearValue = -1 - return ex - - -# Movies in TV Time are often different to Trakt.TV - in order to improve results and automation, -# calculate how many words are in the title, and return true if more than 50% of the title is a match, -# It seems to improve automation, and reduce manual selection.... - - -def checkTitleNameMatch(tvTimeTitle, traktTitle): - # If the name is a complete match, then don't bother comparing them! - if tvTimeTitle == traktTitle: - return True - - # Split the TvTime title - tvTimeTitleSplit = tvTimeTitle.split() - - # Create an array of words which are found in the Trakt title - wordsMatched = [] - - # Go through each word of the TV Time title, and check if it's in the Trakt title - for word in tvTimeTitleSplit: - if word in traktTitle: - wordsMatched.append(word) - - # Then calculate what percentage of words matched - quotient = len(wordsMatched) / len(traktTitle.split()) - percentage = quotient * 100 - - # If more than 50% of words in the TV Time title exist in the Trakt title, - # then return the title as a possibility to use - return percentage > 50 - - -# Using TV Time data (Name of Movie) - find the corresponding movie -# in Trakt.TV either by automation, or asking the user to confirm. - - -def getMovieByName(name): - # Parse the Movie's name for year, if one is present in the string - titleObj = getYearFromTitle(name) - - # Create a boolean to indicate if the title contains a year, - # this is used later on to improve the accuracy of picking - # from search results - doesTitleIncludeYear = titleObj.yearValue != -1 - - # If the title contains a year, then replace the local variable with the stripped version - if doesTitleIncludeYear: - name = titleObj.titleWithoutYear - - # Request the Trakt API for search results, using the name - movieSearch = Movie.search(name) - - # Create an array of movies which have been matched - moviesWithSameName = [] - - # Go through each result from the search - for movie in movieSearch: - # Check if the title is a match, based on our conditions (e.g over 50% of words match) - if checkTitleNameMatch(name, movie.title): - # If the title included the year of broadcast, then we can be more picky in the results - # to look for a movie with a broadcast year that matches - if doesTitleIncludeYear: - # If the movie title is a 1:1 match, with the same broadcast year, then bingo! - if (name == movie.title) and (movie.year == titleObj.yearValue): - # Clear previous results, and only use this one - moviesWithSameName = [] - moviesWithSameName.append(movie) - break - - # Otherwise, only add the movie if the broadcast year matches - if movie.year == titleObj.yearValue: - moviesWithSameName.append(movie) - # If the program doesn't have the broadcast year, then add all the results - else: - moviesWithSameName.append(movie) - - # Sweep through the results once more for 1:1 title name matches, - # then if the list contains one entry with a 1:1 match, then clear the array - # and only use this one! - completeMatchNames = [] - for nameFromSearch in moviesWithSameName: - if nameFromSearch.title == name: - completeMatchNames.append(nameFromSearch) - - if len(completeMatchNames) == 1: - moviesWithSameName = completeMatchNames - - # If the search contains multiple results, then we need to confirm with the user which movie - # the script should use, or access the local database to see if the user has already provided - # a manual selection - if len(moviesWithSameName) > 1: - - # Query the local database for existing selection - userMatchedQuery = Query() - queryResult = userMatchedMoviesTable.search( - userMatchedQuery.movie_name == name) - - # If the local database already contains an entry for a manual selection - # then don't bother prompting the user to select it again! - if len(queryResult) == 1: - # Get the first result from the query - firstMatch = queryResult[0] - # Get the value contains the selection index - firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex")) - # Check if the user previously requested to skip the movie - skipMovie = firstMatch.get("SkipMovie") - # If the user did not skip, but provided an index selection, get the - # matching movie - if not skipMovie: - return moviesWithSameName[firstMatchSelectedIndex] - # Otherwise, return None, which will trigger the script to skip - # and move onto the next movie - else: - return None - # If the user has not provided a manual selection already in the process - # then prompt the user to make a selection - else: - print( - f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(moviesWithSameName)} matching Trakt movies with the same name." - ) - - # Output each movie for manual selection - for idx, item in enumerate(moviesWithSameName): - # Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page. - # This will provide the user with enough information to make a selection. - print( - f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}" - ) - - while True: - try: - # Get the user's selection, either a numerical input, or a string 'SKIP' value - indexSelected = input( - "Please make a selection from above (or enter SKIP):" - ) - - if indexSelected != "SKIP": - # Since the value isn't 'skip', check that the result is numerical - indexSelected = int(indexSelected) - 1 - # Exit the selection loop - break - # Otherwise, exit the loop - else: - break - # Still allow the user to provide the exit input, and kill the program - except KeyboardInterrupt: - sys.exit("Cancel requested...") - # Otherwise, the user has entered an invalid value, warn the user to try again - except Exception: - logging.error( - f"Sorry! Please select a value between 0 to {len(moviesWithSameName)}" - ) - - # If the user entered 'SKIP', then exit from the loop with no selection, which - # will trigger the program to move onto the next episode - if indexSelected == "SKIP": - # Record that the user has skipped the Movie for import, so that - # manual input isn't required everytime - userMatchedMoviesTable.insert( - {"movie_name": name, "UserSelectedIndex": 0, "SkipMovie": True} - ) - - return None - # Otherwise, return the selection which the user made from the list - else: - selectedMovie = moviesWithSameName[int(indexSelected)] - - userMatchedMoviesTable.insert( - { - "movie_name": name, - "UserSelectedIndex": indexSelected, - "SkipMovie": False, - } - ) - - return selectedMovie - - else: - if len(moviesWithSameName) > 0: - # If the search returned only one result, then awesome! - # Return the movie, so the import automation can continue. - return moviesWithSameName[0] - else: - return None - - -def processMovies(): - # Total amount of rows which have been processed in the CSV file - rowsCount = 0 - # Total amount of rows in the CSV file - errorStreak = 0 - # Open the CSV file within the GDPR exported data - with open(getMoviesPath(), newline="") as csvfile: - # Create the CSV reader, which will break up the fields using the delimiter ',' - movieReaderTemp = csv.DictReader(csvfile, delimiter=",") - movieReader = filter(lambda p: '' != p['movie_name'], movieReaderTemp) - # First, list all movies with watched type so that watchlist entry for them is not created - watchedList = [] - for row in movieReader: - if row["type"] == "watch": - watchedList.append(row["movie_name"]) - # Move position to the beginning of the file - csvfile.seek(0, 0) - # Get the total amount of rows in the CSV file, - rowsTotal = len(list(movieReader)) - # Move position to the beginning of the file - csvfile.seek(0, 0) - # Loop through each line/record of the CSV file - # Ignore the header row - next(movieReader, None) - for rowsCount, row in enumerate(movieReader): - # Get the name of the Movie - movieName = row["movie_name"] - # Get the date which the movie was marked 'watched' in TV Time - activityType = row["type"] - movieDateWatched = row["updated_at"] - # Parse the watched date value into a Python type - movieDateWatchedConverted = datetime.strptime( - movieDateWatched, "%Y-%m-%d %H:%M:%S" - ) - - # Query the local database for previous entries indicating that - # the episode has already been imported in the past. Which will - # ease pressure on TV Time's API server during a retry of the import - # process, and just save time overall without needing to create network requests - movieQuery = Query() - queryResult = syncedMoviesTable.search( - (movieQuery.movie_name == movieName) & - (movieQuery.type == "watched") - ) - - watchlistQuery = Query() - queryResultWatchlist = syncedMoviesTable.search( - (watchlistQuery.movie_name == movieName) & - (watchlistQuery.type == "watchlist") - ) - - # If the query returned no results, then continue to import it into Trakt - if len(queryResult) == 0: - # Create a repeating loop, which will break on success, but repeats on failures - while True: - # If movie is watched but this is an entry for watchlist, then skip - if movieName in watchedList and activityType != "watch": - break - # If more than 10 errors occurred in one streak, whilst trying to import the episode - # then give up, and move onto the next episode, but warn the user. - if errorStreak > 10: - logging.warning( - "An error occurred 10 times in a row... skipping episode..." - ) - break - try: - # Sleep for a second between each process, before going onto the next watched episode. - # This is required to remain within the API rate limit, and use the API server fairly. - # Other developers share the service, for free - so be considerate of your usage. - time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS) - # Search Trakt for the Movie matching TV Time's title value - traktMovieObj = getMovieByName(movieName) - # If the method returned 'None', then this is an indication to skip the episode, and - # move onto the next one - if traktMovieObj is None: - break - # Show the progress of the import on-screen - logging.info( - f"({rowsCount+1}/{rowsTotal}) - Processing '{movieName}'" - ) - if activityType == "watch": - traktMovieObj.mark_as_seen( - movieDateWatchedConverted) - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watched"}) - logging.info(f"Marked as seen") - elif len(queryResultWatchlist) == 0: - traktMovieObj.add_to_watchlist() - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": movieName, "type": "watchlist"}) - logging.info(f"Added to watchlist") - else: - logging.warning(f"Already in watchlist") - # Clear the error streak on completing the method without errors - errorStreak = 0 - break - # Catch errors which occur because of an incorrect array index. This occurs when - # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. - # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. - except IndexError: - movieSlug = traktMovieObj.to_json()["movies"][0]["ids"]["ids"][ - "slug" - ] - logging.warning( - f"({rowsCount}/{rowsTotal}) - {movieName} does not exist in Trakt! (https://trakt.tv/movies/{movieSlug}/)" - ) - break - # Catch any errors which are raised because a movie could not be found in Trakt - except trakt.errors.NotFoundException: - logging.warning( - f"({rowsCount}/{rowsTotal}) - {movieName} does not exist (search) in Trakt!" - ) - break - # Catch errors because of the program breaching the Trakt API rate limit - except trakt.errors.RateLimitException: - logging.warning( - "The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between " - + "movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before " - + "trying again." - ) - time.sleep(60) - - # Mark the exception in the error streak - errorStreak += 1 - # Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON - except json.decoder.JSONDecodeError: - logging.warning( - f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {movieName} " - + f" This might occur when the server is down and has produced " - + "a HTML document instead of JSON. The script will wait 60 seconds before trying again." - ) - - # Wait 60 seconds - time.sleep(60) - - # Mark the exception in the error streak - errorStreak += 1 - # Catch a CTRL + C keyboard input, and exits the program - except KeyboardInterrupt: - sys.exit("Cancel requested...") - - # Skip the episode - else: - logging.info( - f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{movieName}'." - ) - - -def start(): - # Create the initial authentication with Trakt, before starting the process - if initTraktAuth(): - # Display a menu selection - print(">> What do you want to do?") - print(" 1) Import Movies History from TV Time") - - while True: - try: - menuSelection = input("Enter your menu selection: ") - menuSelection = 1 if not menuSelection else int(menuSelection) - break - except ValueError: - logging.warning( - "Invalid input. Please enter a numerical number.") - # Start the process which is required - if menuSelection == 1: - # Invoke the method which will import episodes which have been watched - # from TV Time into Trakt - processMovies() - else: - logging.warning("Sorry - that's an unknown menu selection") - else: - logging.error( - "ERROR: Unable to complete authentication to Trakt - please try again." - ) - - -if __name__ == "__main__": - # Check that the user has created the config file - if os.path.exists("config.json"): - # Check that the user has provided the GDPR path - if os.path.isdir(config.GDPR_WORKSPACE_PATH): - start() - else: - logging.error( - "Oops! The TV Time GDPR folder '" - + config.GDPR_WORKSPACE_PATH - + "' does not exist on the local system. Please check it, and try again." - ) - else: - logging.error( - "The 'config.json' file cannot be found - have you created it yet?" - )