mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2024-12-25 21:08:37 -06:00
Added support for movies (#17)
* Initial working * Removed .vscode * Cleanup * Merged scripts * Updated README.md * Present menu before authentication. add entries * Just use one database * Remove irrelevant entries * Add bell on manual input prompt (suggested by @WeirdAlex03) * Separate file is no longer used for movies * Config to dataclass * Prompt config if it doesn't exist * Naming to snake_case * Remove use of Exodus class * Remove old Title fields * Specify TV Shows and Movies as default action * Extract menu selection to own function * Fix movie query * Simple refactor * Extract getting same name items to common function * Remove unnecessary param * Fix TinyDB movie name Co-authored-by: Markus Nyman <markus@nyman.dev>
This commit is contained in:
parent
0b87de1abb
commit
01ab7897c2
4 changed files with 521 additions and 248 deletions
7
.gitignore
vendored
7
.gitignore
vendored
|
@ -1,6 +1 @@
|
||||||
watched_show_process_tracker.json
|
*.json
|
||||||
localStorage.json
|
|
||||||
config.json
|
|
||||||
TimeToTrackt.py
|
|
||||||
seen_episode.csv
|
|
||||||
followed_tv_show.csv
|
|
||||||
|
|
15
.vscode/launch.json
vendored
15
.vscode/launch.json
vendored
|
@ -1,15 +0,0 @@
|
||||||
{
|
|
||||||
// Use IntelliSense to learn about possible attributes.
|
|
||||||
// Hover to view descriptions of existing attributes.
|
|
||||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
|
||||||
"version": "0.2.0",
|
|
||||||
"configurations": [
|
|
||||||
{
|
|
||||||
"name": "Python: Current File",
|
|
||||||
"type": "python",
|
|
||||||
"request": "launch",
|
|
||||||
"program": "${file}",
|
|
||||||
"console": "integratedTerminal"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
|
@ -2,13 +2,13 @@
|
||||||
|
|
||||||
![](https://loch.digital/image_for_external_apps/4342799-01.png)
|
![](https://loch.digital/image_for_external_apps/4342799-01.png)
|
||||||
|
|
||||||
A Python script to import TV Time tracked episode data into Trakt.TV - using data export provided by TV Time through a GDPR request.
|
A Python script to import TV Time tracked episode and movie data into Trakt.TV - using data export provided by TV Time through a GDPR request.
|
||||||
|
|
||||||
# Notes
|
# Notes
|
||||||
|
|
||||||
1. The script is using limited data provided from a GDPR request - so the accuracy isn't 100%. But you will be prompted to manually pick the Trakt show, when it can't be determined automatically.
|
1. The script is using limited data provided from a GDPR request - so the accuracy isn't 100%. But you will be prompted to manually pick the Trakt show/movie, when it can't be determined automatically.
|
||||||
2. A delay of 1 second is added between each episode to ensure fair use of Trakt's API server. You can adjust this for your own import, but make sure it's at least 0.75 second to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
2. A delay of 1 second is added between each episode/movie to ensure fair use of Trakt's API server. You can adjust this for your own import, but make sure it's at least 0.75 second to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
||||||
3. Episodes which have been processed will be saved to a TinyDB file `localStorage.json` - when you restart the script, the program will skip those episodes which have been marked 'imported'.
|
3. Episodes which have been processed will be saved to a TinyDB file `localStorage.json` - when you restart the script, the program will skip those episodes which have been marked 'imported'. Processed movies are also stored in the same file.
|
||||||
|
|
||||||
# Setup
|
# Setup
|
||||||
|
|
||||||
|
|
739
TimeToTrakt.py
739
TimeToTrakt.py
|
@ -6,12 +6,14 @@ import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from typing import Optional, Callable, TypeVar, Union, List
|
||||||
|
|
||||||
import trakt.core
|
import trakt.core
|
||||||
from tinydb import Query, TinyDB
|
from tinydb import Query, TinyDB
|
||||||
from trakt import init
|
from trakt import init
|
||||||
|
from trakt.movies import Movie
|
||||||
from trakt.tv import TVShow
|
from trakt.tv import TVShow
|
||||||
|
|
||||||
# Setup logger
|
# Setup logger
|
||||||
|
@ -25,66 +27,69 @@ logging.basicConfig(
|
||||||
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
# Make to remain within the rate limit: https://trakt.docs.apiary.io/#introduction/rate-limiting
|
||||||
DELAY_BETWEEN_EPISODES_IN_SECONDS = 1
|
DELAY_BETWEEN_EPISODES_IN_SECONDS = 1
|
||||||
|
|
||||||
# Create a database to keep track of completed processes
|
# Create databases to keep track of completed processes
|
||||||
database = TinyDB("localStorage.json")
|
database = TinyDB("localStorage.json")
|
||||||
syncedEpisodesTable = database.table("SyncedEpisodes")
|
syncedEpisodesTable = database.table("SyncedEpisodes")
|
||||||
userMatchedShowsTable = database.table("TvTimeTraktUserMatched")
|
userMatchedShowsTable = database.table("TvTimeTraktUserMatched")
|
||||||
|
syncedMoviesTable = database.table("SyncedMovies")
|
||||||
|
userMatchedMoviesTable = database.table("TvTimeTraktUserMatchedMovies")
|
||||||
|
|
||||||
|
|
||||||
class Expando(object):
|
@dataclass
|
||||||
pass
|
class Config:
|
||||||
|
trakt_username: str
|
||||||
|
client_id: str
|
||||||
|
client_secret: str
|
||||||
|
gdpr_workspace_path: str
|
||||||
|
|
||||||
|
|
||||||
def isAuthenticated():
|
def is_authenticated() -> bool:
|
||||||
with open("pytrakt.json") as f:
|
with open("pytrakt.json") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
daysBeforeExpiration = (
|
days_before_expiration = (
|
||||||
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
datetime.fromtimestamp(data["OAUTH_EXPIRES_AT"]) - datetime.now()
|
||||||
).days
|
).days
|
||||||
if daysBeforeExpiration < 1:
|
return days_before_expiration >= 1
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def getConfiguration():
|
def get_configuration() -> Config:
|
||||||
configEx = Expando()
|
try:
|
||||||
|
with open("config.json") as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
with open("config.json") as f:
|
return Config(
|
||||||
data = json.load(f)
|
data["TRAKT_USERNAME"],
|
||||||
|
data["CLIENT_ID"],
|
||||||
configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"]
|
data["CLIENT_SECRET"],
|
||||||
configEx.CLIENT_ID = data["CLIENT_ID"]
|
data["GDPR_WORKSPACE_PATH"],
|
||||||
configEx.CLIENT_SECRET = data["CLIENT_SECRET"]
|
)
|
||||||
configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"]
|
except FileNotFoundError:
|
||||||
|
logging.info("config.json not found prompting user for input")
|
||||||
CONFIG_SINGLETON = configEx
|
return Config(
|
||||||
|
input("Enter your Trakt.tv username: "),
|
||||||
return CONFIG_SINGLETON
|
input("Enter you Client id: "),
|
||||||
|
input("Enter your Client secret: "),
|
||||||
|
input("Enter your GDPR workspace path: ")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
config = getConfiguration()
|
config = get_configuration()
|
||||||
|
|
||||||
# Return the path to the CSV file contain the watched episode data from TV Time
|
WATCHED_SHOWS_PATH = config.gdpr_workspace_path + "/seen_episode.csv"
|
||||||
|
FOLLOWED_SHOWS_PATH = config.gdpr_workspace_path + "/followed_tv_show.csv"
|
||||||
|
MOVIES_PATH = config.gdpr_workspace_path + "/tracking-prod-records.csv"
|
||||||
|
|
||||||
|
|
||||||
def getWatchedShowsPath():
|
def init_trakt_auth() -> bool:
|
||||||
return config.GDPR_WORKSPACE_PATH + "/seen_episode.csv"
|
if is_authenticated():
|
||||||
|
|
||||||
|
|
||||||
def getFollowedShowsPath():
|
|
||||||
return config.GDPR_WORKSPACE_PATH + "/followed_tv_show.csv"
|
|
||||||
|
|
||||||
|
|
||||||
def initTraktAuth():
|
|
||||||
if isAuthenticated():
|
|
||||||
return True
|
return True
|
||||||
# Set the method of authentication
|
# Set the method of authentication
|
||||||
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
||||||
return init(
|
return init(
|
||||||
config.TRAKT_USERNAME,
|
config.trakt_username,
|
||||||
store=True,
|
store=True,
|
||||||
client_id=config.CLIENT_ID,
|
client_id=config.client_id,
|
||||||
client_secret=config.CLIENT_SECRET,
|
client_secret=config.client_secret,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -92,26 +97,33 @@ def initTraktAuth():
|
||||||
# and then return this value, with the title and year removed to improve
|
# and then return this value, with the title and year removed to improve
|
||||||
# the accuracy of Trakt results.
|
# the accuracy of Trakt results.
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Title:
|
||||||
|
name: str
|
||||||
|
without_year: str
|
||||||
|
year: Optional[int]
|
||||||
|
|
||||||
def getYearFromTitle(title):
|
def __init__(self, title: str):
|
||||||
ex = Expando()
|
try:
|
||||||
|
# Use a regex expression to get the value within the brackets e.g. The Americans (2017)
|
||||||
|
year_search = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
||||||
|
year_value = year_search.group(1)
|
||||||
|
# Then, get the title without the year value included
|
||||||
|
title_value = title.split("(")[0].strip()
|
||||||
|
# Put this together into an object
|
||||||
|
self.name = title
|
||||||
|
self.without_year = title_value
|
||||||
|
self.year = int(year_value)
|
||||||
|
except Exception:
|
||||||
|
# If the above failed, then the title doesn't include a year
|
||||||
|
# so return the object as is.
|
||||||
|
self.name = title
|
||||||
|
self.without_year = title
|
||||||
|
self.year = None
|
||||||
|
|
||||||
try:
|
|
||||||
# Use a regex expression to get the value within the brackets e.g The Americans (2017)
|
def get_year_from_title(title) -> Title:
|
||||||
yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
return Title(title)
|
||||||
yearValue = yearSearch.group(1)
|
|
||||||
# Then, get the title without the year value included
|
|
||||||
titleValue = title.split("(")[0].strip()
|
|
||||||
# Put this together into an object
|
|
||||||
ex.titleWithoutYear = titleValue
|
|
||||||
ex.yearValue = int(yearValue)
|
|
||||||
return ex
|
|
||||||
except Exception:
|
|
||||||
# If the above failed, then the title doesn't include a year
|
|
||||||
# so return the object as is.
|
|
||||||
ex.titleWithoutYear = title
|
|
||||||
ex.yearValue = -1
|
|
||||||
return ex
|
|
||||||
|
|
||||||
|
|
||||||
# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
|
# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
|
||||||
|
@ -119,24 +131,24 @@ def getYearFromTitle(title):
|
||||||
# It seems to improve automation, and reduce manual selection....
|
# It seems to improve automation, and reduce manual selection....
|
||||||
|
|
||||||
|
|
||||||
def checkTitleNameMatch(tvTimeTitle, traktTitle):
|
def check_title_name_match(tv_time_title: str, trakt_title: str) -> bool:
|
||||||
# If the name is a complete match, then don't bother comparing them!
|
# If the name is a complete match, then don't bother comparing them!
|
||||||
if tvTimeTitle == traktTitle:
|
if tv_time_title == trakt_title:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Split the TvTime title
|
# Split the TvTime title
|
||||||
tvTimeTitleSplit = tvTimeTitle.split()
|
tv_time_title_split = tv_time_title.split()
|
||||||
|
|
||||||
# Create an array of words which are found in the Trakt title
|
# Create an array of words which are found in the Trakt title
|
||||||
wordsMatched = []
|
words_matched = []
|
||||||
|
|
||||||
# Go through each word of the TV Time title, and check if it's in the Trakt title
|
# Go through each word of the TV Time title, and check if it's in the Trakt title
|
||||||
for word in tvTimeTitleSplit:
|
for word in tv_time_title_split:
|
||||||
if word in traktTitle:
|
if word in trakt_title:
|
||||||
wordsMatched.append(word)
|
words_matched.append(word)
|
||||||
|
|
||||||
# Then calculate what percentage of words matched
|
# Then calculate what percentage of words matched
|
||||||
quotient = len(wordsMatched) / len(traktTitle.split())
|
quotient = len(words_matched) / len(trakt_title.split())
|
||||||
percentage = quotient * 100
|
percentage = quotient * 100
|
||||||
|
|
||||||
# If more than 50% of words in the TV Time title exist in the Trakt title,
|
# If more than 50% of words in the TV Time title exist in the Trakt title,
|
||||||
|
@ -147,80 +159,76 @@ def checkTitleNameMatch(tvTimeTitle, traktTitle):
|
||||||
# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show
|
# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show
|
||||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
# in Trakt.TV either by automation, or asking the user to confirm.
|
||||||
|
|
||||||
|
TraktTVShow = TypeVar("TraktTVShow")
|
||||||
|
TraktMovie = TypeVar("TraktMovie")
|
||||||
|
|
||||||
def getShowByName(name, seasonNo, episodeNo):
|
SearchResult = Union[TraktTVShow, TraktMovie]
|
||||||
# Parse the TV Show's name for year, if one is present in the string
|
|
||||||
titleObj = getYearFromTitle(name)
|
|
||||||
|
|
||||||
# Create a boolean to indicate if the title contains a year,
|
|
||||||
# this is used later on to improve the accuracy of picking
|
|
||||||
# from search results
|
|
||||||
doesTitleIncludeYear = titleObj.yearValue != -1
|
|
||||||
|
|
||||||
# If the title contains a year, then replace the local variable with the stripped version
|
def get_items_with_same_name(title: Title, items: List[SearchResult]) -> List[SearchResult]:
|
||||||
if doesTitleIncludeYear:
|
shows_with_same_name = []
|
||||||
name = titleObj.titleWithoutYear
|
|
||||||
|
|
||||||
# Request the Trakt API for search results, using the name
|
for item in items:
|
||||||
tvSearch = TVShow.search(name)
|
if check_title_name_match(title.name, item.title):
|
||||||
|
|
||||||
# Create an array of shows which have been matched
|
|
||||||
showsWithSameName = []
|
|
||||||
|
|
||||||
# Go through each result from the search
|
|
||||||
for show in tvSearch:
|
|
||||||
# Check if the title is a match, based on our conditions (e.g over 50% of words match)
|
|
||||||
if checkTitleNameMatch(name, show.title):
|
|
||||||
# If the title included the year of broadcast, then we can be more picky in the results
|
# If the title included the year of broadcast, then we can be more picky in the results
|
||||||
# to look for a show with a broadcast year that matches
|
# to look for an item with a broadcast year that matches
|
||||||
if doesTitleIncludeYear:
|
if title.year:
|
||||||
# If the show title is a 1:1 match, with the same broadcast year, then bingo!
|
# If the item title is a 1:1 match, with the same broadcast year, then bingo!
|
||||||
if (name == show.title) and (show.year == titleObj.yearValue):
|
if (title.name == item.title) and (item.year == title.year):
|
||||||
# Clear previous results, and only use this one
|
# Clear previous results, and only use this one
|
||||||
showsWithSameName = []
|
shows_with_same_name = [item]
|
||||||
showsWithSameName.append(show)
|
|
||||||
break
|
break
|
||||||
|
|
||||||
# Otherwise, only add the show if the broadcast year matches
|
# Otherwise, only add the item if the broadcast year matches
|
||||||
if show.year == titleObj.yearValue:
|
if item.year == title.year:
|
||||||
showsWithSameName.append(show)
|
shows_with_same_name.append(item)
|
||||||
# If the program doesn't have the broadcast year, then add all the results
|
# If the item doesn't have the broadcast year, then add all the results
|
||||||
else:
|
else:
|
||||||
showsWithSameName.append(show)
|
shows_with_same_name.append(item)
|
||||||
|
|
||||||
# Sweep through the results once more for 1:1 title name matches,
|
return shows_with_same_name
|
||||||
# then if the list contains one entry with a 1:1 match, then clear the array
|
|
||||||
# and only use this one!
|
|
||||||
completeMatchNames = []
|
|
||||||
for nameFromSearch in showsWithSameName:
|
|
||||||
if nameFromSearch.title == name:
|
|
||||||
completeMatchNames.append(nameFromSearch)
|
|
||||||
|
|
||||||
if len(completeMatchNames) == 1:
|
|
||||||
showsWithSameName = completeMatchNames
|
|
||||||
|
|
||||||
# If the search contains multiple results, then we need to confirm with the user which show
|
def get_show_by_name(name: str, season_number: str, episode_number: str):
|
||||||
# the script should use, or access the local database to see if the user has already provided
|
# Parse the TV Show's name for year, if one is present in the string
|
||||||
# a manual selection
|
title = get_year_from_title(name)
|
||||||
if len(showsWithSameName) > 1:
|
|
||||||
|
# If the title contains a year, then replace the local variable with the stripped version
|
||||||
|
if title.year:
|
||||||
|
name = title.without_year
|
||||||
|
|
||||||
|
shows_with_same_name = get_items_with_same_name(title, TVShow.search(name))
|
||||||
|
|
||||||
|
complete_match_names = [name_from_search for name_from_search in shows_with_same_name if
|
||||||
|
name_from_search.title == name]
|
||||||
|
if len(complete_match_names) == 1:
|
||||||
|
return complete_match_names[0]
|
||||||
|
elif len(shows_with_same_name) == 1:
|
||||||
|
return shows_with_same_name[0]
|
||||||
|
elif len(shows_with_same_name) < 1:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
# If the search contains multiple results, then we need to confirm with the user which show
|
||||||
|
# the script should use, or access the local database to see if the user has already provided
|
||||||
|
# a manual selection
|
||||||
|
|
||||||
# Query the local database for existing selection
|
# Query the local database for existing selection
|
||||||
userMatchedQuery = Query()
|
user_matched_query = Query()
|
||||||
queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name)
|
query_result = userMatchedShowsTable.search(user_matched_query.ShowName == name)
|
||||||
|
|
||||||
# If the local database already contains an entry for a manual selection
|
# If the local database already contains an entry for a manual selection
|
||||||
# then don't bother prompting the user to select it again!
|
# then don't bother prompting the user to select it again!
|
||||||
if len(queryResult) == 1:
|
if len(query_result) == 1:
|
||||||
# Get the first result from the query
|
# Get the first result from the query
|
||||||
firstMatch = queryResult[0]
|
first_match = query_result[0]
|
||||||
# Get the value contains the selection index
|
# Get the value contains the selection index
|
||||||
firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex"))
|
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
|
||||||
# Check if the user previously requested to skip the show
|
# Check if the user previously requested to skip the show
|
||||||
skipShow = firstMatch.get("SkipShow")
|
skip_show = first_match.get("SkipShow")
|
||||||
# If the user did not skip, but provided an index selection, get the
|
# If the user did not skip, but provided an index selection, get the
|
||||||
# matching show
|
# matching show
|
||||||
if not skipShow:
|
if not skip_show:
|
||||||
return showsWithSameName[firstMatchSelectedIndex]
|
return shows_with_same_name[first_match_selected_index]
|
||||||
# Otherwise, return None, which will trigger the script to skip
|
# Otherwise, return None, which will trigger the script to skip
|
||||||
# and move onto the next show
|
# and move onto the next show
|
||||||
else:
|
else:
|
||||||
|
@ -229,44 +237,46 @@ def getShowByName(name, seasonNo, episodeNo):
|
||||||
# then prompt the user to make a selection
|
# then prompt the user to make a selection
|
||||||
else:
|
else:
|
||||||
print(
|
print(
|
||||||
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.\a"
|
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {season_number},"
|
||||||
|
f"Episode {episode_number}) has {len(shows_with_same_name)} matching Trakt shows with the same name.\a "
|
||||||
)
|
)
|
||||||
|
|
||||||
# Output each show for manual selection
|
# Output each show for manual selection
|
||||||
for idx, item in enumerate(showsWithSameName):
|
for idx, item in enumerate(shows_with_same_name):
|
||||||
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
|
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
|
||||||
# This will provide the user with enough information to make a selection.
|
# This will provide the user with enough information to make a selection.
|
||||||
print(
|
print(
|
||||||
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}"
|
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} "
|
||||||
|
f"Season(s) - More Info: https://trakt.tv/{item.ext}"
|
||||||
)
|
)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
||||||
indexSelected = input(
|
index_selected = input(
|
||||||
"Please make a selection from above (or enter SKIP):"
|
"Please make a selection from above (or enter SKIP):"
|
||||||
)
|
)
|
||||||
|
|
||||||
if indexSelected != "SKIP":
|
# Exit the loop
|
||||||
# Since the value isn't 'skip', check that the result is numerical
|
if index_selected == "SKIP":
|
||||||
indexSelected = int(indexSelected) - 1
|
|
||||||
# Exit the selection loop
|
|
||||||
break
|
|
||||||
# Otherwise, exit the loop
|
|
||||||
else:
|
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# Since the value isn't 'skip', check that the result is numerical
|
||||||
|
index_selected = int(index_selected) - 1
|
||||||
|
# Exit the selection loop
|
||||||
|
break
|
||||||
# Still allow the user to provide the exit input, and kill the program
|
# Still allow the user to provide the exit input, and kill the program
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
sys.exit("Cancel requested...")
|
sys.exit("Cancel requested...")
|
||||||
# Otherwise, the user has entered an invalid value, warn the user to try again
|
# Otherwise, the user has entered an invalid value, warn the user to try again
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.error(
|
logging.error(
|
||||||
f"Sorry! Please select a value between 0 to {len(showsWithSameName)}"
|
f"Sorry! Please select a value between 0 to {len(shows_with_same_name)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
||||||
# will trigger the program to move onto the next episode
|
# will trigger the program to move onto the next episode
|
||||||
if indexSelected == "SKIP":
|
if index_selected == "SKIP":
|
||||||
# Record that the user has skipped the TV Show for import, so that
|
# Record that the user has skipped the TV Show for import, so that
|
||||||
# manual input isn't required everytime
|
# manual input isn't required everytime
|
||||||
userMatchedShowsTable.insert(
|
userMatchedShowsTable.insert(
|
||||||
|
@ -276,103 +286,92 @@ def getShowByName(name, seasonNo, episodeNo):
|
||||||
return None
|
return None
|
||||||
# Otherwise, return the selection which the user made from the list
|
# Otherwise, return the selection which the user made from the list
|
||||||
else:
|
else:
|
||||||
selectedShow = showsWithSameName[int(indexSelected)]
|
selected_show = shows_with_same_name[int(index_selected)]
|
||||||
|
|
||||||
userMatchedShowsTable.insert(
|
userMatchedShowsTable.insert(
|
||||||
{
|
{
|
||||||
"ShowName": name,
|
"ShowName": name,
|
||||||
"UserSelectedIndex": indexSelected,
|
"UserSelectedIndex": index_selected,
|
||||||
"SkipShow": False,
|
"SkipShow": False,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
return selectedShow
|
return selected_show
|
||||||
|
|
||||||
else:
|
|
||||||
if len(showsWithSameName) > 0:
|
|
||||||
# If the search returned only one result, then awesome!
|
|
||||||
# Return the show, so the import automation can continue.
|
|
||||||
return showsWithSameName[0]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then
|
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g. Season 1 in Index 0), then
|
||||||
# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes
|
# subtract the TV Time numerical value by 1, so it starts from 0 as well. However, when a TV series includes
|
||||||
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
||||||
# this will match TV Time's existing value.
|
# this will match TV Time's existing value.
|
||||||
|
|
||||||
|
|
||||||
def parseSeasonNo(seasonNo, traktShowObj):
|
def parse_season_number(season_number, trakt_show_obj):
|
||||||
# Parse the season number into a numerical value
|
# Parse the season number into a numerical value
|
||||||
seasonNo = int(seasonNo)
|
season_number = int(season_number)
|
||||||
|
|
||||||
# Then get the Season Number from the first item in the array
|
# Then get the Season Number from the first item in the array
|
||||||
firstSeasonNo = traktShowObj.seasons[0].number
|
first_season_no = trakt_show_obj.seasons[0].number
|
||||||
|
|
||||||
# If the season number is 0, then the Trakt show contains a "special" season
|
# If the season number is 0, then the Trakt show contains a "special" season
|
||||||
if firstSeasonNo == 0:
|
if first_season_no == 0:
|
||||||
# No need to modify the value, as the TV Time value will match Trakt
|
# No need to modify the value, as the TV Time value will match Trakt
|
||||||
return seasonNo
|
return season_number
|
||||||
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
|
# Otherwise, if the Trakt seasons start with no specials, then return the seasonNo,
|
||||||
# but subtracted by one (e.g Season 1 in TV Time, will be 0)
|
# but subtracted by one (e.g Season 1 in TV Time, will be 0)
|
||||||
else:
|
else:
|
||||||
# Only subtract if the TV Time season number is greater than 0.
|
# Only subtract if the TV Time season number is greater than 0.
|
||||||
if seasonNo != 0:
|
if season_number != 0:
|
||||||
return seasonNo - 1
|
return season_number - 1
|
||||||
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
|
# Otherwise, the TV Time season is a special! Then you don't need to change the starting position
|
||||||
else:
|
else:
|
||||||
return seasonNo
|
return season_number
|
||||||
|
|
||||||
|
|
||||||
def processWatchedShows():
|
def process_watched_shows() -> None:
|
||||||
# Total amount of rows which have been processed in the CSV file
|
|
||||||
rowsCount = 0
|
|
||||||
# Total amount of rows in the CSV file
|
|
||||||
errorStreak = 0
|
|
||||||
# Open the CSV file within the GDPR exported data
|
# Open the CSV file within the GDPR exported data
|
||||||
with open(getWatchedShowsPath(), newline="") as csvfile:
|
with open(WATCHED_SHOWS_PATH, newline="") as csvfile:
|
||||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||||
showsReader = csv.DictReader(csvfile, delimiter=",")
|
shows_reader = csv.DictReader(csvfile, delimiter=",")
|
||||||
# Get the total amount of rows in the CSV file,
|
# Get the total amount of rows in the CSV file,
|
||||||
rowsTotal = len(list(showsReader))
|
rows_total = len(list(shows_reader))
|
||||||
# Move position to the beginning of the file
|
# Move position to the beginning of the file
|
||||||
csvfile.seek(0, 0)
|
csvfile.seek(0, 0)
|
||||||
# Loop through each line/record of the CSV file
|
# Loop through each line/record of the CSV file
|
||||||
# Ignore the header row
|
# Ignore the header row
|
||||||
next(showsReader, None)
|
next(shows_reader, None)
|
||||||
for rowsCount, row in enumerate(showsReader):
|
for rowsCount, row in enumerate(shows_reader):
|
||||||
# Get the name of the TV show
|
# Get the name of the TV show
|
||||||
tvShowName = row["tv_show_name"]
|
tv_show_name = row["tv_show_name"]
|
||||||
# Get the TV Time Episode Id
|
# Get the TV Time Episode id
|
||||||
tvShowEpisodeId = row["episode_id"]
|
tv_show_episode_id = row["episode_id"]
|
||||||
# Get the TV Time Season Number
|
# Get the TV Time Season Number
|
||||||
tvShowSeasonNo = row["episode_season_number"]
|
tv_show_season_number = row["episode_season_number"]
|
||||||
# Get the TV Time Episode Number
|
# Get the TV Time Episode Number
|
||||||
tvShowEpisodeNo = row["episode_number"]
|
tv_show_episode_number = row["episode_number"]
|
||||||
# Get the date which the show was marked 'watched' in TV Time
|
# Get the date which the show was marked 'watched' in TV Time
|
||||||
tvShowDateWatched = row["updated_at"]
|
tv_show_date_watched = row["updated_at"]
|
||||||
# Parse the watched date value into a Python type
|
# Parse the watched date value into a Python type
|
||||||
tvShowDateWatchedConverted = datetime.strptime(
|
tv_show_date_watched_converted = datetime.strptime(
|
||||||
tvShowDateWatched, "%Y-%m-%d %H:%M:%S"
|
tv_show_date_watched, "%Y-%m-%d %H:%M:%S"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Query the local database for previous entries indicating that
|
# Query the local database for previous entries indicating that
|
||||||
# the episode has already been imported in the past. Which will
|
# the episode has already been imported in the past. Which will
|
||||||
# ease pressure on TV Time's API server during a retry of the import
|
# ease pressure on TV Time's API server during a retry of the import
|
||||||
# process, and just save time overall without needing to create network requests
|
# process, and just save time overall without needing to create network requests
|
||||||
episodeCompletedQuery = Query()
|
episode_completed_query = Query()
|
||||||
queryResult = syncedEpisodesTable.search(
|
query_result = syncedEpisodesTable.search(
|
||||||
episodeCompletedQuery.episodeId == tvShowEpisodeId
|
episode_completed_query.episodeId == tv_show_episode_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# If the query returned no results, then continue to import it into Trakt
|
# If the query returned no results, then continue to import it into Trakt
|
||||||
if len(queryResult) == 0:
|
if len(query_result) == 0:
|
||||||
# Create a repeating loop, which will break on success, but repeats on failures
|
# Create a repeating loop, which will break on success, but repeats on failures
|
||||||
|
error_streak = 0
|
||||||
while True:
|
while True:
|
||||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
||||||
# then give up, and move onto the next episode, but warn the user.
|
# then give up, and move onto the next episode, but warn the user.
|
||||||
if errorStreak > 10:
|
if error_streak > 10:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
"An error occurred 10 times in a row... skipping episode..."
|
"An error occurred 10 times in a row... skipping episode..."
|
||||||
)
|
)
|
||||||
|
@ -383,46 +382,50 @@ def processWatchedShows():
|
||||||
# Other developers share the service, for free - so be considerate of your usage.
|
# Other developers share the service, for free - so be considerate of your usage.
|
||||||
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
||||||
# Search Trakt for the TV show matching TV Time's title value
|
# Search Trakt for the TV show matching TV Time's title value
|
||||||
traktShowObj = getShowByName(
|
trakt_show = get_show_by_name(
|
||||||
tvShowName, tvShowSeasonNo, tvShowEpisodeNo
|
tv_show_name, tv_show_season_number, tv_show_episode_number
|
||||||
)
|
)
|
||||||
# If the method returned 'None', then this is an indication to skip the episode, and
|
# If the method returned 'None', then this is an indication to skip the episode, and
|
||||||
# move onto the next one
|
# move onto the next one
|
||||||
if traktShowObj is None:
|
if not trakt_show:
|
||||||
break
|
break
|
||||||
# Show the progress of the import on-screen
|
# Show the progress of the import on-screen
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount+1}/{rowsTotal}) - Processing '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}"
|
f"({rowsCount + 1}/{rows_total}) - Processing '{tv_show_name}' Season {tv_show_season_number} /"
|
||||||
|
f"Episode {tv_show_episode_number}"
|
||||||
)
|
)
|
||||||
# Get the season from the Trakt API
|
# Get the season from the Trakt API
|
||||||
season = traktShowObj.seasons[
|
season = trakt_show.seasons[
|
||||||
parseSeasonNo(tvShowSeasonNo, traktShowObj)
|
parse_season_number(tv_show_season_number, trakt_show)
|
||||||
]
|
]
|
||||||
# Get the episode from the season
|
# Get the episode from the season
|
||||||
episode = season.episodes[int(tvShowEpisodeNo) - 1]
|
episode = season.episodes[int(tv_show_episode_number) - 1]
|
||||||
# Mark the episode as watched!
|
# Mark the episode as watched!
|
||||||
episode.mark_as_seen(tvShowDateWatchedConverted)
|
episode.mark_as_seen(tv_show_date_watched_converted)
|
||||||
# Add the episode to the local database as imported, so it can be skipped,
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
# if the process is repeated
|
# if the process is repeated
|
||||||
syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId})
|
syncedEpisodesTable.insert({"episodeId": tv_show_episode_id})
|
||||||
# Clear the error streak on completing the method without errors
|
# Clear the error streak on completing the method without errors
|
||||||
errorStreak = 0
|
error_streak = 0
|
||||||
break
|
break
|
||||||
# Catch errors which occur because of an incorrect array index. This occurs when
|
# Catch errors which occur because of an incorrect array index. This occurs when
|
||||||
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
# an incorrect Trakt show has been selected, with season/episodes which don't match TV Time.
|
||||||
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
||||||
except IndexError:
|
except IndexError:
|
||||||
tvShowSlug = traktShowObj.to_json()["shows"][0]["ids"]["ids"][
|
tv_show_slug = trakt_show.to_json()["shows"][0]["ids"]["ids"][
|
||||||
"slug"
|
"slug"
|
||||||
]
|
]
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist in Trakt! (https://trakt.tv/shows/{tvShowSlug}/seasons/{tvShowSeasonNo}/episodes/{tvShowEpisodeNo})"
|
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
||||||
|
f"Episode {tv_show_episode_number} does not exist in Trakt! "
|
||||||
|
f"(https://trakt.tv/shows/{tv_show_slug}/seasons/{tv_show_season_number}/episodes/{tv_show_episode_number})"
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
# Catch any errors which are raised because a show could not be found in Trakt
|
# Catch any errors which are raised because a show could not be found in Trakt
|
||||||
except trakt.errors.NotFoundException:
|
except trakt.errors.NotFoundException:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rowsTotal}) - {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!"
|
f"({rowsCount}/{rows_total}) - {tv_show_name} Season {tv_show_season_number}, "
|
||||||
|
f"Episode {tv_show_episode_number} does not exist (search) in Trakt!"
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
# Catch errors because of the program breaching the Trakt API rate limit
|
# Catch errors because of the program breaching the Trakt API rate limit
|
||||||
|
@ -435,12 +438,12 @@ def processWatchedShows():
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
# Mark the exception in the error streak
|
||||||
errorStreak += 1
|
error_streak += 1
|
||||||
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
f"({rowsCount}/{rowsTotal}) - A JSON decode error occuring whilst processing {tvShowName} "
|
f"({rowsCount}/{rows_total}) - A JSON decode error occuring whilst processing {tv_show_name} "
|
||||||
+ f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced "
|
+ f"Season {tv_show_season_number}, Episode {tv_show_episode_number}! This might occur when the server is down and has produced "
|
||||||
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -448,57 +451,347 @@ def processWatchedShows():
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
|
|
||||||
# Mark the exception in the error streak
|
# Mark the exception in the error streak
|
||||||
errorStreak += 1
|
error_streak += 1
|
||||||
# Catch a CTRL + C keyboard input, and exits the program
|
# Catch a CTRL + C keyboard input, and exits the program
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
sys.exit("Cancel requested...")
|
sys.exit("Cancel requested...")
|
||||||
# Skip the episode
|
# Skip the episode
|
||||||
else:
|
else:
|
||||||
logging.info(
|
logging.info(
|
||||||
f"({rowsCount}/{rowsTotal}) - Already imported, skipping '{tvShowName}' Season {tvShowSeasonNo} / Episode {tvShowEpisodeNo}."
|
f"({rowsCount}/{rows_total}) - Already imported, skipping '{tv_show_name}' Season {tv_show_season_number} / Episode {tv_show_episode_number}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def start():
|
# Using TV Time data (Name of Movie) - find the corresponding movie
|
||||||
# Create the initial authentication with Trakt, before starting the process
|
# in Trakt.TV either by automation, or asking the user to confirm.
|
||||||
if initTraktAuth():
|
|
||||||
# Display a menu selection
|
|
||||||
print(">> What do you want to do?")
|
|
||||||
print(" 1) Import Watch History from TV Time")
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
def get_movie_by_name(name: str):
|
||||||
menuSelection = input("Enter your menu selection: ")
|
# Parse the Movie's name for year, if one is present in the string
|
||||||
menuSelection = 1 if not menuSelection else int(menuSelection)
|
title = get_year_from_title(name)
|
||||||
break
|
|
||||||
except ValueError:
|
# If the title contains a year, then replace the local variable with the stripped version
|
||||||
logging.warning("Invalid input. Please enter a numerical number.")
|
if title.year:
|
||||||
# Start the process which is required
|
name = title.without_year
|
||||||
if menuSelection == 1:
|
|
||||||
# Invoke the method which will import episodes which have been watched
|
movies_with_same_name = get_items_with_same_name(title, Movie.search(name))
|
||||||
# from TV Time into Trakt
|
|
||||||
processWatchedShows()
|
complete_match_names = [name_from_search for name_from_search in movies_with_same_name if
|
||||||
else:
|
name_from_search.title == name]
|
||||||
logging.warning("Sorry - that's an unknown menu selection")
|
if len(complete_match_names) == 1:
|
||||||
|
return complete_match_names[0]
|
||||||
|
elif len(movies_with_same_name) == 1:
|
||||||
|
return movies_with_same_name[0]
|
||||||
|
elif len(movies_with_same_name) < 1:
|
||||||
|
return None
|
||||||
else:
|
else:
|
||||||
|
# If the search contains multiple results, then we need to confirm with the user which movie
|
||||||
|
# the script should use, or access the local database to see if the user has already provided
|
||||||
|
# a manual selection
|
||||||
|
|
||||||
|
# Query the local database for existing selection
|
||||||
|
user_matched_query = Query()
|
||||||
|
query_result = userMatchedMoviesTable.search(user_matched_query.MovieName == name)
|
||||||
|
|
||||||
|
# If the local database already contains an entry for a manual selection
|
||||||
|
# then don't bother prompting the user to select it again!
|
||||||
|
if len(query_result) == 1:
|
||||||
|
# Get the first result from the query
|
||||||
|
first_match = query_result[0]
|
||||||
|
# Get the value contains the selection index
|
||||||
|
first_match_selected_index = int(first_match.get("UserSelectedIndex"))
|
||||||
|
# Check if the user previously requested to skip the movie
|
||||||
|
skip_movie = first_match.get("SkipMovie")
|
||||||
|
# If the user did not skip, but provided an index selection, get the
|
||||||
|
# matching movie
|
||||||
|
if not skip_movie:
|
||||||
|
return movies_with_same_name[first_match_selected_index]
|
||||||
|
# Otherwise, return None, which will trigger the script to skip
|
||||||
|
# and move onto the next movie
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
# If the user has not provided a manual selection already in the process
|
||||||
|
# then prompt the user to make a selection
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Movie '{name}' has {len(movies_with_same_name)} "
|
||||||
|
f"matching Trakt movies with the same name.\a"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Output each movie for manual selection
|
||||||
|
for idx, item in enumerate(movies_with_same_name):
|
||||||
|
# Display the movie's title, broadcast year, amount of seasons and a link to the Trakt page.
|
||||||
|
# This will provide the user with enough information to make a selection.
|
||||||
|
print(
|
||||||
|
f" ({idx + 1}) {item.title} - {item.year} - More Info: https://trakt.tv/{item.ext}"
|
||||||
|
)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
||||||
|
index_selected = input(
|
||||||
|
"Please make a selection from above (or enter SKIP):"
|
||||||
|
)
|
||||||
|
|
||||||
|
if index_selected != "SKIP":
|
||||||
|
# Since the value isn't 'skip', check that the result is numerical
|
||||||
|
index_selected = int(index_selected) - 1
|
||||||
|
# Exit the selection loop
|
||||||
|
break
|
||||||
|
# Otherwise, exit the loop
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
# Still allow the user to provide the exit input, and kill the program
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit("Cancel requested...")
|
||||||
|
# Otherwise, the user has entered an invalid value, warn the user to try again
|
||||||
|
except Exception:
|
||||||
|
logging.error(
|
||||||
|
f"Sorry! Please select a value between 0 to {len(movies_with_same_name)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
||||||
|
# will trigger the program to move onto the next episode
|
||||||
|
if index_selected == "SKIP":
|
||||||
|
# Record that the user has skipped the Movie for import, so that
|
||||||
|
# manual input isn't required everytime
|
||||||
|
userMatchedMoviesTable.insert(
|
||||||
|
{"MovieName": name, "UserSelectedIndex": 0, "SkipMovie": True}
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
|
# Otherwise, return the selection which the user made from the list
|
||||||
|
else:
|
||||||
|
selected_movie = movies_with_same_name[int(index_selected)]
|
||||||
|
|
||||||
|
userMatchedMoviesTable.insert(
|
||||||
|
{
|
||||||
|
"MovieName": name,
|
||||||
|
"UserSelectedIndex": index_selected,
|
||||||
|
"SkipMovie": False,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return selected_movie
|
||||||
|
|
||||||
|
|
||||||
|
def process_movies():
|
||||||
|
# Total amount of rows which have been processed in the CSV file
|
||||||
|
# Total amount of rows in the CSV file
|
||||||
|
error_streak = 0
|
||||||
|
# Open the CSV file within the GDPR exported data
|
||||||
|
with open(MOVIES_PATH, newline="") as csvfile:
|
||||||
|
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||||
|
movie_reader_temp = csv.DictReader(csvfile, delimiter=",")
|
||||||
|
movie_reader = filter(lambda p: "" != p["movie_name"], movie_reader_temp)
|
||||||
|
# First, list all movies with watched type so that watchlist entry for them is not created
|
||||||
|
watched_list = []
|
||||||
|
for row in movie_reader:
|
||||||
|
if row["type"] == "watch":
|
||||||
|
watched_list.append(row["movie_name"])
|
||||||
|
# Move position to the beginning of the file
|
||||||
|
csvfile.seek(0, 0)
|
||||||
|
# Get the total amount of rows in the CSV file,
|
||||||
|
rows_total = len(list(movie_reader))
|
||||||
|
# Move position to the beginning of the file
|
||||||
|
csvfile.seek(0, 0)
|
||||||
|
# Loop through each line/record of the CSV file
|
||||||
|
# Ignore the header row
|
||||||
|
next(movie_reader, None)
|
||||||
|
for rows_count, row in enumerate(movie_reader):
|
||||||
|
# Get the name of the Movie
|
||||||
|
movie_name = row["movie_name"]
|
||||||
|
# Get the date which the movie was marked 'watched' in TV Time
|
||||||
|
activity_type = row["type"]
|
||||||
|
movie_date_watched = row["updated_at"]
|
||||||
|
# Parse the watched date value into a Python type
|
||||||
|
movie_date_watched_converted = datetime.strptime(
|
||||||
|
movie_date_watched, "%Y-%m-%d %H:%M:%S"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Query the local database for previous entries indicating that
|
||||||
|
# the episode has already been imported in the past. Which will
|
||||||
|
# ease pressure on TV Time's API server during a retry of the import
|
||||||
|
# process, and just save time overall without needing to create network requests
|
||||||
|
movie_query = Query()
|
||||||
|
query_result = syncedMoviesTable.search(
|
||||||
|
(movie_query.movie_name == movie_name) & (movie_query.type == "watched")
|
||||||
|
)
|
||||||
|
|
||||||
|
watchlist_query = Query()
|
||||||
|
query_result_watchlist = syncedMoviesTable.search(
|
||||||
|
(watchlist_query.movie_name == movie_name)
|
||||||
|
& (watchlist_query.type == "watchlist")
|
||||||
|
)
|
||||||
|
|
||||||
|
# If the query returned no results, then continue to import it into Trakt
|
||||||
|
if len(query_result) == 0:
|
||||||
|
# Create a repeating loop, which will break on success, but repeats on failures
|
||||||
|
while True:
|
||||||
|
# If movie is watched but this is an entry for watchlist, then skip
|
||||||
|
if movie_name in watched_list and activity_type != "watch":
|
||||||
|
logging.info(
|
||||||
|
f"Skipping '{movie_name}' to avoid redundant watchlist entry."
|
||||||
|
)
|
||||||
|
break
|
||||||
|
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
||||||
|
# then give up, and move onto the next episode, but warn the user.
|
||||||
|
if error_streak > 10:
|
||||||
|
logging.warning(
|
||||||
|
"An error occurred 10 times in a row... skipping episode..."
|
||||||
|
)
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
# Sleep for a second between each process, before going onto the next watched episode.
|
||||||
|
# This is required to remain within the API rate limit, and use the API server fairly.
|
||||||
|
# Other developers share the service, for free - so be considerate of your usage.
|
||||||
|
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
||||||
|
# Search Trakt for the Movie matching TV Time's title value
|
||||||
|
trakt_movie_obj = get_movie_by_name(movie_name)
|
||||||
|
# If the method returned 'None', then this is an indication to skip the episode, and
|
||||||
|
# move onto the next one
|
||||||
|
if trakt_movie_obj is None:
|
||||||
|
break
|
||||||
|
# Show the progress of the import on-screen
|
||||||
|
logging.info(
|
||||||
|
f"({rows_count + 1}/{rows_total}) - Processing '{movie_name}'"
|
||||||
|
)
|
||||||
|
if activity_type == "watch":
|
||||||
|
trakt_movie_obj.mark_as_seen(movie_date_watched_converted)
|
||||||
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
# if the process is repeated
|
||||||
|
syncedMoviesTable.insert(
|
||||||
|
{"movie_name": movie_name, "type": "watched"}
|
||||||
|
)
|
||||||
|
logging.info(f"Marked as seen")
|
||||||
|
elif len(query_result_watchlist) == 0:
|
||||||
|
trakt_movie_obj.add_to_watchlist()
|
||||||
|
# Add the episode to the local database as imported, so it can be skipped,
|
||||||
|
# if the process is repeated
|
||||||
|
syncedMoviesTable.insert(
|
||||||
|
{"movie_name": movie_name, "type": "watchlist"}
|
||||||
|
)
|
||||||
|
logging.info(f"Added to watchlist")
|
||||||
|
else:
|
||||||
|
logging.warning(f"Already in watchlist")
|
||||||
|
# Clear the error streak on completing the method without errors
|
||||||
|
error_streak = 0
|
||||||
|
break
|
||||||
|
# Catch errors which occur because of an incorrect array index. This occurs when
|
||||||
|
# an incorrect Trakt movie has been selected, with season/episodes which don't match TV Time.
|
||||||
|
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
||||||
|
except IndexError:
|
||||||
|
movie_slug = trakt_movie_obj.to_json()["movies"][0]["ids"]["ids"][
|
||||||
|
"slug"
|
||||||
|
]
|
||||||
|
logging.warning(
|
||||||
|
f"({rows_count}/{rows_total}) - {movie_name} "
|
||||||
|
f"does not exist in Trakt! (https://trakt.tv/movies/{movie_slug}/)"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
# Catch any errors which are raised because a movie could not be found in Trakt
|
||||||
|
except trakt.errors.NotFoundException:
|
||||||
|
logging.warning(
|
||||||
|
f"({rows_count}/{rows_total}) - {movie_name} does not exist (search) in Trakt!"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
# Catch errors because of the program breaching the Trakt API rate limit
|
||||||
|
except trakt.errors.RateLimitException:
|
||||||
|
logging.warning(
|
||||||
|
"The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between "
|
||||||
|
+ "movies via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before "
|
||||||
|
+ "trying again."
|
||||||
|
)
|
||||||
|
time.sleep(60)
|
||||||
|
|
||||||
|
# Mark the exception in the error streak
|
||||||
|
error_streak += 1
|
||||||
|
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
logging.warning(
|
||||||
|
f"({rows_count}/{rows_total}) - A JSON decode error occuring whilst processing {movie_name} "
|
||||||
|
+ f" This might occur when the server is down and has produced "
|
||||||
|
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Wait 60 seconds
|
||||||
|
time.sleep(60)
|
||||||
|
|
||||||
|
# Mark the exception in the error streak
|
||||||
|
error_streak += 1
|
||||||
|
# Catch a CTRL + C keyboard input, and exits the program
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit("Cancel requested...")
|
||||||
|
|
||||||
|
# Skip the episode
|
||||||
|
else:
|
||||||
|
logging.info(
|
||||||
|
f"({rows_count}/{rows_total}) - Already imported, skipping '{movie_name}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def menu_selection() -> int:
|
||||||
|
# Display a menu selection
|
||||||
|
print(">> What do you want to do?")
|
||||||
|
print(" 1) Import Watch History for TV Shows from TV Time")
|
||||||
|
print(" 2) Import Watch Movies from TV Time")
|
||||||
|
print(" 3) Do both 1 and 2 (default)")
|
||||||
|
print(" 4) Exit")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
selection = input("Enter your menu selection: ")
|
||||||
|
selection = 3 if not selection else int(selection)
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
logging.warning("Invalid input. Please enter a numerical number.")
|
||||||
|
# Check if the input is valid
|
||||||
|
if not 1 <= selection <= 4:
|
||||||
|
logging.warning("Sorry - that's an unknown menu selection")
|
||||||
|
exit()
|
||||||
|
# Exit if the 4th option was chosen
|
||||||
|
if selection == 4:
|
||||||
|
logging.info("Exiting as per user's selection.")
|
||||||
|
exit()
|
||||||
|
|
||||||
|
return selection
|
||||||
|
|
||||||
|
|
||||||
|
def start():
|
||||||
|
selection = menu_selection()
|
||||||
|
|
||||||
|
# Create the initial authentication with Trakt, before starting the process
|
||||||
|
if not init_trakt_auth():
|
||||||
logging.error(
|
logging.error(
|
||||||
"ERROR: Unable to complete authentication to Trakt - please try again."
|
"ERROR: Unable to complete authentication to Trakt - please try again."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Start the process which is required
|
||||||
|
if selection == 1:
|
||||||
|
# Invoke the method which will import episodes which have been watched
|
||||||
|
# from TV Time into Trakt
|
||||||
|
logging.info("Processing watched shows.")
|
||||||
|
process_watched_shows()
|
||||||
|
# TODO: Add support for followed shows
|
||||||
|
elif selection == 2:
|
||||||
|
# Invoke the method which will import movies which have been watched
|
||||||
|
# from TV Time into Trakt
|
||||||
|
logging.info("Processing movies.")
|
||||||
|
process_movies()
|
||||||
|
elif selection == 3:
|
||||||
|
# Invoke both the episodes and movies import methods
|
||||||
|
logging.info("Processing both watched shows and movies.")
|
||||||
|
process_watched_shows()
|
||||||
|
process_movies()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Check that the user has created the config file
|
# Check that the user has provided the GDPR path
|
||||||
if os.path.exists("config.json"):
|
if os.path.isdir(config.gdpr_workspace_path):
|
||||||
# Check that the user has provided the GDPR path
|
start()
|
||||||
if os.path.isdir(config.GDPR_WORKSPACE_PATH):
|
|
||||||
start()
|
|
||||||
else:
|
|
||||||
logging.error(
|
|
||||||
"Oops! The TV Time GDPR folder '"
|
|
||||||
+ config.GDPR_WORKSPACE_PATH
|
|
||||||
+ "' does not exist on the local system. Please check it, and try again."
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
logging.error(
|
logging.error(
|
||||||
"The 'config.json' file cannot be found - have you created it yet?"
|
"Oops! The TV Time GDPR folder '"
|
||||||
|
+ config.gdpr_workspace_path
|
||||||
|
+ "' does not exist on the local system. Please check it, and try again."
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue