from concurrent.futures import ThreadPoolExecutor import json from pathlib import Path import re from urllib.request import urlopen from deemix.plugins import Plugin from deemix.utils.localpaths import getConfigFolder from deemix.itemgen import generateTrackItem, generateAlbumItem, GenerationError, TrackNotOnDeezer, AlbumNotOnDeezer from deemix.types.DownloadObjects import Convertable import spotipy SpotifyClientCredentials = spotipy.oauth2.SpotifyClientCredentials class Spotify(Plugin): def __init__(self, configFolder=None): super().__init__() self.credentials = {'clientId': "", 'clientSecret': ""} self.settings = { 'fallbackSearch': False } self.enabled = False self.sp = None self.configFolder = Path(configFolder or getConfigFolder()) self.configFolder /= 'spotify' def setup(self): if not self.configFolder.is_dir(): self.configFolder.mkdir() self.loadSettings() return self @classmethod def parseLink(cls, link): if 'link.tospotify.com' in link: link = urlopen(link).url # Remove extra stuff if '?' in link: link = link[:link.find('?')] if '&' in link: link = link[:link.find('&')] if link.endswith('/'): link = link[:-1] # Remove last slash if present link_type = None link_id = None if not 'spotify' in link: return (link, link_type, link_id) # return if not a spotify link if re.search(r"[/:]track[/:](.+)", link): link_type = 'track' link_id = re.search(r"[/:]track[/:](.+)", link).group(1) elif re.search(r"[/:]album[/:](.+)", link): link_type = 'album' link_id = re.search(r"[/:]album[/:](.+)", link).group(1) elif re.search(r"[/:]playlist[/:](.+)", link): link_type = 'playlist' link_id = re.search(r"[/:]playlist[/:](.+)", link).group(1) return (link, link_type, link_id) def generateDownloadObject(self, dz, link, bitrate): (link, link_type, link_id) = self.parseLink(link) if link_type is None or link_id is None: return None if link_type == "track": return self.generateTrackItem(dz, link_id, bitrate) if link_type == "album": return self.generateAlbumItem(dz, link_id, bitrate) if link_type == "playlist": return self.generatePlaylistItem(dz, link_id, bitrate) return None def generateTrackItem(self, dz, link_id, bitrate): cache = self.loadCache() if link_id in cache['tracks']: cachedTrack = cache['tracks'][link_id] else: cachedTrack = self.getTrack(link_id) cache['tracks'][link_id] = cachedTrack self.saveCache(cache) if 'isrc' in cachedTrack: try: return generateTrackItem(dz, f"isrc:{cachedTrack['isrc']}", bitrate) except GenerationError: pass if self.settings['fallbackSearch']: if 'id' not in cachedTrack or cachedTrack['id'] == "0": trackID = dz.api.get_track_id_from_metadata( cachedTrack['data']['artist'], cachedTrack['data']['title'], cachedTrack['data']['album'], ) if trackID != "0": cachedTrack['id'] = trackID cache['tracks'][link_id] = cachedTrack self.saveCache(cache) if cachedTrack['id'] != "0": return generateTrackItem(dz, cachedTrack['id'], bitrate) raise TrackNotOnDeezer(f"https://open.spotify.com/track/{link_id}") def generateAlbumItem(self, dz, link_id, bitrate): cache = self.loadCache() if link_id in cache['albums']: cachedAlbum = cache['albums'][link_id] else: cachedAlbum = self.getAlbum(link_id) cache['albums'][link_id] = cachedAlbum self.saveCache(cache) try: return generateAlbumItem(dz, f"upc:{cachedAlbum['upc']}", bitrate) except GenerationError as e: raise AlbumNotOnDeezer(f"https://open.spotify.com/album/{link_id}") from e def generatePlaylistItem(self, dz, link_id, bitrate): if not self.enabled: raise Exception("Spotify plugin not enabled") spotifyPlaylist = self.sp.playlist(link_id) playlistAPI = self._convertPlaylistStructure(spotifyPlaylist) playlistAPI.various_artist = dz.api.get_artist(5080) # Useful for save as compilation tracklistTemp = spotifyPlaylist.track.items while spotifyPlaylist['tracks']['next']: spotifyPlaylist['tracks'] = self.sp.next(spotifyPlaylist['tracks']) tracklistTemp += spotifyPlaylist['tracks']['items'] tracklist = [] for item in tracklistTemp: if item['track']: if item['track']['explicit']: playlistAPI['explicit'] = True tracklist.append(item['track']) if 'explicit' not in playlistAPI: playlistAPI['explicit'] = False return Convertable({ 'type': 'spotify_playlist', 'id': link_id, 'bitrate': bitrate, 'title': spotifyPlaylist['name'], 'artist': spotifyPlaylist['owner']['display_name'], 'cover': playlistAPI['picture_thumbnail'], 'explicit': playlistAPI['explicit'], 'size': len(tracklist), 'collection': { 'tracks_gw': [], 'playlistAPI': playlistAPI }, 'plugin': 'spotify', 'conversion_data': tracklist }) def getTrack(self, track_id, spotifyTrack=None): if not self.enabled: raise Exception("Spotify plugin not enabled") cachedTrack = { 'isrc': None, 'data': None } if not spotifyTrack: spotifyTrack = self.sp.track(track_id) if 'isrc' in spotifyTrack.get('external_ids', {}): cachedTrack['isrc'] = spotifyTrack['external_ids']['isrc'] cachedTrack['data'] = { 'title': spotifyTrack['name'], 'artist': spotifyTrack['artists'][0]['name'], 'album': spotifyTrack['album']['name'] } return cachedTrack def getAlbum(self, album_id, spotifyAlbum=None): if not self.enabled: raise Exception("Spotify plugin not enabled") cachedAlbum = { 'upc': None, 'data': None } if not spotifyAlbum: spotifyAlbum = self.sp.album(album_id) if 'upc' in spotifyAlbum.get('external_ids', {}): cachedAlbum['upc'] = spotifyAlbum['external_ids']['upc'] cachedAlbum['data'] = { 'title': spotifyAlbum['name'], 'artist': spotifyAlbum['artists'][0]['name'] } return cachedAlbum def convertTrack(self, dz, downloadObject, track, pos, conversion, conversionNext, cache, listener): if downloadObject.isCanceled: return if track['id'] in cache['tracks']: cachedTrack = cache['tracks'][track['id']] else: cachedTrack = self.getTrack(track['id'], track) cache['tracks'][track['id']] = cachedTrack self.saveCache(cache) if 'isrc' in cachedTrack: try: trackAPI = dz.api.get_track_by_ISRC(cachedTrack['isrc']) if 'id' not in trackAPI or 'title' not in trackAPI: trackAPI = None except GenerationError: pass if self.settings['fallbackSearch'] and not trackAPI: if 'id' not in cachedTrack or cachedTrack['id'] == "0": trackID = dz.api.get_track_id_from_metadata( cachedTrack['data']['artist'], cachedTrack['data']['title'], cachedTrack['data']['album'], ) if trackID != "0": cachedTrack['id'] = trackID cache['tracks'][track['id']] = cachedTrack self.saveCache(cache) if cachedTrack['id'] != "0": trackAPI = dz.api.get_track(cachedTrack['id']) if not trackAPI: deezerTrack = { 'SNG_ID': "0", 'SNG_TITLE': track['name'], 'DURATION': 0, 'MD5_ORIGIN': 0, 'MEDIA_VERSION': 0, 'FILESIZE': 0, 'ALB_TITLE': track['album']['name'], 'ALB_PICTURE': "", 'ART_ID': 0, 'ART_NAME': track['artists'][0]['name'] } else: deezerTrack = dz.gw.get_track_with_fallback(trackAPI['id']) deezerTrack['_EXTRA_TRACK'] = trackAPI deezerTrack['POSITION'] = pos+1 conversionNext += (1 / downloadObject.size) * 100 if round(conversionNext) != conversion and round(conversionNext) % 2 == 0: conversion = round(conversionNext) if listener: listener.send("updateQueue", {'uuid': downloadObject.uuid, 'conversion': conversion}) def convert(self, dz, downloadObject, settings, listener=None): cache = self.loadCache() conversion = 0 conversionNext = 0 collection = [None] * len(downloadObject.conversion_data) with ThreadPoolExecutor(settings['queueConcurrency']) as executor: for pos, track in enumerate(downloadObject.conversion_data, start=0): collection[pos] = executor.submit(self.convertTrack, dz, downloadObject, track, pos, conversion, conversionNext, cache, listener ) @classmethod def _convertPlaylistStructure(cls, spotifyPlaylist): cover = None if len(spotifyPlaylist['images']): cover = spotifyPlaylist['images'][0]['url'] deezerPlaylist = { 'checksum': spotifyPlaylist['snapshot_id'], 'collaborative': spotifyPlaylist['collaborative'], 'creation_date': "XXXX-00-00", 'creator': { 'id': spotifyPlaylist['owner']['id'], 'name': spotifyPlaylist['owner']['display_name'], 'tracklist': spotifyPlaylist['owner']['href'], 'type': "user" }, 'description': spotifyPlaylist['description'], 'duration': 0, 'fans': spotifyPlaylist['followers']['total'] if 'followers' in spotifyPlaylist else 0, 'id': spotifyPlaylist['id'], 'is_loved_track': False, 'link': spotifyPlaylist['external_urls']['spotify'], 'nb_tracks': spotifyPlaylist['tracks']['total'], 'picture': cover, 'picture_small': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/56x56-000000-80-0-0.jpg", 'picture_medium': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/250x250-000000-80-0-0.jpg", 'picture_big': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/500x500-000000-80-0-0.jpg", 'picture_xl': cover or "https://e-cdns-images.dzcdn.net/images/cover/d41d8cd98f00b204e9800998ecf8427e/1000x1000-000000-80-0-0.jpg", 'public': spotifyPlaylist['public'], 'share': spotifyPlaylist['external_urls']['spotify'], 'title': spotifyPlaylist['name'], 'tracklist': spotifyPlaylist['tracks']['href'], 'type': "playlist" } return deezerPlaylist def loadSettings(self): if not (self.configFolder / 'settings.json').is_file(): with open(self.configFolder / 'settings.json', 'w') as f: json.dump({**self.credentials, **self.settings}, f, indent=2) with open(self.configFolder / 'settings.json', 'r') as settingsFile: settings = json.load(settingsFile) self.setSettings(settings) self.checkCredentials() def saveSettings(self, newSettings=None): if newSettings: self.setSettings(newSettings) self.checkCredentials() with open(self.configFolder / 'settings.json', 'w') as f: json.dump({**self.credentials, **self.settings}, f, indent=2) def getSettings(self): return {**self.credentials, **self.settings} def setSettings(self, newSettings): self.credentials = { 'clientId': newSettings['clientId'], 'clientSecret': newSettings['clientSecret'] } settings = {**newSettings} del settings['clientId'] del settings['clientSecret'] self.settings = settings def loadCache(self): if (self.configFolder / 'cache.json').is_file(): with open(self.configFolder / 'cache.json', 'r') as f: cache = json.load(f) else: cache = {'tracks': {}, 'albums': {}} return cache def saveCache(self, newCache): with open(self.configFolder / 'cache.json', 'w') as spotifyCache: json.dump(newCache, spotifyCache) def checkCredentials(self): if self.credentials['clientId'] == "" or self.credentials['clientSecret'] == "": self.enabled = False return try: client_credentials_manager = SpotifyClientCredentials(client_id=self.credentials['clientId'], client_secret=self.credentials['clientSecret']) self.sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager) self.sp.user_playlists('spotify') self.enabled = True except Exception: self.enabled = False def getCredentials(self): return self.credentials def setCredentials(self, clientId, clientSecret): # Remove extra spaces, just to be sure clientId = clientId.strip() clientSecret = clientSecret.strip() # Save them to disk self.credentials = { 'clientId': clientId, 'clientSecret': clientSecret} self.saveSettings()