184 lines
6.0 KiB
Python
184 lines
6.0 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import collections
|
|
import logging
|
|
import re
|
|
|
|
from mopidy import backend, models
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def simplify_search_query(query):
|
|
|
|
if isinstance(query, dict):
|
|
r = []
|
|
for v in query.values():
|
|
if isinstance(v, list):
|
|
r.extend(v)
|
|
else:
|
|
r.append(v)
|
|
return ' '.join(r)
|
|
if isinstance(query, list):
|
|
return ' '.join(query)
|
|
else:
|
|
return query
|
|
|
|
|
|
class FunkwhaleLibraryProvider(backend.LibraryProvider):
|
|
root_directory = models.Ref.directory(
|
|
uri='funkwhale:directory',
|
|
name='Funkwhale'
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(FunkwhaleLibraryProvider, self).__init__(*args, **kwargs)
|
|
self.vfs = {'funkwhale:directory': collections.OrderedDict()}
|
|
# self.add_to_vfs(new_folder('Favorites', ['favorites']))
|
|
# self.add_to_vfs(new_folder('Following', ['following']))
|
|
# self.add_to_vfs(new_folder('Sets', ['sets']))
|
|
# self.add_to_vfs(new_folder('Stream', ['stream']))
|
|
|
|
def add_to_vfs(self, _model):
|
|
self.vfs['funkwhale:directory'][_model.uri] = _model
|
|
|
|
def list_sets(self):
|
|
sets_vfs = collections.OrderedDict()
|
|
for (name, set_id, tracks) in self.backend.remote.get_sets():
|
|
sets_list = new_folder(name, ['sets', set_id])
|
|
logger.debug('Adding set %s to vfs' % sets_list.name)
|
|
sets_vfs[set_id] = sets_list
|
|
return sets_vfs.values()
|
|
|
|
def list_favorites(self):
|
|
vfs_list = collections.OrderedDict()
|
|
for track in self.backend.remote.get_likes():
|
|
logger.debug('Adding liked track %s to vfs' % track.name)
|
|
vfs_list[track.name] = models.Ref.track(
|
|
uri=track.uri, name=track.name)
|
|
return vfs_list.values()
|
|
|
|
def list_user_follows(self):
|
|
sets_vfs = collections.OrderedDict()
|
|
for (name, user_id) in self.backend.remote.get_followings():
|
|
sets_list = new_folder(name, ['following', user_id])
|
|
logger.debug('Adding set %s to vfs' % sets_list.name)
|
|
sets_vfs[user_id] = sets_list
|
|
return sets_vfs.values()
|
|
|
|
def tracklist_to_vfs(self, track_list):
|
|
vfs_list = collections.OrderedDict()
|
|
for temp_track in track_list:
|
|
if not isinstance(temp_track, Track):
|
|
temp_track = self.backend.remote.parse_track(temp_track)
|
|
if hasattr(temp_track, 'uri'):
|
|
vfs_list[temp_track.name] = models.Ref.track(
|
|
uri=temp_track.uri,
|
|
name=temp_track.name
|
|
)
|
|
return vfs_list.values()
|
|
|
|
def browse(self, uri):
|
|
if not self.vfs.get(uri):
|
|
(req_type, res_id) = re.match(r'.*:(\w*)(?:/(\d*))?', uri).groups()
|
|
|
|
if 'favorites' == req_type:
|
|
return self.list_favorites()
|
|
|
|
# root directory
|
|
return self.vfs.get(uri, {}).values()
|
|
|
|
def search(self, query=None, uris=None, exact=False):
|
|
# TODO Support exact search
|
|
if not query:
|
|
return
|
|
|
|
if 'uri' in query:
|
|
search_query = ''.join(query['uri'])
|
|
url = urlparse(search_query)
|
|
if 'soundcloud.com' in url.netloc:
|
|
logger.info('Resolving SoundCloud for: %s', search_query)
|
|
return SearchResult(
|
|
uri='soundcloud:search',
|
|
tracks=self.backend.remote.resolve_url(search_query)
|
|
)
|
|
else:
|
|
search_query = simplify_search_query(query)
|
|
logger.info('Searching Funkwhale for: %s', search_query)
|
|
raw_results = self.backend.remote.http_client.search(search_query)
|
|
artists = [convert_to_artist(row) for row in raw_results['artists']]
|
|
albums = [convert_to_album(row) for row in raw_results['albums']]
|
|
tracks = [convert_to_track(row) for row in raw_results['tracks']]
|
|
|
|
return models.SearchResult(
|
|
uri='funkwhale:search',
|
|
tracks=tracks,
|
|
albums=albums,
|
|
artists=artists,
|
|
)
|
|
|
|
def lookup(self, uri):
|
|
if 'fw:' in uri:
|
|
uri = uri.replace('fw:', '')
|
|
return self.backend.remote.resolve_url(uri)
|
|
|
|
client = self.backend.remote.http_client
|
|
config = {
|
|
'track': lambda id: [client.get_track(id)],
|
|
'album': lambda id: client.list_tracks({'album': id})['results'],
|
|
'artist': lambda id: client.list_tracks({'artist': id})['results'],
|
|
}
|
|
type, id = parse_uri(uri)
|
|
payload = config[type](id)
|
|
|
|
return [convert_to_track(row) for row in payload]
|
|
|
|
|
|
def parse_uri(uri):
|
|
uri = uri.replace('funkwhale:', '')
|
|
parts = uri.split(':')
|
|
type = parts[0].rstrip('s')
|
|
id = int(parts[1])
|
|
return type, id
|
|
|
|
|
|
def convert_to_artist(payload):
|
|
return models.Artist(
|
|
uri='funkwhale:artists:%s' % (payload['id'],),
|
|
name=payload['name'],
|
|
sortname=payload['name'],
|
|
musicbrainz_id=payload['mbid'],
|
|
)
|
|
|
|
|
|
def convert_to_album(payload):
|
|
artist = convert_to_artist(payload['artist'])
|
|
image = payload['cover']['original'] if payload['cover'] else None
|
|
|
|
return models.Album(
|
|
uri='funkwhale:albums:%s' % (payload['id'],),
|
|
name=payload['title'],
|
|
musicbrainz_id=payload['mbid'],
|
|
images=[image] if image else [],
|
|
artists=[artist],
|
|
date=payload['release_date'],
|
|
num_tracks=len(payload.get('tracks', [])),
|
|
)
|
|
|
|
|
|
def convert_to_track(payload):
|
|
artist = convert_to_artist(payload['artist'])
|
|
album = convert_to_album(payload['album'])
|
|
return models.Track(
|
|
uri='funkwhale:tracks:%s' % (payload['id'],),
|
|
name=payload['title'],
|
|
musicbrainz_id=payload['mbid'],
|
|
artists=[artist],
|
|
album=album,
|
|
date=payload['album']['release_date'],
|
|
bitrate=(payload['bitrate'] or 0) / 1000,
|
|
length=(payload['duration'] or 0) * 1000,
|
|
track_no=payload['position'],
|
|
)
|