Skip to content

Commit

Permalink
clients/spotify/spotifyproxy/tizspotifyproxy.py: recommendations by t…
Browse files Browse the repository at this point in the history
…rack name

and artist name (#710 and #727)
  • Loading branch information
juanrubio committed May 1, 2020
1 parent 223dad1 commit dcb7a0a
Showing 1 changed file with 210 additions and 79 deletions.
289 changes: 210 additions & 79 deletions clients/spotify/spotifyproxy/tizspotifyproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,34 +616,34 @@ def enqueue_new_releases(self, arg):
)
)

#
# Unfortunately, podcasts URLs are not supported by libspotify
#
# def enqueue_show(self, arg):
# """Obtain a show/podcast from Spotify and add all its tracks to the playback
# queue.

# :param arg: a show/podcast search term

# """
# arg_dec = arg
# logging.info("arg : %s", arg_dec)
# print_msg("[Spotify] [Podcast search] '{0}'.".format(arg_dec))
# try:
# self._login_with_user_token('user-read-playback-position')
# count = len(self.queue)
# results = self._spotify.search(arg_dec, limit=10, offset=0, type="show")
# shows = results["shows"]
# for i, show in enumerate(shows["items"]):
# if show:
# self._enqueue_show(show)
# break

# self._remove_explicit_tracks()
# if count == len(self.queue):
# raise ValueError

# self._update_play_queue_order()
#
# Unfortunately, podcasts URLs are not supported by libspotify
#
# def enqueue_show(self, arg):
# """Obtain a show/podcast from Spotify and add all its tracks to the playback
# queue.

# :param arg: a show/podcast search term

# """
# arg_dec = arg
# logging.info("arg : %s", arg_dec)
# print_msg("[Spotify] [Podcast search] '{0}'.".format(arg_dec))
# try:
# self._login_with_user_token('user-read-playback-position')
# count = len(self.queue)
# results = self._spotify.search(arg_dec, limit=10, offset=0, type="show")
# shows = results["shows"]
# for i, show in enumerate(shows["items"]):
# if show:
# self._enqueue_show(show)
# break

# self._remove_explicit_tracks()
# if count == len(self.queue):
# raise ValueError

# self._update_play_queue_order()

except ValueError:
raise ValueError(
Expand Down Expand Up @@ -704,7 +704,12 @@ def enqueue_artist_id(self, id):
self._update_play_queue_order()

except ValueError:
raise ValueError(str("Artist not found : %s" % to_ascii(id)))
raise ValueError(
str(
"Artist not found : %s (or no suitable tracks in queue)"
% to_ascii(arg_dec)
)
)

def enqueue_album_id(self, id):
"""Obtain an album from Spotify and add all its audio tracks to the playback
Expand Down Expand Up @@ -773,23 +778,7 @@ def enqueue_recommendations_by_track_id(self, id):
logging.info("id : %s", id)
print_msg("[Spotify] [Recomendations by track id] '{0}'.".format(id))
try:
self._login_with_client_credentials()
count = len(self.queue)
track_seed = list()
track_seed.append(id)
tracks = self._spotify.recommendations(
seed_artists=None, seed_genres=None, seed_tracks=track_seed, limit=100
)
if tracks:
for track in tracks["tracks"]:
self._enqueue_track(track)

self._remove_explicit_tracks()
if count == len(self.queue):
logging.info("no tracks found with track id : %s", id)
raise ValueError

self._update_play_queue_order()
self._recommendations_by_track_id(id)

except ValueError:
raise ValueError(str("Track not found : %s" % to_ascii(id)))
Expand All @@ -804,26 +793,108 @@ def enqueue_recommendations_by_artist_id(self, id):
logging.info("id : %s", id)
print_msg("[Spotify] [Recomendations by artist id] '{0}'.".format(id))
try:
self._recommendations_by_artist_id(id)

except ValueError:
raise ValueError(str("Artist not found : %s" % to_ascii(id)))

def enqueue_recommendations_by_track(self, arg):
"""Obtain Spotify recommendations by track name and add tracks to the playback
queue.
:param arg: a track name or search term
"""
logging.info("arg : %s", arg)
print_msg("[Spotify] [Recomendations by track] '{0}'.".format(arg))
try:
author = ""
if arg != "":
word_list = arg.split()
if len(word_list) > 2 and word_list[-2] == "by":
author = word_list[-1]
arg = arg.rsplit(" ", 1)[0]
arg = arg.rsplit(" ", 1)[0]
count = len(self.queue)
self._login_with_client_credentials()
track_name = None
artist_name = None
track_dict = dict()
track_names = list()
results = self._spotify.search(arg, limit=20, offset=0, type="track")
tracks = results["tracks"]
for i, trk in enumerate(tracks["items"]):
name = trk["name"]
artist = trk["artists"][0]
artist_name = artist["name"]
print_wrn("[Spotify] [Track] '{0}' by {1}.".format(name, artist_name))
if (
arg.lower() in name.lower()
and author.lower() in artist_name.lower()
):
track_name = name
track = trk
break
if fuzz.partial_ratio(arg, name) > 50:
track_dict[name] = trk
track_names.append(name)

if not track_name:
if len(track_names) > 1:
track_name = process.extractOne(arg, track_names)[0]
track = track_dict[track_name]
elif len(track_names) == 1:
track_name = track_names[0]
track = track_dict[track_name]

if track:
print_nfo(
"[Spotify] [Recomendations by track] Seeding recommendations search with '{0}' by {1}.".format(
track_name, artist_name
)
)
self._recommendations_by_track_id(track["id"])

if count == len(self.queue):
logging.info("no tracks found : %s", arg)
raise ValueError

except ValueError:
raise ValueError(str("Tracks not found : %s" % to_ascii(arg)))

def enqueue_recommendations_by_artist(self, arg):
"""Obtain Spotify recommendations by artist name and add tracks to the playback
queue.
:param arg: an artist name or search term
"""
logging.info("arg : %s", arg)
print_msg("[Spotify] [Recomendations by artist] '{0}'.".format(arg))
try:
count = len(self.queue)
artist_seed = list()
artist_seed.append(id)
tracks = self._spotify.recommendations(
seed_artists=artist_seed, seed_genres=None, seed_tracks=None, limit=100
self._login_with_client_credentials()
artist = self._search_artists(arg)
if not artist:
results = self._spotify.search(arg, limit=1, offset=0, type="track")
if results:
tracks = results["tracks"]
track = tracks["items"][0]
artist = track["artists"][0]

print_nfo(
"[Spotify] [Recommendations by artist] Seeding recommendations search with '{0}'.".format(
artist["name"]
)
)
if tracks:
for track in tracks["tracks"]:
self._enqueue_track(track)
self._recommendations_by_artist_id(artist["id"])

self._remove_explicit_tracks()
if count == len(self.queue):
logging.info("not tracks found with artist id : %s", id)
logging.info("no tracks found : %s", arg)
raise ValueError

self._update_play_queue_order()

except ValueError:
raise ValueError(str("Artist not found : %s" % to_ascii(id)))
raise ValueError(str("Artist not found : %s" % to_ascii(arg)))

def enqueue_recommendations_by_genre(self, arg):
"""Obtain Spotify recommendations by genre and add tracks to the playback
Expand Down Expand Up @@ -1305,27 +1376,27 @@ def _enqueue_album(self, album):
except:
pass

#
# Unfortunately, podcasts URLs are not supported by libspotify
#
# def _enqueue_show(self, show):
# """ Add an show tracks to the playback queue.

# :param show: a show object

# """
# logging.info("_enqueue_show")
# if show:
# show_name = show["name"]
# print_wrn("[Spotify] [Show] '{0}'.".format(show_name))
# try:
# results = self._spotify.show_episodes(show["id"], limit=50, offset=0)
# for track in results["items"]:
# #pprint(track)
# track_info = TrackInfo(track, show_name)
# self._add_to_playback_queue(track_info)
# except:
# pass
#
# Unfortunately, podcasts URLs are not supported by libspotify
#
# def _enqueue_show(self, show):
# """ Add an show tracks to the playback queue.

# :param show: a show object

# """
# logging.info("_enqueue_show")
# if show:
# show_name = show["name"]
# print_wrn("[Spotify] [Show] '{0}'.".format(show_name))
# try:
# results = self._spotify.show_episodes(show["id"], limit=50, offset=0)
# for track in results["items"]:
# #pprint(track)
# track_info = TrackInfo(track, show_name)
# self._add_to_playback_queue(track_info)
# except:
# pass

def _enqueue_playlist(self, playlist):
""" Add an playlist tracks to the playback queue.
Expand Down Expand Up @@ -1448,6 +1519,66 @@ def _search_playlist(self, arg, owner=None, is_featured=False):

return playlist

def _recommendations_by_track_id(self, id):
"""Obtain Spotify recommendations by track id and add tracks to the playback
queue.
:param id: a Spotify track ID, URI, or URL
"""
logging.info("id : %s", id)
try:
self._login_with_client_credentials()
count = len(self.queue)
track_seed = list()
track_seed.append(id)
tracks = self._spotify.recommendations(
seed_artists=None, seed_genres=None, seed_tracks=track_seed, limit=100
)
if tracks:
for track in tracks["tracks"]:
self._enqueue_track(track)

self._remove_explicit_tracks()
if count == len(self.queue):
logging.info("no tracks found with track id : %s", id)
raise ValueError

self._update_play_queue_order()

except ValueError:
raise ValueError(str("Track not found : %s" % to_ascii(id)))

def _recommendations_by_artist_id(self, id):
"""Obtain Spotify recommendations by artist id and add tracks to the playback
queue.
:param id: a Spotify artist ID, URI, or URL
"""
logging.info("id : %s", id)
try:
self._login_with_client_credentials()
count = len(self.queue)
artist_seed = list()
artist_seed.append(id)
tracks = self._spotify.recommendations(
seed_artists=artist_seed, seed_genres=None, seed_tracks=None, limit=100
)
if tracks:
for track in tracks["tracks"]:
self._enqueue_track(track)

self._remove_explicit_tracks()
if count == len(self.queue):
logging.info("not tracks found with artist id : %s", id)
raise ValueError

self._update_play_queue_order()

except ValueError:
raise ValueError(str("Artist not found : %s" % to_ascii(id)))

def _update_play_queue_order(self):
""" Update the queue playback order.
Expand Down

0 comments on commit dcb7a0a

Please sign in to comment.