mirror of
https://github.com/SinTan1729/TvTimeToTrakt.git
synced 2024-12-26 05:18:36 -06:00
331 lines
13 KiB
Python
331 lines
13 KiB
Python
|
# main.py
|
||
|
from logging import error
|
||
|
import sys
|
||
|
from trakt import init
|
||
|
from trakt.movies import get_recommended_movies
|
||
|
import trakt.core
|
||
|
import os
|
||
|
from trakt.tv import TVShow
|
||
|
import csv
|
||
|
from datetime import datetime
|
||
|
import time
|
||
|
from tinydb import TinyDB, Query
|
||
|
import json
|
||
|
import re
|
||
|
import sys
|
||
|
|
||
|
# Create a database to keep track of completed processes
|
||
|
database = TinyDB('localStorage.json')
|
||
|
syncedEpisodesTable = database.table('SyncedEpisodes')
|
||
|
userMatchedShowsTable = database.table('TvTimeTraktUserMatched')
|
||
|
|
||
|
|
||
|
class Expando(object):
|
||
|
pass
|
||
|
|
||
|
|
||
|
def initTraktAuth():
|
||
|
# Set the method of authentication
|
||
|
trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH
|
||
|
return init(getConfiguration().TRAKT_USERNAME, store=True, client_id=getConfiguration().CLIENT_ID, client_secret=getConfiguration().CLIENT_SECRET)
|
||
|
|
||
|
|
||
|
def getConfiguration():
|
||
|
configEx = Expando()
|
||
|
|
||
|
with open('config.json') as f:
|
||
|
data = json.load(f)
|
||
|
|
||
|
configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"]
|
||
|
configEx.CLIENT_ID = data["CLIENT_ID"]
|
||
|
configEx.CLIENT_SECRET = data["CLIENT_SECRET"]
|
||
|
configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"]
|
||
|
|
||
|
return configEx
|
||
|
|
||
|
|
||
|
def getYearFromTitle(title):
|
||
|
ex = Expando()
|
||
|
|
||
|
try:
|
||
|
# Get the year
|
||
|
yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title)
|
||
|
yearValue = yearSearch.group(1)
|
||
|
# Get the value outside the title
|
||
|
titleValue = title.split('(')[0].strip()
|
||
|
|
||
|
ex.titleWithoutYear = titleValue
|
||
|
ex.yearValue = int(yearValue)
|
||
|
return ex
|
||
|
except:
|
||
|
ex.titleWithoutYear = title
|
||
|
ex.yearValue = -1
|
||
|
return ex
|
||
|
|
||
|
|
||
|
def checkTitleNameMatch(tvTimeTitle, traktTitle):
|
||
|
# If a name is simply a 1:1 match, then return true
|
||
|
if tvTimeTitle == traktTitle:
|
||
|
return True
|
||
|
|
||
|
# Split the TvTime title
|
||
|
tvTimeTitleSplit = tvTimeTitle.split()
|
||
|
|
||
|
# Get all the words which were in the title
|
||
|
wordsMatched = []
|
||
|
|
||
|
# Go through each word of the title, and confirm if the is a match
|
||
|
for word in tvTimeTitleSplit:
|
||
|
if word in traktTitle:
|
||
|
wordsMatched.append(word)
|
||
|
|
||
|
# Calculate the accuracy of the match
|
||
|
quotient = len(wordsMatched) / len(traktTitle.split())
|
||
|
percentage = quotient * 100
|
||
|
|
||
|
# If the title contains more than 50% of the words, then bring it up for use
|
||
|
return percentage > 50
|
||
|
|
||
|
|
||
|
def getShowByName(name, seasonNo, episodeNo):
|
||
|
# Get the 'year' from the title - if one is present
|
||
|
titleObj = getYearFromTitle(name)
|
||
|
|
||
|
# Title includes a year value, which helps with ensuring accuracy of pick
|
||
|
doesTitleIncludeYear = titleObj.yearValue != -1
|
||
|
|
||
|
# If the title included the year, then replace the name value
|
||
|
# with the string without the year - which helps with ensuring
|
||
|
# Trakt search accuracy
|
||
|
if doesTitleIncludeYear:
|
||
|
name = titleObj.titleWithoutYear
|
||
|
|
||
|
# Search for a show with the name
|
||
|
tvSearch = TVShow.search(name)
|
||
|
|
||
|
showsWithSameName = []
|
||
|
|
||
|
# Check if the search returned more than 1 result with the same name
|
||
|
for show in tvSearch:
|
||
|
if checkTitleNameMatch(name, show.title):
|
||
|
# If the TV Time title included a year, then only add results with matching broadcast year
|
||
|
if doesTitleIncludeYear == True:
|
||
|
# If the show title is a 1:1 match, with the year, then don't bother with checking the rest -
|
||
|
# since this will be a complete match
|
||
|
if (name == show.title) and (show.year == titleObj.yearValue):
|
||
|
showsWithSameName = []
|
||
|
showsWithSameName.append(show)
|
||
|
break
|
||
|
|
||
|
# Otherwise, check if the year is a match
|
||
|
if show.year == titleObj.yearValue:
|
||
|
showsWithSameName.append(show)
|
||
|
# Otherwise, just add all options
|
||
|
else:
|
||
|
showsWithSameName.append(show)
|
||
|
|
||
|
# Filter down the results further to results containing a 1:1 match on title
|
||
|
completeMatchNames = []
|
||
|
for nameFromSearch in showsWithSameName:
|
||
|
if nameFromSearch.title == name:
|
||
|
completeMatchNames.append(nameFromSearch)
|
||
|
|
||
|
if (len(completeMatchNames) == 1):
|
||
|
showsWithSameName = completeMatchNames
|
||
|
|
||
|
# If the search contains more than one result with the same name, then confirm with user
|
||
|
if len(showsWithSameName) > 1:
|
||
|
|
||
|
# Check if the user has made a selection already
|
||
|
userMatchedQuery = Query()
|
||
|
queryResult = userMatchedShowsTable.search(
|
||
|
userMatchedQuery.ShowName == name)
|
||
|
|
||
|
# If the user has already made a selection for the show, then use the existing selection
|
||
|
if len(queryResult) == 1:
|
||
|
# Get the first row
|
||
|
firstMatch = queryResult[0]
|
||
|
firstMatchSelectedIndex = int(firstMatch.get('UserSelectedIndex'))
|
||
|
skipShow = firstMatch.get('SkipShow')
|
||
|
|
||
|
if skipShow == False:
|
||
|
return showsWithSameName[firstMatchSelectedIndex]
|
||
|
else:
|
||
|
return None
|
||
|
# Otherwise, ask the user which show they want to match with
|
||
|
else:
|
||
|
# Ask the user to pick
|
||
|
print(
|
||
|
f"MESSAGE: The TV Time Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.")
|
||
|
|
||
|
for idx, item in enumerate(showsWithSameName):
|
||
|
print(
|
||
|
f" ({idx}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}")
|
||
|
|
||
|
while(True):
|
||
|
try:
|
||
|
indexSelected = (input(
|
||
|
f"Please make a selection from above (or enter SKIP):"))
|
||
|
|
||
|
# If the input was not skip, then validate the selection before ending loop
|
||
|
if indexSelected != 'SKIP':
|
||
|
int(indexSelected)
|
||
|
break
|
||
|
# Otherwise, exit with SKIP
|
||
|
else:
|
||
|
break
|
||
|
except KeyboardInterrupt:
|
||
|
sys.exit("Cancel requested...")
|
||
|
except:
|
||
|
print(
|
||
|
f"Sorry! Please select a value between 0 to {len(showsWithSameName)}")
|
||
|
|
||
|
# If the user decides to skip the selection, return None
|
||
|
if (indexSelected == 'SKIP'):
|
||
|
userMatchedShowsTable.insert(
|
||
|
{'ShowName': name, 'UserSelectedIndex': 0, 'SkipShow': True})
|
||
|
|
||
|
return None
|
||
|
# Otherwise, return the selected show
|
||
|
else:
|
||
|
selectedShow = showsWithSameName[int(indexSelected)]
|
||
|
|
||
|
userMatchedShowsTable.insert(
|
||
|
{'ShowName': name, 'UserSelectedIndex': indexSelected, 'SkipShow': False})
|
||
|
|
||
|
return selectedShow
|
||
|
|
||
|
else:
|
||
|
return showsWithSameName[0]
|
||
|
|
||
|
# Confirm if the season has a "special" season starting at 0, if not, then subtract the seasonNo by 1
|
||
|
|
||
|
|
||
|
def parseSeasonNo(seasonNo, traktShowObj):
|
||
|
seasonNo = int(seasonNo)
|
||
|
|
||
|
# Get the first season number in the array
|
||
|
firstSeasonNo = traktShowObj.seasons[0].number
|
||
|
|
||
|
# If the season number is 0, then the show contains a "special" season
|
||
|
if firstSeasonNo == 0:
|
||
|
# Return the Season Number, as is
|
||
|
return seasonNo
|
||
|
else:
|
||
|
# Otherwise, if the seasons start from 0, without any specials, then return the seasonNo,
|
||
|
# but subtracted by one (unless it is a special season in TV Time)
|
||
|
if seasonNo != 0:
|
||
|
return seasonNo - 1
|
||
|
else:
|
||
|
return seasonNo
|
||
|
|
||
|
|
||
|
def getWatchedShowsPath():
|
||
|
return getConfiguration().GDPR_WORKSPACE_PATH + "/seen_episode.csv"
|
||
|
|
||
|
|
||
|
def processWatchedShows():
|
||
|
# Keep a count of rows processed etc
|
||
|
rowsCount = 0
|
||
|
rowsTotal = 0
|
||
|
errorStreak = 0
|
||
|
# Quickly sweep through the file to get the row count
|
||
|
with open(getWatchedShowsPath()) as f:
|
||
|
rowsTotal = sum(1 for line in f)
|
||
|
|
||
|
with open(getWatchedShowsPath(), newline='') as csvfile:
|
||
|
showsReader = csv.reader(csvfile, delimiter=',')
|
||
|
|
||
|
for row in showsReader:
|
||
|
rowsCount += 1
|
||
|
# Get the values from the CSV record
|
||
|
tvShowName = row[4]
|
||
|
|
||
|
# Skip first row
|
||
|
if tvShowName != "tv_show_name":
|
||
|
tvShowEpisodeId = row[1]
|
||
|
tvShowSeasonNo = row[7]
|
||
|
tvShowEpisodeNo = row[8]
|
||
|
tvShowDateWatched = row[5]
|
||
|
tvShowDateWatchedConverted = datetime.strptime(
|
||
|
tvShowDateWatched, '%Y-%m-%d %H:%M:%S')
|
||
|
|
||
|
# Query the database to check if it's already been processed
|
||
|
episodeCompletedQuery = Query()
|
||
|
queryResult = syncedEpisodesTable.search(
|
||
|
episodeCompletedQuery.episodeId == tvShowEpisodeId)
|
||
|
|
||
|
if len(queryResult) == 0:
|
||
|
while True:
|
||
|
if (errorStreak > 10):
|
||
|
print(
|
||
|
f"An error occurred 10 times in a row... skipping episode...")
|
||
|
break
|
||
|
|
||
|
try:
|
||
|
# Sleep for a second between each process, before adding the next watched episode,
|
||
|
# this ensures that the program is within the rate limit of 1 per second.
|
||
|
time.sleep(5)
|
||
|
# Output to console
|
||
|
print(f"({rowsCount}/{rowsTotal}) Processing Show '" + tvShowName +
|
||
|
"' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo)
|
||
|
# Get the Trakt version of the show
|
||
|
traktShowObj = getShowByName(
|
||
|
tvShowName, tvShowSeasonNo, tvShowEpisodeNo)
|
||
|
# Skip the episode, if no show was selected
|
||
|
if traktShowObj == None:
|
||
|
break
|
||
|
# Add the show to the user's library
|
||
|
traktShowObj.add_to_library()
|
||
|
# Get the season
|
||
|
season = traktShowObj.seasons[parseSeasonNo(
|
||
|
tvShowSeasonNo, traktShowObj)]
|
||
|
episode = season.episodes[int(tvShowEpisodeNo) - 1]
|
||
|
# Mark the episode as watched
|
||
|
episode.mark_as_seen(tvShowDateWatchedConverted)
|
||
|
# Add the show to the tracker as completed
|
||
|
syncedEpisodesTable.insert(
|
||
|
{'episodeId': tvShowEpisodeId})
|
||
|
# Once the episode has been marked watched, then break out of the loop
|
||
|
errorStreak = 0
|
||
|
break
|
||
|
except IndexError:
|
||
|
print("Oops! '" + tvShowName +
|
||
|
"' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " is not within range of show array!")
|
||
|
break
|
||
|
except trakt.errors.NotFoundException:
|
||
|
print("Show '" + tvShowName +
|
||
|
"' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " does not exist!")
|
||
|
break
|
||
|
except trakt.errors.RateLimitException:
|
||
|
print(
|
||
|
"Oops! You have hit the rate limit! The program will now pause for 1 minute...")
|
||
|
time.sleep(60)
|
||
|
errorStreak += 1
|
||
|
except json.decoder.JSONDecodeError:
|
||
|
print(
|
||
|
f"Oh, oh! A JSON Decode error occurred - maybe a dodgy response from the server? Waiting 60 seconds before resuming")
|
||
|
time.sleep(60)
|
||
|
errorStreak += 1
|
||
|
except KeyboardInterrupt:
|
||
|
sys.exit("Cancel requested...")
|
||
|
else:
|
||
|
print(f"({rowsCount}/{rowsTotal}) Skipping '" + tvShowName +
|
||
|
"' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + ". It's already been imported!")
|
||
|
|
||
|
|
||
|
def start():
|
||
|
if initTraktAuth():
|
||
|
# Start processing the TV shows
|
||
|
processWatchedShows()
|
||
|
else:
|
||
|
print("Unable to authenticate with Trakt!")
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
if os.path.isdir(getConfiguration().GDPR_WORKSPACE_PATH):
|
||
|
start()
|
||
|
else:
|
||
|
print("Oops! The TV Time GDPR folder '" + getConfiguration().GDPR_WORKSPACE_PATH +
|
||
|
"' does not exist on the local system. Please check it, and try again.")
|