Simple refactor

This commit is contained in:
Markus Nyman 2023-01-12 15:58:39 +02:00
parent 2a78d7ecbe
commit c2e644fad0

View file

@ -43,15 +43,13 @@ class Config:
gdpr_workspace_path: str
def is_authenticated():
def is_authenticated() -> bool:
with open("pytrakt.json") as f:
data = json.load(f)
days_before_expiration = (
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
).days
if days_before_expiration < 1:
return False
return True
return days_before_expiration >= 1
def get_configuration() -> Config:
@ -82,7 +80,7 @@ FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
def init_trakt_auth():
def init_trakt_auth() -> bool:
if is_authenticated():
return True
# Set the method of authentication
@ -133,7 +131,7 @@ def get_year_from_title(title) -> Title:
# It seems to improve automation, and reduce manual selection....
def check_title_name_match(tv_time_title, trakt_title):
def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool:
# If the name is a complete match, then don't bother comparing them!
if tv_time_title == trakt_title:
return True
@ -162,7 +160,7 @@ def check_title_name_match(tv_time_title, trakt_title):
# in Trakt.TV either by automation, or asking the user to confirm.
def get_show_by_name(name, season_number, episode_number):
def get_show_by_name(name: str, season_number: str, episode_number: str):
# Parse the TV Show's name for year, if one is present in the string
title = get_year_from_title(name)
@ -170,15 +168,12 @@ def get_show_by_name(name, season_number, episode_number):
if title.year:
name = title.without_year
# Request the Trakt API for search results, using the name
tv_search = TVShow.search(name)
# Create an array of shows which have been matched
shows_with_same_name = []
# Go through each result from the search
for show in tv_search:
# Check if the title is a match, based on our conditions (e.g over 50% of words match)
for show in TVShow.search(name):
# Check if the title is a match, based on our conditions (e.g. over 50% of words match)
if check_title_name_match(name, show.title):
# If the title included the year of broadcast, then we can be more picky in the results
# to look for a show with a broadcast year that matches
@ -196,21 +191,17 @@ def get_show_by_name(name, season_number, episode_number):
else:
shows_with_same_name.append(show)
# Sweep through the results once more for 1:1 title name matches,
# then if the list contains one entry with a 1:1 match, then clear the array
# and only use this one!
complete_match_names = []
for nameFromSearch in shows_with_same_name:
if nameFromSearch.title == name:
complete_match_names.append(nameFromSearch)
complete_match_names = [name_from_search for name_from_search in shows_with_same_name if name_from_search.title == name]
if len(complete_match_names) == 1:
shows_with_same_name = complete_match_names
# If the search contains multiple results, then we need to confirm with the user which show
# the script should use, or access the local database to see if the user has already provided
# a manual selection
if len(shows_with_same_name) > 1:
return complete_match_names[0]
elif len(shows_with_same_name) == 1:
return shows_with_same_name[0]
elif len(shows_with_same_name) < 1:
return None
else:
# If the search contains multiple results, then we need to confirm with the user which show
# the script should use, or access the local database to see if the user has already provided
# a manual selection
# Query the local database for existing selection
user_matched_query = Query()
@ -257,14 +248,14 @@ def get_show_by_name(name, season_number, episode_number):
"Please make a selection from above (or enter SKIP):"
)
if index_selected != "SKIP":
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1
# Exit the selection loop
break
# Otherwise, exit the loop
else:
# Exit the loop
if index_selected == "SKIP":
break
# Since the value isn't 'skip', check that the result is numerical
index_selected = int(index_selected) - 1
# Exit the selection loop
break
# Still allow the user to provide the exit input, and kill the program
except KeyboardInterrupt:
sys.exit("Cancel requested...")
@ -298,17 +289,9 @@ def get_show_by_name(name, season_number, episode_number):
return selected_show
else:
if len(shows_with_same_name) > 0:
# If the search returned only one result, then awesome!
# Return the show, so the import automation can continue.
return shows_with_same_name[0]
else:
return None
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then
# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
# this will match TV Time's existing value.
@ -335,9 +318,7 @@ def parse_season_number(season_number, trakt_show_obj):
return season_number
def process_watched_shows():
# Total amount of rows in the CSV file
error_streak = 0
def process_watched_shows() -> None:
# Open the CSV file within the GDPR exported data
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
# Create the CSV reader, which will break up the fields using the delimiter ','
@ -355,9 +336,9 @@ def process_watched_shows():
# Get the TV Time Episode id
tv_show_episode_id = row["episode_id"]
# Get the TV Time Season Number
tv_show_season_no = row["episode_season_number"]
tv_show_season_number = row["episode_season_number"]
# Get the TV Time Episode Number
tv_show_episode_no = row["episode_number"]
tv_show_episode_number = row["episode_number"]
# Get the date which the show was marked 'watched' in TV Time
tv_show_date_watched = row["updated_at"]
# Parse the watched date value into a Python type
@ -377,6 +358,7 @@ def process_watched_shows():
# If the query returned no results, then continue to import it into Trakt
if len(query_result) == 0:
# Create a repeating loop, which will break on success, but repeats on failures
error_streak = 0
while True:
# If more than 10 errors occurred in one streak, whilst trying to import the episode
# then give up, and move onto the next episode, but warn the user.
@ -391,24 +373,24 @@ def process_watched_shows():
# Other developers share the service, for free - so be considerate of your usage.
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
# Search Trakt for the TV show matching TV Time's title value
trakt_show_obj = get_show_by_name(
tv_show_name, tv_show_season_no, tv_show_episode_no
trakt_show = get_show_by_name(
tv_show_name, tv_show_season_number, tv_show_episode_number
)
# If the method returned 'None', then this is an indication to skip the episode, and
# move onto the next one
if trakt_show_obj is None:
if not trakt_show:
break
# Show the progress of the import on-screen
logging.info(
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_no} /"
f"Episode {tv_show_episode_no}"
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /"
f"Episode {tv_show_episode_number}"
)
# Get the season from the Trakt API
season = trakt_show_obj.seasons[
parse_season_number(tv_show_season_no, trakt_show_obj)
season = trakt_show.seasons[
parse_season_number(tv_show_season_number, trakt_show)
]
# Get the episode from the season
episode = season.episodes[int(tv_show_episode_no) - 1]
episode = season.episodes[int(tv_show_episode_number) - 1]
# Mark the episode as watched!
episode.mark_as_seen(tv_show_date_watched_converted)
# Add the episode to the local database as imported, so it can be skipped,
@ -421,20 +403,20 @@ def process_watched_shows():
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
except IndexError:
tv_show_slug = trakt_show_obj.to_json()["shows"][0]["ids"]["ids"][
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][
"slug"
]
logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
f"Episode {tv_show_episode_no} does not exist in Trakt! "
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_no}/episodes/{tv_show_episode_no})"
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
f"Episode {tv_show_episode_number} does not exist in Trakt! "
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})"
)
break
# Catch any errors which are raised because a show could not be found in Trakt
except trakt.errors.NotFoundException:
logging.warning(
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_no}, "
f"Episode {tv_show_episode_no} does not exist (search) in Trakt!"
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!"
)
break
# Catch errors because of the program breaching the Trakt API rate limit
@ -452,7 +434,7 @@ def process_watched_shows():
except json.decoder.JSONDecodeError:
logging.warning(
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
+ f"Season {tv_show_season_no}, Episode {tv_show_episode_no}! This might occur when the server is down and has produced "
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced "
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
)
@ -467,7 +449,7 @@ def process_watched_shows():
# Skip the episode
else:
logging.info(
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_no} / Episode {tv_show_episode_no}."
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
)
@ -475,7 +457,7 @@ def process_watched_shows():
# in Trakt.TV either by automation, or asking the user to confirm.
def get_movie_by_name(name):
def get_movie_by_name(name: str):
# Parse the Movie's name for year, if one is present in the string
title = get_year_from_title(name)
@ -483,15 +465,12 @@ def get_movie_by_name(name):
if title.year:
name = title.without_year
# Request the Trakt API for search results, using the name
movie_search = Movie.search(name)
# Create an array of movies which have been matched
movies_with_same_name = []
# Go through each result from the search
for movie in movie_search:
# Check if the title is a match, based on our conditions (e.g over 50% of words match)
for movie in Movie.search(name):
# Check if the title is a match, based on our conditions (e.g. over 50% of words match)
if check_title_name_match(name, movie.title):
# If the title included the year of broadcast, then we can be more picky in the results
# to look for a movie with a broadcast year that matches
@ -509,21 +488,17 @@ def get_movie_by_name(name):
else:
movies_with_same_name.append(movie)
# Sweep through the results once more for 1:1 title name matches,
# then if the list contains one entry with a 1:1 match, then clear the array
# and only use this one!
complete_match_names = []
for nameFromSearch in movies_with_same_name:
if nameFromSearch.title == name:
complete_match_names.append(nameFromSearch)
complete_match_names = [name_from_search for name_from_search in movies_with_same_name if name_from_search.title == name]
if len(complete_match_names) == 1:
movies_with_same_name = complete_match_names
# If the search contains multiple results, then we need to confirm with the user which movie
# the script should use, or access the local database to see if the user has already provided
# a manual selection
if len(movies_with_same_name) > 1:
return complete_match_names[0]
elif len(movies_with_same_name) == 1:
return movies_with_same_name[0]
elif len(movies_with_same_name) < 1:
return None
else:
# If the search contains multiple results, then we need to confirm with the user which movie
# the script should use, or access the local database to see if the user has already provided
# a manual selection
# Query the local database for existing selection
user_matched_query = Query()
@ -610,14 +585,6 @@ def get_movie_by_name(name):
return selected_movie
else:
if len(movies_with_same_name) > 0:
# If the search returned only one result, then awesome!
# Return the movie, so the import automation can continue.
return movies_with_same_name[0]
else:
return None
def process_movies():
# Total amount of rows which have been processed in the CSV file
@ -773,6 +740,7 @@ def process_movies():
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
)
def menu_selection() -> int:
# Display a menu selection
print(">> What do you want to do?")
@ -802,30 +770,31 @@ def menu_selection() -> int:
def start():
selection = menu_selection()
# Create the initial authentication with Trakt, before starting the process
if init_trakt_auth():
# Start the process which is required
if selection == 1:
# Invoke the method which will import episodes which have been watched
# from TV Time into Trakt
logging.info("Processing watched shows.")
process_watched_shows()
# TODO: Add support for followed shows
elif selection == 2:
# Invoke the method which will import movies which have been watched
# from TV Time into Trakt
logging.info("Processing movies.")
process_movies()
elif selection == 3:
# Invoke both the episodes and movies import methods
logging.info("Processing both watched shows and movies.")
process_watched_shows()
process_movies()
else:
if not init_trakt_auth():
logging.error(
"ERROR: Unable to complete authentication to Trakt - please try again."
)
# Start the process which is required
if selection == 1:
# Invoke the method which will import episodes which have been watched
# from TV Time into Trakt
logging.info("Processing watched shows.")
process_watched_shows()
# TODO: Add support for followed shows
elif selection == 2:
# Invoke the method which will import movies which have been watched
# from TV Time into Trakt
logging.info("Processing movies.")
process_movies()
elif selection == 3:
# Invoke both the episodes and movies import methods
logging.info("Processing both watched shows and movies.")
process_watched_shows()
process_movies()
if __name__ == "__main__":
# Check that the user has provided the GDPR path