diff --git a/TimeToTrakt.py b/TimeToTrakt.py index 3610b26..8c62dbd 100644 --- a/TimeToTrakt.py +++ b/TimeToTrakt.py @@ -13,7 +13,7 @@ from typing import Optional, TypeVar, Union, Any import trakt.core from tinydb import Query, TinyDB -from tinydb.table import Table +from tinydb.table import Table, Document from trakt import init from trakt.movies import Movie from trakt.tv import TVShow @@ -97,7 +97,7 @@ def init_trakt_auth() -> bool: TraktTVShow = TypeVar("TraktTVShow") TraktMovie = TypeVar("TraktMovie") -SearchResult = Union[TraktTVShow, TraktMovie] +TraktItem = Union[TraktTVShow, TraktMovie] @dataclass @@ -125,7 +125,7 @@ class Title: self.without_year = title self.year = None - def items_with_same_name(self, items: list[SearchResult]) -> list[SearchResult]: + def items_with_same_name(self, items: list[TraktItem]) -> list[TraktItem]: with_same_name = [] for item in items: @@ -227,7 +227,7 @@ class Searcher(ABC): self.items_with_same_name = None self._user_matched_table = user_matched_table - def search(self, title: Title) -> Optional[SearchResult]: + def search(self, title: Title) -> Optional[TraktItem]: self.name = title.name # If the title contains a year, then replace the local variable with the stripped version. if title.year: @@ -251,14 +251,14 @@ class Searcher(ABC): self._handle_multiple_manually() @abstractmethod - def search_trakt(self, name: str) -> list[SearchResult]: + def search_trakt(self, name: str) -> list[TraktItem]: pass @abstractmethod def _print_manual_selection(self): pass - def _search_local(self) -> tuple[bool, SearchResult]: + def _search_local(self) -> tuple[bool, TraktItem]: user_matched_query = Query() query_result = self._user_matched_table.search(user_matched_query.Name == self.name) # If the local database already contains an entry for a manual selection @@ -274,7 +274,7 @@ class Searcher(ABC): else: return False, None - def _handle_multiple_manually(self) -> Optional[SearchResult]: + def _handle_multiple_manually(self) -> Optional[TraktItem]: self._print_manual_selection() while True: try: @@ -315,7 +315,7 @@ class Searcher(ABC): return selected_show - def _check_single_result(self) -> Optional[SearchResult]: + def _check_single_result(self) -> Optional[TraktItem]: complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if name_from_search.title == self.name] if len(complete_match_names) == 1: @@ -331,20 +331,20 @@ class TVShowSearcher(Searcher): super().__init__(userMatchedShowsTable) self.tv_show = tv_show - def search_trakt(self, name: str) -> list[SearchResult]: + def search_trakt(self, name: str) -> list[TraktItem]: return TVShow.search(name) def _print_manual_selection(self) -> None: print( f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}'" f" (Season {self.tv_show.season_number}, Episode {self.tv_show.episode_number}) has" - f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a " + f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a" ) for idx, item in enumerate(self.items_with_same_name): print( - f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} " - f"Season(s) - More Info: https://trakt.tv/{item.ext}" + f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)}" + f" Season(s) - More Info: https://trakt.tv/{item.ext}" ) @@ -352,7 +352,7 @@ class MovieSearcher(Searcher): def __init__(self): super().__init__(userMatchedMoviesTable) - def search_trakt(self, name: str) -> list[SearchResult]: + def search_trakt(self, name: str) -> list[TraktItem]: return Movie.search(name) def _print_manual_selection(self) -> None: @@ -368,28 +368,33 @@ class MovieSearcher(Searcher): class Processor(ABC): @abstractmethod - def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None: + def _get_synced_items(self, tv_time_item: TVTimeItem) -> list[Document]: pass + @abstractmethod + def _log_already_imported(self, tv_time_item: TVTimeItem, progress: float) -> None: + pass -class TVShowProcessor(Processor): - def __init__(self): - super().__init__() + @abstractmethod + def _should_continue(self, tv_time_item: TVTimeItem) -> bool: + pass - def process_item(self, tv_time_show: TVTimeTVShow, progress: float) -> None: + @abstractmethod + def _search_trakt(self, tv_time_item: TVTimeItem) -> TraktItem: + pass + + @abstractmethod + def _process(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: + pass + + def process_item(self, tv_time_item: TVTimeItem, progress: float) -> None: # Query the local database for previous entries indicating that # the item has already been imported in the past. Which will # ease pressure on Trakt's API server during a retry of the import # process, and just save time overall without needing to create network requests. - episode_completed_query = Query() - synced_episodes = syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id) - + synced_episodes = self._get_synced_items(tv_time_item) if len(synced_episodes) != 0: - logging.info( - f"({progress}) - Already imported," - f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /" - f" Episode {tv_time_show.episode_number}." - ) + self._log_already_imported(tv_time_item, progress) return # If the query returned no results, then continue to import it into Trakt @@ -401,29 +406,21 @@ class TVShowProcessor(Processor): if error_streak > 10: logging.warning("An error occurred 10 times in a row... skipping episode...") break + + if not self._should_continue(): + break + try: # Sleep for a second between each process, before going onto the next watched item. # This is required to remain within the API rate limit, and use the API server fairly. # Other developers share the service, for free - so be considerate of your usage. time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS) - trakt_show = TVShowSearcher(tv_time_show).search(Title(tv_time_show.name)) - if not trakt_show: + trakt_item = self._search_trakt(tv_time_item) + if not trakt_item: break - logging.info( - f"({progress}) - Processing '{tv_time_show.name}'" - f" Season {tv_time_show.season_number} /" - f" Episode {tv_time_show.episode_number}" - ) - - season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)] - episode = season.episodes[int(tv_time_show.episode_number) - 1] - episode.mark_as_seen(tv_time_show.date_watched) - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id}) - logging.info(f"'{tv_time_show.name}' marked as seen") + self._process(tv_time_item, trakt_item, progress) error_streak = 0 break @@ -431,18 +428,10 @@ class TVShowProcessor(Processor): # an incorrect Trakt show has been selected, with season/episodes which don't match TV Time. # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. except IndexError: - tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"] - logging.warning( - f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," - f" Episode {tv_time_show.episode_number} does not exist in Trakt!" - f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})" - ) + self._handle_index_error(tv_time_item, trakt_item, progress) break except trakt.core.errors.NotFoundException: - logging.warning( - f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," - f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!" - ) + self._handle_not_found_exception(tv_time_item, progress) break except trakt.core.errors.RateLimitException: logging.warning( @@ -458,7 +447,7 @@ class TVShowProcessor(Processor): # instead of JSON except json.decoder.JSONDecodeError: logging.warning( - f"({progress}) - A JSON decode error occurred whilst processing {tv_time_show.name}" + f"({progress}) - A JSON decode error occurred whilst processing {tv_time_item.name}" " This might occur when the server is down and has produced" " a HTML document instead of JSON. The script will wait 60 seconds before trying again." ) @@ -469,111 +458,127 @@ class TVShowProcessor(Processor): except KeyboardInterrupt: sys.exit("Cancel requested...") + @abstractmethod + def _handle_index_error(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: float) -> None: + pass + + @abstractmethod + def _handle_not_found_exception(self, tv_time_item: TVTimeItem, progress: float) -> None: + pass + + +class TVShowProcessor(Processor): + def __init__(self): + super().__init__() + + def _get_synced_items(self, tv_time_show: TVTimeTVShow) -> list[Document]: + episode_completed_query = Query() + return syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id) + + def _log_already_imported(self, tv_time_show: TVTimeTVShow, progress: float) -> None: + logging.info( + f"({progress}) - Already imported," + f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /" + f" Episode {tv_time_show.episode_number}." + ) + + def _should_continue(self, tv_time_show: TVTimeTVShow) -> bool: + return True + + def _search_trakt(self, tv_time_show: TVTimeTVShow) -> TraktTVShow: + return TVShowSearcher(tv_time_show).search_trakt(tv_time_show.name) + + def _process(self, tv_time_show: TVTimeTVShow, trakt_show: TraktItem, progress: float) -> None: + logging.info( + f"({progress}) - Processing '{tv_time_show.name}'" + f" Season {tv_time_show.season_number} /" + f" Episode {tv_time_show.episode_number}" + ) + + season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)] + episode = season.episodes[int(tv_time_show.episode_number) - 1] + episode.mark_as_seen(tv_time_show.date_watched) + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id}) + logging.info(f"'{tv_time_show.name}' marked as seen") + + def _handle_index_error(self, tv_time_show: TVTimeTVShow, trakt_show: TraktTVShow, progress: float) -> None: + tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"] + logging.warning( + f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," + f" Episode {tv_time_show.episode_number} does not exist in Trakt!" + f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})" + ) + + def _handle_not_found_exception(self, tv_time_show: TVTimeTVShow, progress: float) -> None: + logging.warning( + f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number}," + f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!" + ) + class MovieProcessor(Processor): def __init__(self, watched_list: list): super().__init__() self._watched_list = watched_list - def process_item(self, tv_time_movie: TVTimeMovie, progress: float) -> None: - # Query the local database for previous entries indicating that - # the episode has already been imported in the past. Which will - # ease pressure on Trakt's API server during a retry of the import - # process, and just save time overall without needing to create network requests. + def _get_synced_items(self, tv_time_movie: TVTimeMovie) -> list[Document]: movie_query = Query() - synced_movies = syncedMoviesTable.search( + return syncedMoviesTable.search( (movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched") ) - if len(synced_movies) != 0: - logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.") - return + def _log_already_imported(self, tv_time_movie: TVTimeMovie, progress: float) -> None: + logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.") + + def _should_continue(self, tv_time_movie: TVTimeMovie) -> bool: + # If movie is watched but this is an entry for watchlist, then skip + if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch": + logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.") + return False + + return True + + def _search_trakt(self, tv_time_movie: TVTimeMovie) -> TraktMovie: + return MovieSearcher().search(Title(tv_time_movie.name)) + + def _process(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: + logging.info(f"({progress}) - Processing '{tv_time_movie.name}'") watchlist_query = Query() movies_in_watchlist = syncedMoviesTable.search( (watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist") ) - # If the query returned no results, then continue to import it into Trakt - # Create a repeating loop, which will break on success, but repeats on failures - error_streak = 0 - while True: - # If more than 10 errors occurred in one streak, whilst trying to import the item - # then give up, and move onto the next item, but warn the user. - if error_streak > 10: - logging.warning("An error occurred 10 times in a row... skipping episode...") - break - # If movie is watched but this is an entry for watchlist, then skip - if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch": - logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.") - break - try: - # Sleep for a second between each process, before going onto the next watched item. - # This is required to remain within the API rate limit, and use the API server fairly. - # Other developers share the service, for free - so be considerate of your usage. - time.sleep(DELAY_BETWEEN_ITEMS_IN_SECONDS) - trakt_movie = MovieSearcher().search(Title(tv_time_movie.name)) - if not trakt_movie: - break + if tv_time_movie.activity_type == "watch": + trakt_movie.mark_as_seen(tv_time_movie.date_watched) + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedMoviesTable.insert( + {"movie_name": tv_time_movie.name, "type": "watched"} + ) + logging.info(f"'{tv_time_movie.name}' marked as seen") + elif len(movies_in_watchlist) == 0: + trakt_movie.add_to_watchlist() + # Add the episode to the local database as imported, so it can be skipped, + # if the process is repeated + syncedMoviesTable.insert( + {"movie_name": tv_time_movie.name, "type": "watchlist"} + ) + logging.info(f"'{tv_time_movie.name}' added to watchlist") + else: + logging.warning(f"{tv_time_movie.name} already in watchlist") - logging.info(f"({progress}) - Processing '{tv_time_movie.name}'") + def _handle_index_error(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: float) -> None: + movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"] + logging.warning( + f"({progress}) - {tv_time_movie.name}" + f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" + ) - if tv_time_movie.activity_type == "watch": - trakt_movie.mark_as_seen(tv_time_movie.date_watched) - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": tv_time_movie.name, "type": "watched"} - ) - logging.info(f"'{tv_time_movie.name}' marked as seen") - elif len(movies_in_watchlist) == 0: - trakt_movie.add_to_watchlist() - # Add the episode to the local database as imported, so it can be skipped, - # if the process is repeated - syncedMoviesTable.insert( - {"movie_name": tv_time_movie.name, "type": "watchlist"} - ) - logging.info(f"'{tv_time_movie.name}' added to watchlist") - else: - logging.warning(f"{tv_time_movie.name} already in watchlist") - - error_streak = 0 - break - # Catch errors which occur because of an incorrect array index. This occurs when - # an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time. - # It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes. - except IndexError: - movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"] - logging.warning( - f"({progress}) - {tv_time_movie.name}" - f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)" - ) - break - except trakt.core.errors.NotFoundException: - logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!") - break - except trakt.core.errors.RateLimitException: - logging.warning( - "The program is running too quickly and has hit Trakt's API rate limit!" - " Please increase the delay between" - " movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'." - " The program will now wait 60 seconds before" - " trying again." - ) - time.sleep(60) - error_streak += 1 - except json.decoder.JSONDecodeError: - logging.warning( - f"({progress}) - A JSON decode error occurred whilst processing {tv_time_movie.name}" - " This might occur when the server is down and has produced" - " a HTML document instead of JSON. The script will wait 60 seconds before trying again." - ) - - time.sleep(60) - error_streak += 1 - # Catch a CTRL + C keyboard input, and exits the program - except KeyboardInterrupt: - sys.exit("Cancel requested...") + def _handle_not_found_exception(self, tv_time_movie: TVTimeMovie, progress: float) -> None: + logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!") def process_watched_shows() -> None: