mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2025-02-05 14:12:32 -06:00
Formatting
This commit is contained in:
parent
9436798e47
commit
8c58b78b8e
1 changed files with 79 additions and 46 deletions
125
TimeToTrackt.py
125
TimeToTrackt.py
|
@ -17,9 +17,9 @@ from trakt.tv import TVShow
|
|||
DELAY_BETWEEN_EPISODES_IN_SECONDS = 5
|
||||
|
||||
# Create a database to keep track of completed processes
|
||||
database = TinyDB('localStorage.json')
|
||||
syncedEpisodesTable = database.table('SyncedEpisodes')
|
||||
userMatchedShowsTable = database.table('TvTimeTraktUserMatched')
|
||||
database = TinyDB("localStorage.json")
|
||||
syncedEpisodesTable = database.table("SyncedEpisodes")
|
||||
userMatchedShowsTable = database.table("TvTimeTraktUserMatched")
|
||||
|
||||
|
||||
class Expando(object):
|
||||
|
@ -29,7 +29,7 @@ class Expando(object):
|
|||
def getConfiguration():
|
||||
configEx = Expando()
|
||||
|
||||
with open('config.json') as f:
|
||||
with open("config.json") as f:
|
||||
data = json.load(f)
|
||||
|
||||
configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"]
|
||||
|
@ -59,7 +59,13 @@ def initTraktAuth():
|
|||
return True
|
||||
# Set the method of authentication
|
||||
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
||||
return init(config.TRAKT_USERNAME, store=True, client_id=config.CLIENT_ID, client_secret=config.CLIENT_SECRET)
|
||||
return init(
|
||||
config.TRAKT_USERNAME,
|
||||
store=True,
|
||||
client_id=config.CLIENT_ID,
|
||||
client_secret=config.CLIENT_SECRET,
|
||||
)
|
||||
|
||||
|
||||
# With a given title, check if it contains a year (e.g Doctor Who (2005))
|
||||
# and then return this value, with the title and year removed to improve
|
||||
|
@ -74,7 +80,7 @@ def getYearFromTitle(title):
|
|||
yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
||||
yearValue = yearSearch.group(1)
|
||||
# Then, get the title without the year value included
|
||||
titleValue = title.split('(')[0].strip()
|
||||
titleValue = title.split("(")[0].strip()
|
||||
# Put this together into an object
|
||||
ex.titleWithoutYear = titleValue
|
||||
ex.yearValue = int(yearValue)
|
||||
|
@ -86,6 +92,7 @@ def getYearFromTitle(title):
|
|||
ex.yearValue = -1
|
||||
return ex
|
||||
|
||||
|
||||
# Shows in TV Time are often different to Trakt.TV - in order to improve results and automation,
|
||||
# calculate how many words are in the title, and return true if more than 50% of the title is a match,
|
||||
# It seems to improve automation, and reduce manual selection....
|
||||
|
@ -115,6 +122,7 @@ def checkTitleNameMatch(tvTimeTitle, traktTitle):
|
|||
# then return the title as a possibility to use
|
||||
return percentage > 50
|
||||
|
||||
|
||||
# Using TV Time data (Name of Show, Season No and Episode) - find the corresponding show
|
||||
# in Trakt.TV either by automation, or asking the user to confirm.
|
||||
|
||||
|
@ -167,7 +175,7 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
if nameFromSearch.title == name:
|
||||
completeMatchNames.append(nameFromSearch)
|
||||
|
||||
if (len(completeMatchNames) == 1):
|
||||
if len(completeMatchNames) == 1:
|
||||
showsWithSameName = completeMatchNames
|
||||
|
||||
# If the search contains multiple results, then we need to confirm with the user which show
|
||||
|
@ -177,8 +185,7 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
|
||||
# Query the local database for existing selection
|
||||
userMatchedQuery = Query()
|
||||
queryResult = userMatchedShowsTable.search(
|
||||
userMatchedQuery.ShowName == name)
|
||||
queryResult = userMatchedShowsTable.search(userMatchedQuery.ShowName == name)
|
||||
|
||||
# If the local database already contains an entry for a manual selection
|
||||
# then don't bother prompting the user to select it again!
|
||||
|
@ -186,9 +193,9 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
# Get the first result from the query
|
||||
firstMatch = queryResult[0]
|
||||
# Get the value contains the selection index
|
||||
firstMatchSelectedIndex = int(firstMatch.get('UserSelectedIndex'))
|
||||
firstMatchSelectedIndex = int(firstMatch.get("UserSelectedIndex"))
|
||||
# Check if the user previously requested to skip the show
|
||||
skipShow = firstMatch.get('SkipShow')
|
||||
skipShow = firstMatch.get("SkipShow")
|
||||
# If the user did not skip, but provided an index selection, get the
|
||||
# matching show
|
||||
if skipShow == False:
|
||||
|
@ -201,22 +208,25 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
# then prompt the user to make a selection
|
||||
else:
|
||||
print(
|
||||
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.")
|
||||
f"INFO - MANUAL INPUT REQUIRED: The TV Time data for Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name."
|
||||
)
|
||||
|
||||
# Output each show for manual selection
|
||||
for idx, item in enumerate(showsWithSameName):
|
||||
# Display the show's title, broadcast year, amount of seasons and a link to the Trakt page.
|
||||
# This will provide the user with enough information to make a selection.
|
||||
print(
|
||||
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}")
|
||||
f" ({idx + 1}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}"
|
||||
)
|
||||
|
||||
while(True):
|
||||
while True:
|
||||
try:
|
||||
# Get the user's selection, either a numerical input, or a string 'SKIP' value
|
||||
indexSelected = (input(
|
||||
f"Please make a selection from above (or enter SKIP):"))
|
||||
indexSelected = input(
|
||||
f"Please make a selection from above (or enter SKIP):"
|
||||
)
|
||||
|
||||
if indexSelected != 'SKIP':
|
||||
if indexSelected != "SKIP":
|
||||
# Since the value isn't 'skip', check that the result is numerical
|
||||
indexSelected = int(indexSelected) - 1
|
||||
# Exit the selection loop
|
||||
|
@ -230,15 +240,17 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
# Otherwise, the user has entered an invalid value, warn the user to try again
|
||||
except:
|
||||
print(
|
||||
f"Sorry! Please select a value between 0 to {len(showsWithSameName)}")
|
||||
f"Sorry! Please select a value between 0 to {len(showsWithSameName)}"
|
||||
)
|
||||
|
||||
# If the user entered 'SKIP', then exit from the loop with no selection, which
|
||||
# will trigger the program to move onto the next episode
|
||||
if (indexSelected == 'SKIP'):
|
||||
if indexSelected == "SKIP":
|
||||
# Record that the user has skipped the TV Show for import, so that
|
||||
# manual input isn't required everytime
|
||||
userMatchedShowsTable.insert(
|
||||
{'ShowName': name, 'UserSelectedIndex': 0, 'SkipShow': True})
|
||||
{"ShowName": name, "UserSelectedIndex": 0, "SkipShow": True}
|
||||
)
|
||||
|
||||
return None
|
||||
# Otherwise, return the selection which the user made from the list
|
||||
|
@ -246,18 +258,24 @@ def getShowByName(name, seasonNo, episodeNo):
|
|||
selectedShow = showsWithSameName[int(indexSelected)]
|
||||
|
||||
userMatchedShowsTable.insert(
|
||||
{'ShowName': name, 'UserSelectedIndex': indexSelected, 'SkipShow': False})
|
||||
{
|
||||
"ShowName": name,
|
||||
"UserSelectedIndex": indexSelected,
|
||||
"SkipShow": False,
|
||||
}
|
||||
)
|
||||
|
||||
return selectedShow
|
||||
|
||||
else:
|
||||
if (len(showsWithSameName) > 0):
|
||||
if len(showsWithSameName) > 0:
|
||||
# If the search returned only one result, then awesome!
|
||||
# Return the show, so the import automation can continue.
|
||||
return showsWithSameName[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
# Since the Trakt.Py starts the indexing of seasons in the array from 0 (e.g Season 1 in Index 0), then
|
||||
# subtract the TV Time numerical value by 1 so it starts from 0 as well. However, when a TV series includes
|
||||
# a 'special' season, Trakt.Py will place this as the first season in the array - so, don't subtract, since
|
||||
|
@ -292,9 +310,9 @@ def processWatchedShows():
|
|||
# Total amount of rows in the CSV file
|
||||
errorStreak = 0
|
||||
# Open the CSV file within the GDPR exported data
|
||||
with open(getWatchedShowsPath(), newline='') as csvfile:
|
||||
with open(getWatchedShowsPath(), newline="") as csvfile:
|
||||
# Create the CSV reader, which will break up the fields using the delimiter ','
|
||||
showsReader = csv.DictReader(csvfile, delimiter=',')
|
||||
showsReader = csv.DictReader(csvfile, delimiter=",")
|
||||
# Get the total amount of rows in the CSV file,
|
||||
rowsTotal = len(list(showsReader))
|
||||
# Move position to the beginning of the file
|
||||
|
@ -316,7 +334,8 @@ def processWatchedShows():
|
|||
# Parse the watched date value into a Python type
|
||||
print(tvShowDateWatched)
|
||||
tvShowDateWatchedConverted = datetime.strptime(
|
||||
tvShowDateWatched, '%Y-%m-%d %H:%M:%S')
|
||||
tvShowDateWatched, "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
# Query the local database for previous entries indicating that
|
||||
# the episode has already been imported in the past. Which will
|
||||
|
@ -324,7 +343,8 @@ def processWatchedShows():
|
|||
# process, and just save time overall without needing to create network requests
|
||||
episodeCompletedQuery = Query()
|
||||
queryResult = syncedEpisodesTable.search(
|
||||
episodeCompletedQuery.episodeId == tvShowEpisodeId)
|
||||
episodeCompletedQuery.episodeId == tvShowEpisodeId
|
||||
)
|
||||
|
||||
# If the query returned no results, then continue to import it into Trakt
|
||||
if len(queryResult) == 0:
|
||||
|
@ -332,9 +352,10 @@ def processWatchedShows():
|
|||
while True:
|
||||
# If more than 10 errors occurred in one streak, whilst trying to import the episode
|
||||
# then give up, and move onto the next episode, but warn the user.
|
||||
if (errorStreak > 10):
|
||||
if errorStreak > 10:
|
||||
print(
|
||||
f"WARNING: An error occurred 10 times in a row... skipping episode...")
|
||||
f"WARNING: An error occurred 10 times in a row... skipping episode..."
|
||||
)
|
||||
break
|
||||
try:
|
||||
# Sleep for a second between each process, before going onto the next watched episode.
|
||||
|
@ -343,25 +364,27 @@ def processWatchedShows():
|
|||
time.sleep(DELAY_BETWEEN_EPISODES_IN_SECONDS)
|
||||
# Search Trakt for the TV show matching TV Time's title value
|
||||
traktShowObj = getShowByName(
|
||||
tvShowName, tvShowSeasonNo, tvShowEpisodeNo)
|
||||
tvShowName, tvShowSeasonNo, tvShowEpisodeNo
|
||||
)
|
||||
# If the method returned 'None', then this is an indication to skip the episode, and
|
||||
# move onto the next one
|
||||
if traktShowObj == None:
|
||||
break
|
||||
# Show the progress of the import on-screen
|
||||
print(
|
||||
f"({rowsCount}/{rowsTotal}) Processing Show {tvShowName} on Season {tvShowSeasonNo} - Episode {tvShowEpisodeNo}")
|
||||
f"({rowsCount+1}/{rowsTotal}) Processing Show {tvShowName} on Season {tvShowSeasonNo} - Episode {tvShowEpisodeNo}"
|
||||
)
|
||||
# Get the season from the Trakt API
|
||||
season = traktShowObj.seasons[parseSeasonNo(
|
||||
tvShowSeasonNo, traktShowObj)]
|
||||
season = traktShowObj.seasons[
|
||||
parseSeasonNo(tvShowSeasonNo, traktShowObj)
|
||||
]
|
||||
# Get the episode from the season
|
||||
episode = season.episodes[int(tvShowEpisodeNo) - 1]
|
||||
# Mark the episode as watched!
|
||||
episode.mark_as_seen(tvShowDateWatchedConverted)
|
||||
# Add the episode to the local database as imported, so it can be skipped,
|
||||
# if the process is repeated
|
||||
syncedEpisodesTable.insert(
|
||||
{'episodeId': tvShowEpisodeId})
|
||||
syncedEpisodesTable.insert({"episodeId": tvShowEpisodeId})
|
||||
# Clear the error streak on completing the method without errors
|
||||
errorStreak = 0
|
||||
break
|
||||
|
@ -370,19 +393,22 @@ def processWatchedShows():
|
|||
# It can also occur due to a bug in Trakt Py, whereby some seasons contain an empty array of episodes.
|
||||
except IndexError:
|
||||
print(
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (season/episode index) in Trakt!")
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (season/episode index) in Trakt!"
|
||||
)
|
||||
break
|
||||
# Catch any errors which are raised because a show could not be found in Trakt
|
||||
except trakt.errors.NotFoundException:
|
||||
print(
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!")
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: {tvShowName} Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo} does not exist (search) in Trakt!"
|
||||
)
|
||||
break
|
||||
# Catch errors because of the program breaching the Trakt API rate limit
|
||||
except trakt.errors.RateLimitException:
|
||||
print(
|
||||
"WARNING: The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between " +
|
||||
"episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before " +
|
||||
"trying again.")
|
||||
"WARNING: The program is running too quickly and has hit Trakt's API rate limit! Please increase the delay between "
|
||||
+ "episdoes via the variable 'DELAY_BETWEEN_EPISODES_IN_SECONDS'. The program will now wait 60 seconds before "
|
||||
+ "trying again."
|
||||
)
|
||||
time.sleep(60)
|
||||
|
||||
# Mark the exception in the error streak
|
||||
|
@ -390,9 +416,10 @@ def processWatchedShows():
|
|||
# Catch a JSON decode error - this can be raised when the API server is down and produces a HTML page, instead of JSON
|
||||
except json.decoder.JSONDecodeError:
|
||||
print(
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: A JSON decode error occuring whilst processing {tvShowName} " +
|
||||
f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced " +
|
||||
"a HTML document instead of JSON. The script will wait 60 seconds before trying again.")
|
||||
f"({rowsCount}/{rowsTotal}) WARNING: A JSON decode error occuring whilst processing {tvShowName} "
|
||||
+ f"Season {tvShowSeasonNo}, Episode {tvShowEpisodeNo}! This might occur when the server is down and has produced "
|
||||
+ "a HTML document instead of JSON. The script will wait 60 seconds before trying again."
|
||||
)
|
||||
|
||||
# Wait 60 seconds
|
||||
time.sleep(60)
|
||||
|
@ -405,7 +432,8 @@ def processWatchedShows():
|
|||
# Skip the episode
|
||||
else:
|
||||
print(
|
||||
f"({rowsCount}/{rowsTotal}) Skipping '{tvShowName}' Season {tvShowSeasonNo} Episode {tvShowEpisodeNo}. It's already been imported.")
|
||||
f"({rowsCount}/{rowsTotal}) Skipping '{tvShowName}' Season {tvShowSeasonNo} Episode {tvShowEpisodeNo}. It's already been imported."
|
||||
)
|
||||
|
||||
|
||||
def start():
|
||||
|
@ -440,7 +468,12 @@ if __name__ == "__main__":
|
|||
if os.path.isdir(config.GDPR_WORKSPACE_PATH):
|
||||
start()
|
||||
else:
|
||||
print("Oops! The TV Time GDPR folder '" + config.GDPR_WORKSPACE_PATH +
|
||||
"' does not exist on the local system. Please check it, and try again.")
|
||||
print(
|
||||
"Oops! The TV Time GDPR folder '"
|
||||
+ config.GDPR_WORKSPACE_PATH
|
||||
+ "' does not exist on the local system. Please check it, and try again."
|
||||
)
|
||||
else:
|
||||
print(f"ERROR: The 'config.json' file cannot be found - have you created it yet?")
|
||||
print(
|
||||
f"ERROR: The 'config.json' file cannot be found - have you created it yet?"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue