commit f0623aac50444076b6985b19dd7b0fc21fce80b7 Author: Luke Arran Date: Fri Apr 9 15:13:06 2021 +0100 removing previous commits to protect private data diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ac64c2d --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +watched_show_process_tracker.json +localStorage.json +config.json +TimeToTrackt.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..17e15f2 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..72f60fa --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# TimeToTrakt +A quick Python script to import TV Time watched episode data into Trakt.TV - using data provided by Whip Media Company via a GDPR request. + +# Issues +They'll be a few! If you come across anything then let me know in the 'Issue' section, and I'll provide help where possible. + +# Notes +1. The script is using limited data provided from a GDPR request - so the accuracy isn't 100%. But you will be prompted to manually pick the Trakt show, when it can't be determined automatically. +2. A delay of 5 seconds is added between each episode to ensure fair use of Trakt's servers - especially with my import of 6,500 episodes. +3. Episodes which have been imported are be saved to `localStorage.json` - this will let you re-run and skip to episodes which have not been imported yet. + +# Setup +## Get your Data +The TV Time API is request based only. In order to get access to your data, you will have to request it from TV Time's Support via a GDPR request - or just ask for it! + +1. Copy the template provided by [www.datarequests.org](https://www.datarequests.org/blog/sample-letter-gdpr-access-request/) into an email. +2. Send it to support@tvtime.com +3. Wait a few working days to process the request +4. Extract the data somewhere safe. + +## Register API Access at Trakt +1. Go to "Settings" under your profile +2. Select ["Your API Applications"](https://trakt.tv/oauth/applications) +3. Select "New Application" +4. Provide a random name into "Name" +5. Paste "urn:ietf:wg:oauth:2.0:oob" into "Redirect uri:" +6. Click "Save App" +7. Make note of your app + +## Setup Script +Install the following frameworks via Pip: +1. trakt +2. tinydb + +Then, execute the program using the `./python3 TimeToTrakt.py` and provide the requested information when prompted. \ No newline at end of file diff --git a/TimeToTrackt.py b/TimeToTrackt.py new file mode 100644 index 0000000..3fe0d74 --- /dev/null +++ b/TimeToTrackt.py @@ -0,0 +1,330 @@ +# main.py +from logging import error +import sys +from trakt import init +from trakt.movies import get_recommended_movies +import trakt.core +import os +from trakt.tv import TVShow +import csv +from datetime import datetime +import time +from tinydb import TinyDB, Query +import json +import re +import sys + +# Create a database to keep track of completed processes +database = TinyDB('localStorage.json') +syncedEpisodesTable = database.table('SyncedEpisodes') +userMatchedShowsTable = database.table('TvTimeTraktUserMatched') + + +class Expando(object): + pass + + +def initTraktAuth(): + # Set the method of authentication + trakt.core.AUTH_METHOD = trakt.core.OAUTH_AUTH + return init(getConfiguration().TRAKT_USERNAME, store=True, client_id=getConfiguration().CLIENT_ID, client_secret=getConfiguration().CLIENT_SECRET) + + +def getConfiguration(): + configEx = Expando() + + with open('config.json') as f: + data = json.load(f) + + configEx.TRAKT_USERNAME = data["TRAKT_USERNAME"] + configEx.CLIENT_ID = data["CLIENT_ID"] + configEx.CLIENT_SECRET = data["CLIENT_SECRET"] + configEx.GDPR_WORKSPACE_PATH = data["GDPR_WORKSPACE_PATH"] + + return configEx + + +def getYearFromTitle(title): + ex = Expando() + + try: + # Get the year + yearSearch = re.search(r"\(([A-Za-z0-9_]+)\)", title) + yearValue = yearSearch.group(1) + # Get the value outside the title + titleValue = title.split('(')[0].strip() + + ex.titleWithoutYear = titleValue + ex.yearValue = int(yearValue) + return ex + except: + ex.titleWithoutYear = title + ex.yearValue = -1 + return ex + + +def checkTitleNameMatch(tvTimeTitle, traktTitle): + # If a name is simply a 1:1 match, then return true + if tvTimeTitle == traktTitle: + return True + + # Split the TvTime title + tvTimeTitleSplit = tvTimeTitle.split() + + # Get all the words which were in the title + wordsMatched = [] + + # Go through each word of the title, and confirm if the is a match + for word in tvTimeTitleSplit: + if word in traktTitle: + wordsMatched.append(word) + + # Calculate the accuracy of the match + quotient = len(wordsMatched) / len(traktTitle.split()) + percentage = quotient * 100 + + # If the title contains more than 50% of the words, then bring it up for use + return percentage > 50 + + +def getShowByName(name, seasonNo, episodeNo): + # Get the 'year' from the title - if one is present + titleObj = getYearFromTitle(name) + + # Title includes a year value, which helps with ensuring accuracy of pick + doesTitleIncludeYear = titleObj.yearValue != -1 + + # If the title included the year, then replace the name value + # with the string without the year - which helps with ensuring + # Trakt search accuracy + if doesTitleIncludeYear: + name = titleObj.titleWithoutYear + + # Search for a show with the name + tvSearch = TVShow.search(name) + + showsWithSameName = [] + + # Check if the search returned more than 1 result with the same name + for show in tvSearch: + if checkTitleNameMatch(name, show.title): + # If the TV Time title included a year, then only add results with matching broadcast year + if doesTitleIncludeYear == True: + # If the show title is a 1:1 match, with the year, then don't bother with checking the rest - + # since this will be a complete match + if (name == show.title) and (show.year == titleObj.yearValue): + showsWithSameName = [] + showsWithSameName.append(show) + break + + # Otherwise, check if the year is a match + if show.year == titleObj.yearValue: + showsWithSameName.append(show) + # Otherwise, just add all options + else: + showsWithSameName.append(show) + + # Filter down the results further to results containing a 1:1 match on title + completeMatchNames = [] + for nameFromSearch in showsWithSameName: + if nameFromSearch.title == name: + completeMatchNames.append(nameFromSearch) + + if (len(completeMatchNames) == 1): + showsWithSameName = completeMatchNames + + # If the search contains more than one result with the same name, then confirm with user + if len(showsWithSameName) > 1: + + # Check if the user has made a selection already + userMatchedQuery = Query() + queryResult = userMatchedShowsTable.search( + userMatchedQuery.ShowName == name) + + # If the user has already made a selection for the show, then use the existing selection + if len(queryResult) == 1: + # Get the first row + firstMatch = queryResult[0] + firstMatchSelectedIndex = int(firstMatch.get('UserSelectedIndex')) + skipShow = firstMatch.get('SkipShow') + + if skipShow == False: + return showsWithSameName[firstMatchSelectedIndex] + else: + return None + # Otherwise, ask the user which show they want to match with + else: + # Ask the user to pick + print( + f"MESSAGE: The TV Time Show '{name}' (Season {seasonNo}, Episode {episodeNo}) has {len(showsWithSameName)} matching Trakt shows with the same name.") + + for idx, item in enumerate(showsWithSameName): + print( + f" ({idx}) {item.title} - {item.year} - {len(item.seasons)} Season(s) - More Info: https://trakt.tv/{item.ext}") + + while(True): + try: + indexSelected = (input( + f"Please make a selection from above (or enter SKIP):")) + + # If the input was not skip, then validate the selection before ending loop + if indexSelected != 'SKIP': + int(indexSelected) + break + # Otherwise, exit with SKIP + else: + break + except KeyboardInterrupt: + sys.exit("Cancel requested...") + except: + print( + f"Sorry! Please select a value between 0 to {len(showsWithSameName)}") + + # If the user decides to skip the selection, return None + if (indexSelected == 'SKIP'): + userMatchedShowsTable.insert( + {'ShowName': name, 'UserSelectedIndex': 0, 'SkipShow': True}) + + return None + # Otherwise, return the selected show + else: + selectedShow = showsWithSameName[int(indexSelected)] + + userMatchedShowsTable.insert( + {'ShowName': name, 'UserSelectedIndex': indexSelected, 'SkipShow': False}) + + return selectedShow + + else: + return showsWithSameName[0] + +# Confirm if the season has a "special" season starting at 0, if not, then subtract the seasonNo by 1 + + +def parseSeasonNo(seasonNo, traktShowObj): + seasonNo = int(seasonNo) + + # Get the first season number in the array + firstSeasonNo = traktShowObj.seasons[0].number + + # If the season number is 0, then the show contains a "special" season + if firstSeasonNo == 0: + # Return the Season Number, as is + return seasonNo + else: + # Otherwise, if the seasons start from 0, without any specials, then return the seasonNo, + # but subtracted by one (unless it is a special season in TV Time) + if seasonNo != 0: + return seasonNo - 1 + else: + return seasonNo + + +def getWatchedShowsPath(): + return getConfiguration().GDPR_WORKSPACE_PATH + "/seen_episode.csv" + + +def processWatchedShows(): + # Keep a count of rows processed etc + rowsCount = 0 + rowsTotal = 0 + errorStreak = 0 + # Quickly sweep through the file to get the row count + with open(getWatchedShowsPath()) as f: + rowsTotal = sum(1 for line in f) + + with open(getWatchedShowsPath(), newline='') as csvfile: + showsReader = csv.reader(csvfile, delimiter=',') + + for row in showsReader: + rowsCount += 1 + # Get the values from the CSV record + tvShowName = row[4] + + # Skip first row + if tvShowName != "tv_show_name": + tvShowEpisodeId = row[1] + tvShowSeasonNo = row[7] + tvShowEpisodeNo = row[8] + tvShowDateWatched = row[5] + tvShowDateWatchedConverted = datetime.strptime( + tvShowDateWatched, '%Y-%m-%d %H:%M:%S') + + # Query the database to check if it's already been processed + episodeCompletedQuery = Query() + queryResult = syncedEpisodesTable.search( + episodeCompletedQuery.episodeId == tvShowEpisodeId) + + if len(queryResult) == 0: + while True: + if (errorStreak > 10): + print( + f"An error occurred 10 times in a row... skipping episode...") + break + + try: + # Sleep for a second between each process, before adding the next watched episode, + # this ensures that the program is within the rate limit of 1 per second. + time.sleep(5) + # Output to console + print(f"({rowsCount}/{rowsTotal}) Processing Show '" + tvShowName + + "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo) + # Get the Trakt version of the show + traktShowObj = getShowByName( + tvShowName, tvShowSeasonNo, tvShowEpisodeNo) + # Skip the episode, if no show was selected + if traktShowObj == None: + break + # Add the show to the user's library + traktShowObj.add_to_library() + # Get the season + season = traktShowObj.seasons[parseSeasonNo( + tvShowSeasonNo, traktShowObj)] + episode = season.episodes[int(tvShowEpisodeNo) - 1] + # Mark the episode as watched + episode.mark_as_seen(tvShowDateWatchedConverted) + # Add the show to the tracker as completed + syncedEpisodesTable.insert( + {'episodeId': tvShowEpisodeId}) + # Once the episode has been marked watched, then break out of the loop + errorStreak = 0 + break + except IndexError: + print("Oops! '" + tvShowName + + "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " is not within range of show array!") + break + except trakt.errors.NotFoundException: + print("Show '" + tvShowName + + "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + " does not exist!") + break + except trakt.errors.RateLimitException: + print( + "Oops! You have hit the rate limit! The program will now pause for 1 minute...") + time.sleep(60) + errorStreak += 1 + except json.decoder.JSONDecodeError: + print( + f"Oh, oh! A JSON Decode error occurred - maybe a dodgy response from the server? Waiting 60 seconds before resuming") + time.sleep(60) + errorStreak += 1 + except KeyboardInterrupt: + sys.exit("Cancel requested...") + else: + print(f"({rowsCount}/{rowsTotal}) Skipping '" + tvShowName + + "' on Season " + tvShowSeasonNo + " - Episode " + tvShowEpisodeNo + ". It's already been imported!") + + +def start(): + if initTraktAuth(): + # Start processing the TV shows + processWatchedShows() + else: + print("Unable to authenticate with Trakt!") + + +if __name__ == "__main__": + if os.path.isdir(getConfiguration().GDPR_WORKSPACE_PATH): + start() + else: + print("Oops! The TV Time GDPR folder '" + getConfiguration().GDPR_WORKSPACE_PATH + + "' does not exist on the local system. Please check it, and try again.")