mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2024-12-25 21:08:37 -06:00
Movie support and refactoring (#26)
* Initial working * Cleanup * Merged scripts * Present menu before authentication. add entries * Just use one database * Add bell on manual input prompt (suggested by @WeirdAlex03) * Config to dataclass * Prompt config if it doesn't exist * Naming to snake_case * Remove use of Exodus class * Remove old Title fields * Specify TV Shows and Movies as default action * Extract menu selection to own function * Fix movie query * Simple refactor * Extract getting same name items to common function * Remove unnecessary param * Fix TinyDB movie name * WIP: refactor * Introduce common get_item function * Extract finding single result to common function * Implement general Searcher to search and handle search results * WIP: Processor class * Remove redundant dataclass annotation * Add TVTimeShow class * Make Searcher abstract * Process TV Shows and Movies using Processor class * Move common logic to base Processor class * Small cleanup * Split stuff to own files * Fix error * Fix search bug * Improve logging * Change is not to is None * Handle general exception when processing * Fix grammar * Fix typo * Fix case where nothing is found * Read release date where possible * Progress to percentage Co-authored-by: SinTan1729 <sayantan.santra689@gmail.com>
This commit is contained in:
parent
197ec19dfa
commit
26cd599e45
5 changed files with 566 additions and 654 deletions
|
@ -30,7 +30,7 @@ TV Time's API is not open. In order to get access to your personal data, you wil
|
||||||
1. Go to "Settings" under your profile
|
1. Go to "Settings" under your profile
|
||||||
2. Select ["Your API Applications"](https://trakt.tv/oauth/applications)
|
2. Select ["Your API Applications"](https://trakt.tv/oauth/applications)
|
||||||
3. Select "New Application"
|
3. Select "New Application"
|
||||||
4. Provide a name into "Name" e.g John Smith Import from TV Time
|
4. Provide a name into "Name" e.g. John Smith Import from TV Time
|
||||||
5. Paste "urn:ietf:wg:oauth:2.0:oob" into "Redirect uri:"
|
5. Paste "urn:ietf:wg:oauth:2.0:oob" into "Redirect uri:"
|
||||||
6. Click "Save App"
|
6. Click "Save App"
|
||||||
7. Make note of your details to be used later.
|
7. Make note of your details to be used later.
|
||||||
|
|
678
TimeToTrakt.py
678
TimeToTrakt.py
|
@ -3,18 +3,14 @@ import csv
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, Callable, TypeVar, Union, List
|
|
||||||
|
|
||||||
import trakt.core
|
import trakt.core
|
||||||
from tinydb import Query, TinyDB
|
|
||||||
from trakt import init
|
from trakt import init
|
||||||
from trakt.movies import Movie
|
|
||||||
from trakt.tv import TVShow
|
from processor import TVShowProcessor, MovieProcessor
|
||||||
|
from searcher import TVTimeTVShow, TVTimeMovie
|
||||||
|
|
||||||
# Setup logger
|
# Setup logger
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
|
@ -25,14 +21,7 @@ logging.basicConfig(
|
||||||
|
|
||||||
# Adjust this value to increase/decrease your requests between episodes.
|
# Adjust this value to increase/decrease your requests between episodes.
|
||||||
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
||||||
DELAY_BETWEEN_EPISODES_IN_SECONDS = 1
|
DELAY_BETWEEN_ITEMS_IN_SECONDS = 1
|
||||||
|
|
||||||
# Create databases to keep track of completed processes
|
|
||||||
database = TinyDB("localStorage.json")
|
|
||||||
syncedEpisodesTable = database.table("SyncedEpisodes")
|
|
||||||
userMatchedShowsTable = database.table("TvTimeTraktUserMatched")
|
|
||||||
syncedMoviesTable = database.table("SyncedMovies")
|
|
||||||
userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies")
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
@ -83,7 +72,6 @@ MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
||||||
def init_trakt_auth() -> bool:
|
def init_trakt_auth() -> bool:
|
||||||
if is_authenticated():
|
if is_authenticated():
|
||||||
return True
|
return True
|
||||||
# Set the method of authentication
|
|
||||||
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
||||||
return init(
|
return init(
|
||||||
config.trakt_username,
|
config.trakt_username,
|
||||||
|
@ -93,648 +81,39 @@ def init_trakt_auth() -> bool:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# With a given title, check if it contains a year (e.g Doctor Who (2005))
|
|
||||||
# and then return this value, with the title and year removed to improve
|
|
||||||
# the accuracy of Trakt results.
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Title:
|
|
||||||
name: str
|
|
||||||
without_year: str
|
|
||||||
year: Optional[int]
|
|
||||||
|
|
||||||
def __init__(self, title: str):
|
|
||||||
try:
|
|
||||||
# Use a regex expression to get the value within the brackets e.g. The Americans (2017)
|
|
||||||
year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
|
||||||
year_value = year_search.group(1)
|
|
||||||
# Then, get the title without the year value included
|
|
||||||
title_value = title.split("(")[0].strip()
|
|
||||||
# Put this together into an object
|
|
||||||
self.name = title
|
|
||||||
self.without_year = title_value
|
|
||||||
self.year = int(year_value)
|
|
||||||
except Exception:
|
|
||||||
# If the above failed, then the title doesn't include a year
|
|
||||||
# so return the object as is.
|
|
||||||
self.name = title
|
|
||||||
self.without_year = title
|
|
||||||
self.year = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_year_from_title(title) -> Title:
|
|
||||||
return Title(title)
|
|
||||||
|
|
||||||
|
|
||||||
# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
|
|
||||||
# calculate how many words are in the title, and return true if more than 50% of the title is a match,
|
|
||||||
# It seems to improve automation, and reduce manual selection....
|
|
||||||
|
|
||||||
|
|
||||||
def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool:
|
|
||||||
# If the name is a complete match, then don't bother comparing them!
|
|
||||||
if tv_time_title == trakt_title:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Split the TvTime title
|
|
||||||
tv_time_title_split = tv_time_title.split()
|
|
||||||
|
|
||||||
# Create an array of words which are found in the Trakt title
|
|
||||||
words_matched = []
|
|
||||||
|
|
||||||
# Go through each word of the TV Time title, and check if it's in the Trakt title
|
|
||||||
for word in tv_time_title_split:
|
|
||||||
if word in trakt_title:
|
|
||||||
words_matched.append(word)
|
|
||||||
|
|
||||||
# Then calculate what percentage of words matched
|
|
||||||
quotient = len(words_matched) / len(trakt_title.split())
|
|
||||||
percentage = quotient * 100
|
|
||||||
|
|
||||||
# If more than 50% of words in the TV Time title exist in the Trakt title,
|
|
||||||
# then return the title as a possibility to use
|
|
||||||
return percentage > 50
|
|
||||||
|
|
||||||
|
|
||||||
# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show
|
|
||||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
|
||||||
|
|
||||||
TraktTVShow = TypeVar("TraktTVShow")
|
|
||||||
TraktMovie = TypeVar("TraktMovie")
|
|
||||||
|
|
||||||
SearchResult = Union[TraktTVShow, TraktMovie]
|
|
||||||
|
|
||||||
|
|
||||||
def get_items_with_same_name(title: Title, items: List[SearchResult]) -> List[SearchResult]:
|
|
||||||
shows_with_same_name = []
|
|
||||||
|
|
||||||
for item in items:
|
|
||||||
if check_title_name_match(title.name, item.title):
|
|
||||||
# If the title included the year of broadcast, then we can be more picky in the results
|
|
||||||
# to look for an item with a broadcast year that matches
|
|
||||||
if title.year:
|
|
||||||
# If the item title is a 1:1 match, with the same broadcast year, then bingo!
|
|
||||||
if (title.name == item.title) and (item.year == title.year):
|
|
||||||
# Clear previous results, and only use this one
|
|
||||||
shows_with_same_name = [item]
|
|
||||||
break
|
|
||||||
|
|
||||||
# Otherwise, only add the item if the broadcast year matches
|
|
||||||
if item.year == title.year:
|
|
||||||
shows_with_same_name.append(item)
|
|
||||||
# If the item doesn't have the broadcast year, then add all the results
|
|
||||||
else:
|
|
||||||
shows_with_same_name.append(item)
|
|
||||||
|
|
||||||
return shows_with_same_name
|
|
||||||
|
|
||||||
|
|
||||||
def get_show_by_name(name: str, season_number: str, episode_number: str):
|
|
||||||
# Parse the TV Show's name for year, if one is present in the string
|
|
||||||
title = get_year_from_title(name)
|
|
||||||
|
|
||||||
# If the title contains a year, then replace the local variable with the stripped version
|
|
||||||
if title.year:
|
|
||||||
name = title.without_year
|
|
||||||
|
|
||||||
shows_with_same_name = get_items_with_same_name(title, TVShow.search(name))
|
|
||||||
|
|
||||||
complete_match_names = [name_from_search for name_from_search in shows_with_same_name if
|
|
||||||
name_from_search.title == name]
|
|
||||||
if len(complete_match_names) == 1:
|
|
||||||
return complete_match_names[0]
|
|
||||||
elif len(shows_with_same_name) == 1:
|
|
||||||
return shows_with_same_name[0]
|
|
||||||
elif len(shows_with_same_name) < 1:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# If the search contains multiple results, then we need to confirm with the user which show
|
|
||||||
# the script should use, or access the local database to see if the user has already provided
|
|
||||||
# a manual selection
|
|
||||||
|
|
||||||
# Query the local database for existing selection
|
|
||||||
user_matched_query = Query()
|
|
||||||
query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name)
|
|
||||||
|
|
||||||
# If the local database already contains an entry for a manual selection
|
|
||||||
# then don't bother prompting the user to select it again!
|
|
||||||
if len(query_result) == 1:
|
|
||||||
# Get the first result from the query
|
|
||||||
first_match = query_result[0]
|
|
||||||
# Get the value contains the selection index
|
|
||||||
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
|
|
||||||
# Check if the user previously requested to skip the show
|
|
||||||
skip_show = first_match.get("SkipShow")
|
|
||||||
# If the user did not skip, but provided an index selection, get the
|
|
||||||
# matching show
|
|
||||||
if not skip_show:
|
|
||||||
return shows_with_same_name[first_match_selected_index]
|
|
||||||
# Otherwise, return None, which will trigger the script to skip
|
|
||||||
# and move onto the next show
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
# If the user has not provided a manual selection already in the process
|
|
||||||
# then prompt the user to make a selection
|
|
||||||
else:
|
|
||||||
print(
|
|
||||||
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number},"
|
|
||||||
f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a "
|
|
||||||
)
|
|
||||||
|
|
||||||
# Output each show for manual selection
|
|
||||||
for idx, item in enumerate(shows_with_same_name):
|
|
||||||
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
|
|
||||||
# This will provide the user with enough information to make a selection.
|
|
||||||
print(
|
|
||||||
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
|
|
||||||
f"Season(s) - More Info: https://trakt.tv/{item.ext}"
|
|
||||||
)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
|
||||||
index_selected = input(
|
|
||||||
"Please make a selection from above (or enter SKIP):"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Exit the loop
|
|
||||||
if index_selected == "SKIP":
|
|
||||||
break
|
|
||||||
|
|
||||||
# Since the value isn't 'skip', check that the result is numerical
|
|
||||||
index_selected = int(index_selected) - 1
|
|
||||||
# Exit the selection loop
|
|
||||||
break
|
|
||||||
# Still allow the user to provide the exit input, and kill the program
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit("Cancel requested...")
|
|
||||||
# Otherwise, the user has entered an invalid value, warn the user to try again
|
|
||||||
except Exception:
|
|
||||||
logging.error(
|
|
||||||
f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
|
||||||
# will trigger the program to move onto the next episode
|
|
||||||
if index_selected == "SKIP":
|
|
||||||
# Record that the user has skipped the TV Show for import, so that
|
|
||||||
# manual input isn't required everytime
|
|
||||||
userMatchedShowsTable.insert(
|
|
||||||
{"ShowName": name, "UserSelectedIndex": 0, "SkipShow": True}
|
|
||||||
)
|
|
||||||
|
|
||||||
return None
|
|
||||||
# Otherwise, return the selection which the user made from the list
|
|
||||||
else:
|
|
||||||
selected_show = shows_with_same_name[int(index_selected)]
|
|
||||||
|
|
||||||
userMatchedShowsTable.insert(
|
|
||||||
{
|
|
||||||
"ShowName": name,
|
|
||||||
"UserSelectedIndex": index_selected,
|
|
||||||
"SkipShow": False,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return selected_show
|
|
||||||
|
|
||||||
|
|
||||||
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
|
|
||||||
# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
|
|
||||||
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
|
||||||
# this will match TV Time's existing value.
|
|
||||||
|
|
||||||
|
|
||||||
def parse_season_number(season_number, trakt_show_obj):
|
|
||||||
# Parse the season number into a numerical value
|
|
||||||
season_number = int(season_number)
|
|
||||||
|
|
||||||
# Then get the Season Number from the first item in the array
|
|
||||||
first_season_no = trakt_show_obj.seasons[0].number
|
|
||||||
|
|
||||||
# If the season number is 0, then the Trakt show contains a "special" season
|
|
||||||
if first_season_no == 0:
|
|
||||||
# No need to modify the value, as the TV Time value will match Trakt
|
|
||||||
return season_number
|
|
||||||
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
|
|
||||||
# but subtracted by one (e.g Season 1 in TV Time, will be 0)
|
|
||||||
else:
|
|
||||||
# Only subtract if the TV Time season number is greater than 0.
|
|
||||||
if season_number != 0:
|
|
||||||
return season_number - 1
|
|
||||||
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
|
|
||||||
else:
|
|
||||||
return season_number
|
|
||||||
|
|
||||||
|
|
||||||
def process_watched_shows() -> None:
|
def process_watched_shows() -> None:
|
||||||
# Open the CSV file within the GDPR exported data
|
|
||||||
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
reader = csv.DictReader(csvfile, delimiter=",")
|
||||||
shows_reader = csv.DictReader(csvfile, delimiter=",")
|
total_rows = len(list(reader))
|
||||||
# Get the total amount of rows in the CSV file,
|
|
||||||
rows_total = len(list(shows_reader))
|
|
||||||
# Move position to the beginning of the file
|
|
||||||
csvfile.seek(0, 0)
|
csvfile.seek(0, 0)
|
||||||
# Loop through each line/record of the CSV file
|
|
||||||
# Ignore the header row
|
# Ignore the header row
|
||||||
next(shows_reader, None)
|
next(reader, None)
|
||||||
for rowsCount, row in enumerate(shows_reader):
|
for rows_count, row in enumerate(reader):
|
||||||
# Get the name of the TV show
|
tv_time_show = TVTimeTVShow(row)
|
||||||
tv_show_name = row["tv_show_name"]
|
TVShowProcessor().process_item(tv_time_show, "{:.2f}%".format(rows_count / total_rows * 100))
|
||||||
# Get the TV Time Episode id
|
|
||||||
tv_show_episode_id = row["episode_id"]
|
|
||||||
# Get the TV Time Season Number
|
|
||||||
tv_show_season_number = row["episode_season_number"]
|
|
||||||
# Get the TV Time Episode Number
|
|
||||||
tv_show_episode_number = row["episode_number"]
|
|
||||||
# Get the date which the show was marked 'watched' in TV Time
|
|
||||||
tv_show_date_watched = row["updated_at"]
|
|
||||||
# Parse the watched date value into a Python type
|
|
||||||
tv_show_date_watched_converted = datetime.strptime(
|
|
||||||
tv_show_date_watched, "%Y-%m-%d %H:%M:%S"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Query the local database for previous entries indicating that
|
|
||||||
# the episode has already been imported in the past. Which will
|
|
||||||
# ease pressure on TV Time's API server during a retry of the import
|
|
||||||
# process, and just save time overall without needing to create network requests
|
|
||||||
episode_completed_query = Query()
|
|
||||||
query_result = syncedEpisodesTable.search(
|
|
||||||
episode_completed_query.episodeId == tv_show_episode_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the query returned no results, then continue to import it into Trakt
|
|
||||||
if len(query_result) == 0:
|
|
||||||
# Create a repeating loop, which will break on success, but repeats on failures
|
|
||||||
error_streak = 0
|
|
||||||
while True:
|
|
||||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
|
||||||
# then give up, and move onto the next episode, but warn the user.
|
|
||||||
if error_streak > 10:
|
|
||||||
logging.warning(
|
|
||||||
"An error occurred 10 times in a row... skipping episode..."
|
|
||||||
)
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
# Sleep for a second between each process, before going onto the next watched episode.
|
|
||||||
# This is required to remain within the API rate limit, and use the API server fairly.
|
|
||||||
# Other developers share the service, for free - so be considerate of your usage.
|
|
||||||
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
|
||||||
# Search Trakt for the TV show matching TV Time's title value
|
|
||||||
trakt_show = get_show_by_name(
|
|
||||||
tv_show_name, tv_show_season_number, tv_show_episode_number
|
|
||||||
)
|
|
||||||
# If the method returned 'None', then this is an indication to skip the episode, and
|
|
||||||
# move onto the next one
|
|
||||||
if not trakt_show:
|
|
||||||
break
|
|
||||||
# Show the progress of the import on-screen
|
|
||||||
logging.info(
|
|
||||||
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /"
|
|
||||||
f"Episode {tv_show_episode_number}"
|
|
||||||
)
|
|
||||||
# Get the season from the Trakt API
|
|
||||||
season = trakt_show.seasons[
|
|
||||||
parse_season_number(tv_show_season_number, trakt_show)
|
|
||||||
]
|
|
||||||
# Get the episode from the season
|
|
||||||
episode = season.episodes[int(tv_show_episode_number) - 1]
|
|
||||||
# Mark the episode as watched!
|
|
||||||
episode.mark_as_seen(tv_show_date_watched_converted)
|
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
|
||||||
# if the process is repeated
|
|
||||||
syncedEpisodesTable.insert({"episodeId": tv_show_episode_id})
|
|
||||||
# Clear the error streak on completing the method without errors
|
|
||||||
error_streak = 0
|
|
||||||
break
|
|
||||||
# Catch errors which occur because of an incorrect array index. This occurs when
|
|
||||||
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
|
||||||
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
|
||||||
except IndexError:
|
|
||||||
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][
|
|
||||||
"slug"
|
|
||||||
]
|
|
||||||
logging.warning(
|
|
||||||
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
|
||||||
f"Episode {tv_show_episode_number} does not exist in Trakt! "
|
|
||||||
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
# Catch any errors which are raised because a show could not be found in Trakt
|
|
||||||
except trakt.errors.NotFoundException:
|
|
||||||
logging.warning(
|
|
||||||
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
|
||||||
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
# Catch errors because of the program breaching the Trakt API rate limit
|
|
||||||
except trakt.errors.RateLimitException:
|
|
||||||
logging.warning(
|
|
||||||
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between "
|
|
||||||
+ "episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before "
|
|
||||||
+ "trying again."
|
|
||||||
)
|
|
||||||
time.sleep(60)
|
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
|
||||||
error_streak += 1
|
|
||||||
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
|
||||||
except json.decoder.JSONDecodeError:
|
|
||||||
logging.warning(
|
|
||||||
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
|
|
||||||
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced "
|
|
||||||
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Wait 60 seconds
|
|
||||||
time.sleep(60)
|
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
|
||||||
error_streak += 1
|
|
||||||
# Catch a CTRL + C keyboard input, and exits the program
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit("Cancel requested...")
|
|
||||||
# Skip the episode
|
|
||||||
else:
|
|
||||||
logging.info(
|
|
||||||
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# Using TV Time data (Name of Movie) - find the corresponding movie
|
def process_watched_movies() -> None:
|
||||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
|
||||||
|
|
||||||
|
|
||||||
def get_movie_by_name(name: str):
|
|
||||||
# Parse the Movie's name for year, if one is present in the string
|
|
||||||
title = get_year_from_title(name)
|
|
||||||
|
|
||||||
# If the title contains a year, then replace the local variable with the stripped version
|
|
||||||
if title.year:
|
|
||||||
name = title.without_year
|
|
||||||
|
|
||||||
movies_with_same_name = get_items_with_same_name(title, Movie.search(name))
|
|
||||||
|
|
||||||
complete_match_names = [name_from_search for name_from_search in movies_with_same_name if
|
|
||||||
name_from_search.title == name]
|
|
||||||
if len(complete_match_names) == 1:
|
|
||||||
return complete_match_names[0]
|
|
||||||
elif len(movies_with_same_name) == 1:
|
|
||||||
return movies_with_same_name[0]
|
|
||||||
elif len(movies_with_same_name) < 1:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# If the search contains multiple results, then we need to confirm with the user which movie
|
|
||||||
# the script should use, or access the local database to see if the user has already provided
|
|
||||||
# a manual selection
|
|
||||||
|
|
||||||
# Query the local database for existing selection
|
|
||||||
user_matched_query = Query()
|
|
||||||
query_result = userMatchedMoviesTable.search(user_matched_query.MovieName == name)
|
|
||||||
|
|
||||||
# If the local database already contains an entry for a manual selection
|
|
||||||
# then don't bother prompting the user to select it again!
|
|
||||||
if len(query_result) == 1:
|
|
||||||
# Get the first result from the query
|
|
||||||
first_match = query_result[0]
|
|
||||||
# Get the value contains the selection index
|
|
||||||
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
|
|
||||||
# Check if the user previously requested to skip the movie
|
|
||||||
skip_movie = first_match.get("SkipMovie")
|
|
||||||
# If the user did not skip, but provided an index selection, get the
|
|
||||||
# matching movie
|
|
||||||
if not skip_movie:
|
|
||||||
return movies_with_same_name[first_match_selected_index]
|
|
||||||
# Otherwise, return None, which will trigger the script to skip
|
|
||||||
# and move onto the next movie
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
# If the user has not provided a manual selection already in the process
|
|
||||||
# then prompt the user to make a selection
|
|
||||||
else:
|
|
||||||
print(
|
|
||||||
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} "
|
|
||||||
f"matching Trakt movies with the same name.\a"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Output each movie for manual selection
|
|
||||||
for idx, item in enumerate(movies_with_same_name):
|
|
||||||
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page.
|
|
||||||
# This will provide the user with enough information to make a selection.
|
|
||||||
print(
|
|
||||||
f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}"
|
|
||||||
)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
|
||||||
index_selected = input(
|
|
||||||
"Please make a selection from above (or enter SKIP):"
|
|
||||||
)
|
|
||||||
|
|
||||||
if index_selected != "SKIP":
|
|
||||||
# Since the value isn't 'skip', check that the result is numerical
|
|
||||||
index_selected = int(index_selected) - 1
|
|
||||||
# Exit the selection loop
|
|
||||||
break
|
|
||||||
# Otherwise, exit the loop
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
# Still allow the user to provide the exit input, and kill the program
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit("Cancel requested...")
|
|
||||||
# Otherwise, the user has entered an invalid value, warn the user to try again
|
|
||||||
except Exception:
|
|
||||||
logging.error(
|
|
||||||
f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
|
||||||
# will trigger the program to move onto the next episode
|
|
||||||
if index_selected == "SKIP":
|
|
||||||
# Record that the user has skipped the Movie for import, so that
|
|
||||||
# manual input isn't required everytime
|
|
||||||
userMatchedMoviesTable.insert(
|
|
||||||
{"MovieName": name, "UserSelectedIndex": 0, "SkipMovie": True}
|
|
||||||
)
|
|
||||||
|
|
||||||
return None
|
|
||||||
# Otherwise, return the selection which the user made from the list
|
|
||||||
else:
|
|
||||||
selected_movie = movies_with_same_name[int(index_selected)]
|
|
||||||
|
|
||||||
userMatchedMoviesTable.insert(
|
|
||||||
{
|
|
||||||
"MovieName": name,
|
|
||||||
"UserSelectedIndex": index_selected,
|
|
||||||
"SkipMovie": False,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return selected_movie
|
|
||||||
|
|
||||||
|
|
||||||
def process_movies():
|
|
||||||
# Total amount of rows which have been processed in the CSV file
|
|
||||||
# Total amount of rows in the CSV file
|
|
||||||
error_streak = 0
|
|
||||||
# Open the CSV file within the GDPR exported data
|
|
||||||
with open(MOVIES_PATH, newline="") as csvfile:
|
with open(MOVIES_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
reader = filter(lambda p: p["movie_name"] != "", csv.DictReader(csvfile, delimiter=","))
|
||||||
movie_reader_temp = csv.DictReader(csvfile, delimiter=",")
|
watched_list = [row["movie_name"] for row in reader if row["type"] == "watch"]
|
||||||
movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp)
|
|
||||||
# First, list all movies with watched type so that watchlist entry for them is not created
|
|
||||||
watched_list = []
|
|
||||||
for row in movie_reader:
|
|
||||||
if row["type"] == "watch":
|
|
||||||
watched_list.append(row["movie_name"])
|
|
||||||
# Move position to the beginning of the file
|
|
||||||
csvfile.seek(0, 0)
|
csvfile.seek(0, 0)
|
||||||
# Get the total amount of rows in the CSV file,
|
total_rows = len(list(reader))
|
||||||
rows_total = len(list(movie_reader))
|
|
||||||
# Move position to the beginning of the file
|
|
||||||
csvfile.seek(0, 0)
|
csvfile.seek(0, 0)
|
||||||
# Loop through each line/record of the CSV file
|
|
||||||
# Ignore the header row
|
# Ignore the header row
|
||||||
next(movie_reader, None)
|
next(reader, None)
|
||||||
for rows_count, row in enumerate(movie_reader):
|
for rows_count, row in enumerate(reader):
|
||||||
# Get the name of the Movie
|
movie = TVTimeMovie(row)
|
||||||
movie_name = row["movie_name"]
|
MovieProcessor(watched_list).process_item(movie, "{:.2f}%".format(rows_count / total_rows * 100))
|
||||||
# Get the date which the movie was marked 'watched' in TV Time
|
|
||||||
activity_type = row["type"]
|
|
||||||
movie_date_watched = row["updated_at"]
|
|
||||||
# Parse the watched date value into a Python type
|
|
||||||
movie_date_watched_converted = datetime.strptime(
|
|
||||||
movie_date_watched, "%Y-%m-%d %H:%M:%S"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Query the local database for previous entries indicating that
|
|
||||||
# the episode has already been imported in the past. Which will
|
|
||||||
# ease pressure on TV Time's API server during a retry of the import
|
|
||||||
# process, and just save time overall without needing to create network requests
|
|
||||||
movie_query = Query()
|
|
||||||
query_result = syncedMoviesTable.search(
|
|
||||||
(movie_query.movie_name == movie_name) & (movie_query.type == "watched")
|
|
||||||
)
|
|
||||||
|
|
||||||
watchlist_query = Query()
|
|
||||||
query_result_watchlist = syncedMoviesTable.search(
|
|
||||||
(watchlist_query.movie_name == movie_name)
|
|
||||||
& (watchlist_query.type == "watchlist")
|
|
||||||
)
|
|
||||||
|
|
||||||
# If the query returned no results, then continue to import it into Trakt
|
|
||||||
if len(query_result) == 0:
|
|
||||||
# Create a repeating loop, which will break on success, but repeats on failures
|
|
||||||
while True:
|
|
||||||
# If movie is watched but this is an entry for watchlist, then skip
|
|
||||||
if movie_name in watched_list and activity_type != "watch":
|
|
||||||
logging.info(
|
|
||||||
f"Skipping '{movie_name}' to avoid redundant watchlist entry."
|
|
||||||
)
|
|
||||||
break
|
|
||||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
|
||||||
# then give up, and move onto the next episode, but warn the user.
|
|
||||||
if error_streak > 10:
|
|
||||||
logging.warning(
|
|
||||||
"An error occurred 10 times in a row... skipping episode..."
|
|
||||||
)
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
# Sleep for a second between each process, before going onto the next watched episode.
|
|
||||||
# This is required to remain within the API rate limit, and use the API server fairly.
|
|
||||||
# Other developers share the service, for free - so be considerate of your usage.
|
|
||||||
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
|
||||||
# Search Trakt for the Movie matching TV Time's title value
|
|
||||||
trakt_movie_obj = get_movie_by_name(movie_name)
|
|
||||||
# If the method returned 'None', then this is an indication to skip the episode, and
|
|
||||||
# move onto the next one
|
|
||||||
if trakt_movie_obj is None:
|
|
||||||
break
|
|
||||||
# Show the progress of the import on-screen
|
|
||||||
logging.info(
|
|
||||||
f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'"
|
|
||||||
)
|
|
||||||
if activity_type == "watch":
|
|
||||||
trakt_movie_obj.mark_as_seen(movie_date_watched_converted)
|
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
|
||||||
# if the process is repeated
|
|
||||||
syncedMoviesTable.insert(
|
|
||||||
{"movie_name": movie_name, "type": "watched"}
|
|
||||||
)
|
|
||||||
logging.info(f"Marked as seen")
|
|
||||||
elif len(query_result_watchlist) == 0:
|
|
||||||
trakt_movie_obj.add_to_watchlist()
|
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
|
||||||
# if the process is repeated
|
|
||||||
syncedMoviesTable.insert(
|
|
||||||
{"movie_name": movie_name, "type": "watchlist"}
|
|
||||||
)
|
|
||||||
logging.info(f"Added to watchlist")
|
|
||||||
else:
|
|
||||||
logging.warning(f"Already in watchlist")
|
|
||||||
# Clear the error streak on completing the method without errors
|
|
||||||
error_streak = 0
|
|
||||||
break
|
|
||||||
# Catch errors which occur because of an incorrect array index. This occurs when
|
|
||||||
# an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time.
|
|
||||||
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
|
||||||
except IndexError:
|
|
||||||
movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][
|
|
||||||
"slug"
|
|
||||||
]
|
|
||||||
logging.warning(
|
|
||||||
f"({rows_count}/{rows_total}) - {movie_name} "
|
|
||||||
f"does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
# Catch any errors which are raised because a movie could not be found in Trakt
|
|
||||||
except trakt.errors.NotFoundException:
|
|
||||||
logging.warning(
|
|
||||||
f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!"
|
|
||||||
)
|
|
||||||
break
|
|
||||||
# Catch errors because of the program breaching the Trakt API rate limit
|
|
||||||
except trakt.errors.RateLimitException:
|
|
||||||
logging.warning(
|
|
||||||
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between "
|
|
||||||
+ "movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before "
|
|
||||||
+ "trying again."
|
|
||||||
)
|
|
||||||
time.sleep(60)
|
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
|
||||||
error_streak += 1
|
|
||||||
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
|
||||||
except json.decoder.JSONDecodeError:
|
|
||||||
logging.warning(
|
|
||||||
f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} "
|
|
||||||
+ f" This might occur when the server is down and has produced "
|
|
||||||
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Wait 60 seconds
|
|
||||||
time.sleep(60)
|
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
|
||||||
error_streak += 1
|
|
||||||
# Catch a CTRL + C keyboard input, and exits the program
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
sys.exit("Cancel requested...")
|
|
||||||
|
|
||||||
# Skip the episode
|
|
||||||
else:
|
|
||||||
logging.info(
|
|
||||||
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def menu_selection() -> int:
|
def menu_selection() -> int:
|
||||||
# Display a menu selection
|
# Display a menu selection
|
||||||
print(">> What do you want to do?")
|
print(">> What do you want to do?")
|
||||||
print(" 1) Import Watch History for TV Shows from TV Time")
|
print(" 1) Import Watch History for TV Shows from TV Time")
|
||||||
print(" 2) Import Watch Movies from TV Time")
|
print(" 2) Import Watched Movies from TV Time")
|
||||||
print(" 3) Do both 1 and 2 (default)")
|
print(" 3) Do both 1 and 2 (default)")
|
||||||
print(" 4) Exit")
|
print(" 4) Exit")
|
||||||
|
|
||||||
|
@ -766,23 +145,17 @@ def start():
|
||||||
"ERROR: Unable to complete authentication to Trakt - please try again."
|
"ERROR: Unable to complete authentication to Trakt - please try again."
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start the process which is required
|
|
||||||
if selection == 1:
|
if selection == 1:
|
||||||
# Invoke the method which will import episodes which have been watched
|
|
||||||
# from TV Time into Trakt
|
|
||||||
logging.info("Processing watched shows.")
|
logging.info("Processing watched shows.")
|
||||||
process_watched_shows()
|
process_watched_shows()
|
||||||
# TODO: Add support for followed shows
|
# TODO: Add support for followed shows
|
||||||
elif selection == 2:
|
elif selection == 2:
|
||||||
# Invoke the method which will import movies which have been watched
|
|
||||||
# from TV Time into Trakt
|
|
||||||
logging.info("Processing movies.")
|
logging.info("Processing movies.")
|
||||||
process_movies()
|
process_watched_movies()
|
||||||
elif selection == 3:
|
elif selection == 3:
|
||||||
# Invoke both the episodes and movies import methods
|
|
||||||
logging.info("Processing both watched shows and movies.")
|
logging.info("Processing both watched shows and movies.")
|
||||||
process_watched_shows()
|
process_watched_shows()
|
||||||
process_movies()
|
process_watched_movies()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -791,7 +164,6 @@ if __name__ == "__main__":
|
||||||
start()
|
start()
|
||||||
else:
|
else:
|
||||||
logging.error(
|
logging.error(
|
||||||
"Oops! The TV Time GDPR folder '"
|
f"Oops! The TV Time GDPR folder 'config.gdpr_workspace_path'"
|
||||||
+ config.gdpr_workspace_path
|
" does not exist on the local system. Please check it, and try again."
|
||||||
+ "' does not exist on the local system. Please check it, and try again."
|
|
||||||
)
|
)
|
||||||
|
|
8
database.py
Normal file
8
database.py
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
from tinydb import TinyDB
|
||||||
|
|
||||||
|
# Create databases to keep track of completed processes
|
||||||
|
database = TinyDB("localStorage.json")
|
||||||
|
syncedEpisodesTable = database.table("SyncedEpisodes")
|
||||||
|
userMatchedShowsTable = database.table("TvTimeTraktUserMatched")
|
||||||
|
syncedMoviesTable = database.table("SyncedMovies")
|
||||||
|
userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies")
|
237
processor.py
Normal file
237
processor.py
Normal file
|
@ -0,0 +1,237 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
import trakt.core
|
||||||
|
from tinydb import Query
|
||||||
|
from tinydb.table import Document
|
||||||
|
|
||||||
|
from database import syncedEpisodesTable, syncedMoviesTable
|
||||||
|
from searcher import TVShowSearcher, MovieSearcher, TraktTVShow, TraktMovie, TraktItem, TVTimeItem, TVTimeTVShow, \
|
||||||
|
TVTimeMovie
|
||||||
|
|
||||||
|
|
||||||
|
class Processor(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def _get_synced_items(self, tv_time_item: TVTimeItem) -> list[Document]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _log_already_imported(self, tv_time_item: TVTimeItem, progress: str) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _should_continue(self, tv_time_item: TVTimeItem) -> bool:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _search(self, tv_time_item: TVTimeItem) -> TraktItem:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _process(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: str) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def process_item(self, tv_time_item: TVTimeItem, progress: str, delay: int = 1) -> None:
|
||||||
|
# Query the local database for previous entries indicating that
|
||||||
|
# the item has already been imported in the past. Which will
|
||||||
|
# ease pressure on Trakt's API server during a retry of the import
|
||||||
|
# process, and just save time overall without needing to create network requests.
|
||||||
|
synced_episodes = self._get_synced_items(tv_time_item)
|
||||||
|
if len(synced_episodes) != 0:
|
||||||
|
self._log_already_imported(tv_time_item, progress)
|
||||||
|
return
|
||||||
|
|
||||||
|
# If the query returned no results, then continue to import it into Trakt
|
||||||
|
# Create a repeating loop, which will break on success, but repeats on failures
|
||||||
|
error_streak = 0
|
||||||
|
while True:
|
||||||
|
# If more than 10 errors occurred in one streak, whilst trying to import the item
|
||||||
|
# then give up, and move onto the next item, but warn the user.
|
||||||
|
if error_streak > 10:
|
||||||
|
logging.warning("An error occurred 10 times in a row... skipping episode...")
|
||||||
|
break
|
||||||
|
|
||||||
|
if not self._should_continue(tv_time_item):
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Sleep for a second between each process, before going onto the next watched item.
|
||||||
|
# This is required to remain within the API rate limit, and use the API server fairly.
|
||||||
|
# Other developers share the service, for free - so be considerate of your usage.
|
||||||
|
time.sleep(delay)
|
||||||
|
|
||||||
|
trakt_item = self._search(tv_time_item)
|
||||||
|
if trakt_item is None:
|
||||||
|
break
|
||||||
|
|
||||||
|
self._process(tv_time_item, trakt_item, progress)
|
||||||
|
|
||||||
|
error_streak = 0
|
||||||
|
break
|
||||||
|
# Catch errors which occur because of an incorrect array index. This occurs when
|
||||||
|
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
||||||
|
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
||||||
|
except IndexError:
|
||||||
|
self._handle_index_error(tv_time_item, trakt_item, progress)
|
||||||
|
break
|
||||||
|
except trakt.core.errors.NotFoundException:
|
||||||
|
self._handle_not_found_exception(tv_time_item, progress)
|
||||||
|
break
|
||||||
|
except trakt.core.errors.RateLimitException:
|
||||||
|
logging.warning(
|
||||||
|
"The program is running too quickly and has hit Trakt's API rate limit!"
|
||||||
|
" Please increase the delay between"
|
||||||
|
" movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'."
|
||||||
|
" The program will now wait 60 seconds before"
|
||||||
|
" trying again."
|
||||||
|
)
|
||||||
|
time.sleep(60)
|
||||||
|
error_streak += 1
|
||||||
|
# Catch a JSON decode error - this can be raised when the API server is down and produces an HTML page,
|
||||||
|
# instead of JSON
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
logging.warning(
|
||||||
|
f"({progress}) - A JSON decode error occurred whilst processing {tv_time_item.name}"
|
||||||
|
" This might occur when the server is down and has produced"
|
||||||
|
" a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
||||||
|
)
|
||||||
|
|
||||||
|
time.sleep(60)
|
||||||
|
error_streak += 1
|
||||||
|
# Catch a CTRL + C keyboard input, and exits the program
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit("Cancel requested...")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(
|
||||||
|
f"Got unknown error {e},"
|
||||||
|
f" while processing {tv_time_item.name}"
|
||||||
|
)
|
||||||
|
error_streak += 1
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _handle_index_error(self, tv_time_item: TVTimeItem, trakt_item: TraktItem, progress: str) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _handle_not_found_exception(self, tv_time_item: TVTimeItem, progress: str) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TVShowProcessor(Processor):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
def _get_synced_items(self, tv_time_show: TVTimeTVShow) -> list[Document]:
|
||||||
|
episode_completed_query = Query()
|
||||||
|
return syncedEpisodesTable.search(episode_completed_query.episodeId == tv_time_show.episode_id)
|
||||||
|
|
||||||
|
def _log_already_imported(self, tv_time_show: TVTimeTVShow, progress: str) -> None:
|
||||||
|
logging.info(
|
||||||
|
f"({progress}) - Already imported,"
|
||||||
|
f" skipping \'{tv_time_show.name}\' Season {tv_time_show.season_number} /"
|
||||||
|
f" Episode {tv_time_show.episode_number}."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _should_continue(self, tv_time_show: TVTimeTVShow) -> bool:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _search(self, tv_time_show: TVTimeTVShow) -> TraktTVShow:
|
||||||
|
return TVShowSearcher(tv_time_show).search(tv_time_show.title)
|
||||||
|
|
||||||
|
def _process(self, tv_time_show: TVTimeTVShow, trakt_show: TraktItem, progress: str) -> None:
|
||||||
|
logging.info(
|
||||||
|
f"({progress}) - Processing '{tv_time_show.name}'"
|
||||||
|
f" Season {tv_time_show.season_number} /"
|
||||||
|
f" Episode {tv_time_show.episode_number}"
|
||||||
|
)
|
||||||
|
|
||||||
|
season = trakt_show.seasons[tv_time_show.parse_season_number(trakt_show)]
|
||||||
|
episode = season.episodes[int(tv_time_show.episode_number) - 1]
|
||||||
|
episode.mark_as_seen(tv_time_show.date_watched)
|
||||||
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
# if the process is repeated
|
||||||
|
syncedEpisodesTable.insert({"episodeId": tv_time_show.episode_id})
|
||||||
|
logging.info(
|
||||||
|
f"'{tv_time_show.name} Season {tv_time_show.season_number},"
|
||||||
|
f" Episode {tv_time_show.episode_number}' marked as seen"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle_index_error(self, tv_time_show: TVTimeTVShow, trakt_show: TraktTVShow, progress: str) -> None:
|
||||||
|
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"]["slug"]
|
||||||
|
logging.warning(
|
||||||
|
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
|
||||||
|
f" Episode {tv_time_show.episode_number} does not exist in Trakt!"
|
||||||
|
f" (https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_time_show.season_number}/episodes/{tv_time_show.episode_number})"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle_not_found_exception(self, tv_time_show: TVTimeTVShow, progress: str) -> None:
|
||||||
|
logging.warning(
|
||||||
|
f"({progress}) - {tv_time_show.name} Season {tv_time_show.season_number},"
|
||||||
|
f" Episode {tv_time_show.episode_number} does not exist (search) in Trakt!"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class MovieProcessor(Processor):
|
||||||
|
def __init__(self, watched_list: list):
|
||||||
|
super().__init__()
|
||||||
|
self._watched_list = watched_list
|
||||||
|
|
||||||
|
def _get_synced_items(self, tv_time_movie: TVTimeMovie) -> list[Document]:
|
||||||
|
movie_query = Query()
|
||||||
|
return syncedMoviesTable.search(
|
||||||
|
(movie_query.movie_name == tv_time_movie.name) & (movie_query.type == "watched")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _log_already_imported(self, tv_time_movie: TVTimeMovie, progress: str) -> None:
|
||||||
|
logging.info(f"({progress}) - Already imported, skipping '{tv_time_movie.name}'.")
|
||||||
|
|
||||||
|
def _should_continue(self, tv_time_movie: TVTimeMovie) -> bool:
|
||||||
|
# If movie is watched but this is an entry for watchlist, then skip
|
||||||
|
if tv_time_movie.name in self._watched_list and tv_time_movie.activity_type != "watch":
|
||||||
|
logging.info(f"Skipping '{tv_time_movie.name}' to avoid redundant watchlist entry.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _search(self, tv_time_movie: TVTimeMovie) -> TraktMovie:
|
||||||
|
return MovieSearcher().search(tv_time_movie.title)
|
||||||
|
|
||||||
|
def _process(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: str) -> None:
|
||||||
|
logging.info(f"({progress}) - Processing '{tv_time_movie.name}'")
|
||||||
|
|
||||||
|
watchlist_query = Query()
|
||||||
|
movies_in_watchlist = syncedMoviesTable.search(
|
||||||
|
(watchlist_query.movie_name == tv_time_movie.name) & (watchlist_query.type == "watchlist")
|
||||||
|
)
|
||||||
|
|
||||||
|
if tv_time_movie.activity_type == "watch":
|
||||||
|
trakt_movie.mark_as_seen(tv_time_movie.date_watched)
|
||||||
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
# if the process is repeated
|
||||||
|
syncedMoviesTable.insert(
|
||||||
|
{"movie_name": tv_time_movie.name, "type": "watched"}
|
||||||
|
)
|
||||||
|
logging.info(f"'{tv_time_movie.name}' marked as seen")
|
||||||
|
elif len(movies_in_watchlist) == 0:
|
||||||
|
trakt_movie.add_to_watchlist()
|
||||||
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
# if the process is repeated
|
||||||
|
syncedMoviesTable.insert(
|
||||||
|
{"movie_name": tv_time_movie.name, "type": "watchlist"}
|
||||||
|
)
|
||||||
|
logging.info(f"'{tv_time_movie.name}' added to watchlist")
|
||||||
|
else:
|
||||||
|
logging.warning(f"{tv_time_movie.name} already in watchlist")
|
||||||
|
|
||||||
|
def _handle_index_error(self, tv_time_movie: TVTimeMovie, trakt_movie: TraktMovie, progress: str) -> None:
|
||||||
|
movie_slug = trakt_movie.to_json()["movies"][0]["ids"]["ids"]["slug"]
|
||||||
|
logging.warning(
|
||||||
|
f"({progress}) - {tv_time_movie.name}"
|
||||||
|
f" does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _handle_not_found_exception(self, tv_time_movie: TVTimeMovie, progress: str) -> None:
|
||||||
|
logging.warning(f"({progress}) - {tv_time_movie.name} does not exist (search) in Trakt!")
|
295
searcher.py
Normal file
295
searcher.py
Normal file
|
@ -0,0 +1,295 @@
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional, TypeVar, Union, Any
|
||||||
|
|
||||||
|
from tinydb import Query
|
||||||
|
from tinydb.table import Table
|
||||||
|
from trakt.movies import Movie
|
||||||
|
from trakt.tv import TVShow
|
||||||
|
|
||||||
|
from database import userMatchedShowsTable, userMatchedMoviesTable
|
||||||
|
|
||||||
|
TraktTVShow = TypeVar("TraktTVShow")
|
||||||
|
TraktMovie = TypeVar("TraktMovie")
|
||||||
|
TraktItem = Union[TraktTVShow, TraktMovie]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Title:
|
||||||
|
name: str
|
||||||
|
without_year: str
|
||||||
|
year: Optional[int]
|
||||||
|
|
||||||
|
def __init__(self, title: str, year: Optional[int] = None):
|
||||||
|
"""
|
||||||
|
Creates a Title object. If year is not passed, it tries to parse it from the title.
|
||||||
|
"""
|
||||||
|
self.name = title
|
||||||
|
if year is not None:
|
||||||
|
self.without_year = title
|
||||||
|
self.year = year
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
# Use a regex expression to get the value within the brackets e.g. The Americans (2017)
|
||||||
|
year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
||||||
|
self.year = int(year_search.group(1))
|
||||||
|
# Then, get the title without the year value included
|
||||||
|
self.without_year = title.split("(")[0].strip()
|
||||||
|
except Exception:
|
||||||
|
# If the above failed, then the title doesn't include a year
|
||||||
|
# so create the value with "defaults"
|
||||||
|
self.name = title
|
||||||
|
self.without_year = title
|
||||||
|
self.year = None
|
||||||
|
|
||||||
|
def items_with_same_name(self, items: list[TraktItem]) -> list[TraktItem]:
|
||||||
|
with_same_name = []
|
||||||
|
|
||||||
|
for item in items:
|
||||||
|
if self.matches(item.title):
|
||||||
|
# If the title included the year of broadcast, then we can be more picky in the results
|
||||||
|
# to look for an item with a broadcast year that matches
|
||||||
|
if self.year:
|
||||||
|
# If the item title is a 1:1 match, with the same broadcast year, then bingo!
|
||||||
|
if (self.name == item.title) and (item.year == self.year):
|
||||||
|
# Clear previous results, and only use this one
|
||||||
|
with_same_name = [item]
|
||||||
|
break
|
||||||
|
|
||||||
|
# Otherwise, only add the item if the broadcast year matches
|
||||||
|
if item.year == self.year:
|
||||||
|
with_same_name.append(item)
|
||||||
|
# If the item doesn't have the broadcast year, then add all the results
|
||||||
|
else:
|
||||||
|
with_same_name.append(item)
|
||||||
|
|
||||||
|
return with_same_name
|
||||||
|
|
||||||
|
def matches(self, other: str) -> bool:
|
||||||
|
"""
|
||||||
|
Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
|
||||||
|
calculate how many words are in the title, and return true if more than 50% of the title is a match,
|
||||||
|
It seems to improve automation, and reduce manual selection...
|
||||||
|
"""
|
||||||
|
|
||||||
|
# If the name is a complete match, then don't bother comparing them!
|
||||||
|
if self.name == other:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Go through each word of the TV Time title, and check if it's in the Trakt title
|
||||||
|
words_matched = [word for word in self.name.split() if word in other]
|
||||||
|
|
||||||
|
# Then calculate what percentage of words matched
|
||||||
|
quotient = len(words_matched) / len(other.split())
|
||||||
|
percentage = quotient * 100
|
||||||
|
|
||||||
|
# If more than 50% of words in the TV Time title exist in the Trakt title,
|
||||||
|
# then return the title as a possibility to use
|
||||||
|
return percentage > 50
|
||||||
|
|
||||||
|
|
||||||
|
class TVTimeItem:
|
||||||
|
def __init__(self, name: str, updated_at: str):
|
||||||
|
self.name = name
|
||||||
|
self.title = Title(name)
|
||||||
|
# Get the date which the show was marked 'watched' in TV Time
|
||||||
|
# and parse the watched date value into a Python object
|
||||||
|
self.date_watched = datetime.strptime(
|
||||||
|
updated_at, "%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TVTimeTVShow(TVTimeItem):
|
||||||
|
def __init__(self, row: Any):
|
||||||
|
super().__init__(row["tv_show_name"], row["updated_at"])
|
||||||
|
self.episode_id = row["episode_id"]
|
||||||
|
self.season_number = row["episode_season_number"]
|
||||||
|
self.episode_number = row["episode_number"]
|
||||||
|
|
||||||
|
def parse_season_number(self, trakt_show: TraktTVShow) -> int:
|
||||||
|
"""
|
||||||
|
Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
|
||||||
|
subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
|
||||||
|
a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
||||||
|
this will match TV Time's existing value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
season_number = int(self.season_number)
|
||||||
|
# Gen get the Season Number from the first item in the array
|
||||||
|
first_season_no = trakt_show.seasons[0].number
|
||||||
|
|
||||||
|
# If the season number is 0, then the Trakt show contains a "special" season
|
||||||
|
if first_season_no == 0:
|
||||||
|
# No need to modify the value, as the TV Time value will match Trakt
|
||||||
|
return season_number
|
||||||
|
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
|
||||||
|
# but subtracted by one (e.g. Season 1 in TV Time, will be 0)
|
||||||
|
else:
|
||||||
|
# Only subtract if the TV Time season number is greater than 0.
|
||||||
|
if season_number != 0:
|
||||||
|
return season_number - 1
|
||||||
|
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
|
||||||
|
else:
|
||||||
|
return season_number
|
||||||
|
|
||||||
|
|
||||||
|
class TVTimeMovie(TVTimeItem):
|
||||||
|
def __init__(self, row: Any):
|
||||||
|
super().__init__(row["movie_name"], row["updated_at"])
|
||||||
|
self.activity_type = row["type"]
|
||||||
|
|
||||||
|
# Release date is available for movies
|
||||||
|
|
||||||
|
release_date = datetime.strptime(
|
||||||
|
row["release_date"], "%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that date is valid
|
||||||
|
if release_date.year > 1800:
|
||||||
|
self.title = Title(self.title.name, release_date.year)
|
||||||
|
|
||||||
|
|
||||||
|
class Searcher(ABC):
|
||||||
|
def __init__(self, user_matched_table: Table):
|
||||||
|
self.name = ""
|
||||||
|
self.items_with_same_name: Optional[TraktItem] = None
|
||||||
|
self._user_matched_table = user_matched_table
|
||||||
|
|
||||||
|
def search(self, title: Title) -> Optional[TraktItem]:
|
||||||
|
self.name = title.name
|
||||||
|
# If the title contains a year, then replace the local variable with the stripped version.
|
||||||
|
if title.year:
|
||||||
|
self.name = title.without_year
|
||||||
|
self.items_with_same_name = title.items_with_same_name(self.search_trakt(self.name))
|
||||||
|
|
||||||
|
single_result = self._check_single_result()
|
||||||
|
if single_result:
|
||||||
|
return single_result
|
||||||
|
elif len(self.items_with_same_name) < 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# If the search contains multiple results, then we need to confirm with the user which show
|
||||||
|
# the script should use, or access the local database to see if the user has already provided
|
||||||
|
# a manual selection
|
||||||
|
|
||||||
|
should_return, query_result = self._search_local()
|
||||||
|
if should_return:
|
||||||
|
return query_result
|
||||||
|
# If the user has not provided a manual selection already in the process
|
||||||
|
# then prompt the user to make a selection
|
||||||
|
else:
|
||||||
|
self._handle_multiple_manually()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def search_trakt(self, name: str) -> list[TraktItem]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _print_manual_selection(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _search_local(self) -> tuple[bool, TraktItem]:
|
||||||
|
user_matched_query = Query()
|
||||||
|
query_result = self._user_matched_table.search(user_matched_query.Name == self.name)
|
||||||
|
# If the local database already contains an entry for a manual selection
|
||||||
|
# then don't bother prompting the user to select it again!
|
||||||
|
if len(query_result) == 1:
|
||||||
|
first_match = query_result[0]
|
||||||
|
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
|
||||||
|
skip_show = first_match.get("Skip")
|
||||||
|
if skip_show is None:
|
||||||
|
return True, self.items_with_same_name[first_match_selected_index]
|
||||||
|
else:
|
||||||
|
return True, None
|
||||||
|
else:
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
def _handle_multiple_manually(self) -> Optional[TraktItem]:
|
||||||
|
self._print_manual_selection()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
||||||
|
index_selected = input("Please make a selection from above (or enter SKIP): ")
|
||||||
|
if index_selected == "SKIP":
|
||||||
|
break
|
||||||
|
|
||||||
|
index_selected = int(index_selected) - 1
|
||||||
|
break
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit("Cancel requested...")
|
||||||
|
except Exception:
|
||||||
|
logging.error(f"Sorry! Please select a value between 0 to {len(self.items_with_same_name)}")
|
||||||
|
|
||||||
|
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
||||||
|
# will trigger the program to move onto the next episode
|
||||||
|
if index_selected == "SKIP":
|
||||||
|
# Record that the user has skipped the TV Show for import, so that
|
||||||
|
# manual input isn't required everytime
|
||||||
|
self._user_matched_table.insert(
|
||||||
|
{"Name": self.name, "UserSelectedIndex": 0, "Skip": True}
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
selected_show = self.items_with_same_name[int(index_selected)]
|
||||||
|
|
||||||
|
self._user_matched_table.insert(
|
||||||
|
{
|
||||||
|
"Name": self.name,
|
||||||
|
"UserSelectedIndex": index_selected,
|
||||||
|
"Skip": False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return selected_show
|
||||||
|
|
||||||
|
def _check_single_result(self) -> Optional[TraktItem]:
|
||||||
|
complete_match_names = [name_from_search for name_from_search in self.items_with_same_name if
|
||||||
|
name_from_search.title == self.name]
|
||||||
|
if len(complete_match_names) == 1:
|
||||||
|
return complete_match_names[0]
|
||||||
|
elif len(self.items_with_same_name) == 1:
|
||||||
|
return self.items_with_same_name[0]
|
||||||
|
|
||||||
|
|
||||||
|
class TVShowSearcher(Searcher):
|
||||||
|
def __init__(self, tv_show: TVTimeTVShow):
|
||||||
|
super().__init__(userMatchedShowsTable)
|
||||||
|
self.tv_show = tv_show
|
||||||
|
|
||||||
|
def search_trakt(self, name: str) -> list[TraktItem]:
|
||||||
|
return TVShow.search(name)
|
||||||
|
|
||||||
|
def _print_manual_selection(self) -> None:
|
||||||
|
print(
|
||||||
|
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{self.name}'"
|
||||||
|
f" (Season {self.tv_show.season_number}, Episode {self.tv_show.episode_number}) has"
|
||||||
|
f" {len(self.items_with_same_name)} matching Trakt shows with the same name.\a"
|
||||||
|
)
|
||||||
|
|
||||||
|
for idx, item in enumerate(self.items_with_same_name):
|
||||||
|
print(
|
||||||
|
f"({idx + 1}) {item.title} - {item.year} - {len(item.seasons)}"
|
||||||
|
f" Season(s) - More Info: https://trakt.tv/{item.ext}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class MovieSearcher(Searcher):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(userMatchedMoviesTable)
|
||||||
|
|
||||||
|
def search_trakt(self, name: str) -> list[TraktItem]:
|
||||||
|
return Movie.search(name)
|
||||||
|
|
||||||
|
def _print_manual_selection(self) -> None:
|
||||||
|
print(
|
||||||
|
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{self.name}'"
|
||||||
|
f" has {len(self.items_with_same_name)}"
|
||||||
|
f" matching Trakt movies with the same name.\a"
|
||||||
|
)
|
||||||
|
|
||||||
|
for idx, item in enumerate(self.items_with_same_name):
|
||||||
|
print(f"({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}")
|
Loading…
Reference in a new issue