Poc of browsing favorites with cache
parent
8d51087d87
commit
8696450267
|
@ -9,39 +9,40 @@ import os
|
||||||
|
|
||||||
|
|
||||||
__author__ = """Eliot Berriot"""
|
__author__ = """Eliot Berriot"""
|
||||||
__email__ = 'contact+funkwhale@eliotberriot.com'
|
__email__ = "contact+funkwhale@eliotberriot.com"
|
||||||
__version__ = '0.1.0'
|
__version__ = "0.1.0"
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Extension(mopidy.ext.Extension):
|
class Extension(mopidy.ext.Extension):
|
||||||
|
|
||||||
dist_name = 'Mopidy-Funkwhale'
|
dist_name = "Mopidy-Funkwhale"
|
||||||
ext_name = 'funkwhale'
|
ext_name = "funkwhale"
|
||||||
version = __version__
|
version = __version__
|
||||||
|
|
||||||
def get_default_config(self):
|
def get_default_config(self):
|
||||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
conf_file = os.path.join(os.path.dirname(__file__), "ext.conf")
|
||||||
return mopidy.config.read(conf_file)
|
return mopidy.config.read(conf_file)
|
||||||
|
|
||||||
def get_config_schema(self):
|
def get_config_schema(self):
|
||||||
schema = super(Extension, self).get_config_schema()
|
schema = super(Extension, self).get_config_schema()
|
||||||
schema['url'] = mopidy.config.String()
|
schema["url"] = mopidy.config.String()
|
||||||
schema['username'] = mopidy.config.String(optional=True)
|
schema["username"] = mopidy.config.String(optional=True)
|
||||||
schema['password'] = mopidy.config.Secret(optional=True)
|
schema["password"] = mopidy.config.Secret(optional=True)
|
||||||
return schema
|
return schema
|
||||||
|
|
||||||
def validate_config(self, config):
|
def validate_config(self, config):
|
||||||
if not config.getboolean('funkwhale', 'enabled'):
|
if not config.getboolean("funkwhale", "enabled"):
|
||||||
return
|
return
|
||||||
username = config.getstring('funkwhale', 'username')
|
username = config.getstring("funkwhale", "username")
|
||||||
password = config.getstring('funkwhale', 'password')
|
password = config.getstring("funkwhale", "password")
|
||||||
if any([username, password]) and not all([username, password]):
|
if any([username, password]) and not all([username, password]):
|
||||||
raise mopidy.ext.ExtensionError(
|
raise mopidy.ext.ExtensionError(
|
||||||
'You need to provide username and password to authenticate with the funkwhale backend'
|
"You need to provide username and password to authenticate with the funkwhale backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
def setup(self, registry):
|
def setup(self, registry):
|
||||||
from . import actor
|
from . import actor
|
||||||
registry.add('backend', actor.FunkwhaleBackend)
|
|
||||||
|
registry.add("backend", actor.FunkwhaleBackend)
|
||||||
|
|
|
@ -14,34 +14,37 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class FunkwhaleBackend(pykka.ThreadingActor, backend.Backend):
|
class FunkwhaleBackend(pykka.ThreadingActor, backend.Backend):
|
||||||
|
|
||||||
def __init__(self, config, audio):
|
def __init__(self, config, audio):
|
||||||
super(FunkwhaleBackend, self).__init__()
|
super(FunkwhaleBackend, self).__init__()
|
||||||
self.config = config
|
self.config = config
|
||||||
self.remote = client.FunkwhaleClient(config)
|
self.client = client.APIClient(config)
|
||||||
self.library = library.FunkwhaleLibraryProvider(backend=self)
|
self.library = library.FunkwhaleLibraryProvider(backend=self)
|
||||||
self.playback = FunkwhalePlaybackProvider(audio=audio, backend=self)
|
self.playback = FunkwhalePlaybackProvider(audio=audio, backend=self)
|
||||||
|
|
||||||
self.uri_schemes = ['funkwhale', 'fw']
|
self.uri_schemes = ["funkwhale", "fw"]
|
||||||
|
|
||||||
def on_start(self):
|
def on_start(self):
|
||||||
username = self.remote.username
|
if self.client.username is not None:
|
||||||
if username is not None:
|
self.client.login()
|
||||||
logger.info('Logged in to Funkwhale as "%s" on "%s"', username, self.config['funkwhale']['url'])
|
logger.info(
|
||||||
|
'Logged in to Funkwhale as "%s" on "%s"',
|
||||||
|
self.client.username,
|
||||||
|
self.config["funkwhale"]["url"],
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.info('Using "%s" anonymously', self.config['funkwhale']['url'])
|
logger.info('Using "%s" anonymously', self.config["funkwhale"]["url"])
|
||||||
|
|
||||||
|
|
||||||
class FunkwhalePlaybackProvider(backend.PlaybackProvider):
|
class FunkwhalePlaybackProvider(backend.PlaybackProvider):
|
||||||
def translate_uri(self, uri):
|
def translate_uri(self, uri):
|
||||||
_, id = library.parse_uri(uri)
|
_, id = library.parse_uri(uri)
|
||||||
track = self.backend.remote.http_client.get_track(id)
|
track = self.backend.client.get_track(id)
|
||||||
|
|
||||||
if track is None:
|
if track is None:
|
||||||
return None
|
return None
|
||||||
url = track['listen_url']
|
url = track["listen_url"]
|
||||||
if url.startswith('/'):
|
if url.startswith("/"):
|
||||||
url = self.backend.config['funkwhale']['url'] + url
|
url = self.backend.config["funkwhale"]["url"] + url
|
||||||
if self.backend.remote.token:
|
if self.backend.client.token:
|
||||||
url += '?jwt=' + self.backend.remote.token
|
url += "?jwt=" + self.backend.client.token
|
||||||
return url
|
return url
|
||||||
|
|
|
@ -23,68 +23,66 @@ class SessionWithUrlBase(requests.Session):
|
||||||
|
|
||||||
|
|
||||||
def get_requests_session(url, proxy_config, user_agent):
|
def get_requests_session(url, proxy_config, user_agent):
|
||||||
if not url.endswith('/'):
|
if not url.endswith("/"):
|
||||||
url += '/'
|
url += "/"
|
||||||
url += 'api/v1/'
|
url += "api/v1/"
|
||||||
|
|
||||||
proxy = httpclient.format_proxy(proxy_config)
|
proxy = httpclient.format_proxy(proxy_config)
|
||||||
full_user_agent = httpclient.format_user_agent(user_agent)
|
full_user_agent = httpclient.format_user_agent(user_agent)
|
||||||
|
|
||||||
session = SessionWithUrlBase(url_base=url)
|
session = SessionWithUrlBase(url_base=url)
|
||||||
session.proxies.update({'http': proxy, 'https': proxy})
|
session.proxies.update({"http": proxy, "https": proxy})
|
||||||
session.headers.update({'user-agent': full_user_agent})
|
session.headers.update({"user-agent": full_user_agent})
|
||||||
|
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
|
||||||
def login(session, username, password):
|
def login(session, username, password):
|
||||||
response = session.post('token/', {'username': username, 'password': password})
|
if not username:
|
||||||
|
return
|
||||||
|
response = session.post("token/", {"username": username, "password": password})
|
||||||
try:
|
try:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
except requests.exceptions.HTTPError:
|
except requests.exceptions.HTTPError:
|
||||||
raise exceptions.BackendError('Authentication failed for user %s' % (username,))
|
raise exceptions.BackendError("Authentication failed for user %s" % (username,))
|
||||||
token = response.json()['token']
|
token = response.json()["token"]
|
||||||
session.headers.update({'Authorization': 'JWT %s' % (token,)})
|
session.headers.update({"Authorization": "JWT %s" % (token,)})
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
||||||
class APIClient(object):
|
class APIClient(object):
|
||||||
def __init__(self, session):
|
def __init__(self, config):
|
||||||
self.session = session
|
self.config = config
|
||||||
|
self.session = get_requests_session(
|
||||||
|
config["funkwhale"]["url"],
|
||||||
|
proxy_config=config["proxy"],
|
||||||
|
user_agent="%s/%s" % (Extension.dist_name, __version__),
|
||||||
|
)
|
||||||
|
self.username = self.config["funkwhale"]["username"]
|
||||||
|
self.token = None
|
||||||
|
|
||||||
|
def login(self):
|
||||||
|
self.username = self.config["funkwhale"]["username"]
|
||||||
|
if self.username:
|
||||||
|
self.token = login(
|
||||||
|
self.session,
|
||||||
|
self.config["funkwhale"]["username"],
|
||||||
|
self.config["funkwhale"]["password"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.token = None
|
||||||
|
|
||||||
def search(self, query):
|
def search(self, query):
|
||||||
response = self.session.get('search', params={'query': query})
|
response = self.session.get("search", params={"query": query})
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
def get_track(self, id):
|
def get_track(self, id):
|
||||||
response = self.session.get('tracks/{}/'.format(id))
|
response = self.session.get("tracks/{}/".format(id))
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
def list_tracks(self, filters):
|
def list_tracks(self, filters):
|
||||||
response = self.session.get('tracks/', params=filters)
|
response = self.session.get("tracks/", params=filters)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
|
|
||||||
class FunkwhaleClient(object):
|
|
||||||
|
|
||||||
def __init__(self, config):
|
|
||||||
super(FunkwhaleClient, self).__init__()
|
|
||||||
self.page_size = config['funkwhale'].get('page_size', 50)
|
|
||||||
self.http_client = APIClient(
|
|
||||||
session=get_requests_session(
|
|
||||||
config['funkwhale']['url'],
|
|
||||||
proxy_config=config['proxy'],
|
|
||||||
user_agent='%s/%s' % (
|
|
||||||
Extension.dist_name,
|
|
||||||
__version__),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self.username = config['funkwhale']['username']
|
|
||||||
self.token = None
|
|
||||||
if config['funkwhale']['username']:
|
|
||||||
self.token = login(
|
|
||||||
self.http_client.session,
|
|
||||||
config['funkwhale']['username'],
|
|
||||||
config['funkwhale']['password'])
|
|
||||||
|
|
|
@ -3,12 +3,21 @@ from __future__ import unicode_literals
|
||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
|
import urllib
|
||||||
|
|
||||||
from mopidy import backend, models
|
from mopidy import backend, models
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_uri(path):
|
||||||
|
return "funkwhale:directory:%s" % path
|
||||||
|
|
||||||
|
|
||||||
|
def new_folder(name, path):
|
||||||
|
return models.Ref.directory(uri=generate_uri(path), name=name)
|
||||||
|
|
||||||
|
|
||||||
def simplify_search_query(query):
|
def simplify_search_query(query):
|
||||||
|
|
||||||
|
@ -19,165 +28,185 @@ def simplify_search_query(query):
|
||||||
r.extend(v)
|
r.extend(v)
|
||||||
else:
|
else:
|
||||||
r.append(v)
|
r.append(v)
|
||||||
return ' '.join(r)
|
return " ".join(r)
|
||||||
if isinstance(query, list):
|
if isinstance(query, list):
|
||||||
return ' '.join(query)
|
return " ".join(query)
|
||||||
else:
|
else:
|
||||||
return query
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
class Cache(collections.OrderedDict):
|
||||||
|
def __init__(self, max_age=0):
|
||||||
|
self.max_age = max_age
|
||||||
|
super(Cache, self).__init__()
|
||||||
|
|
||||||
|
def set(self, key, value):
|
||||||
|
now = time.time()
|
||||||
|
self[key] = (now, value)
|
||||||
|
|
||||||
|
def get(self, key):
|
||||||
|
value = super(Cache, self).get(key)
|
||||||
|
if value is None:
|
||||||
|
return
|
||||||
|
now = time.time()
|
||||||
|
t, v = value
|
||||||
|
if self.max_age and t + self.max_age < now:
|
||||||
|
# entry is too old, we delete it
|
||||||
|
del self[key]
|
||||||
|
return None
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
class FunkwhaleLibraryProvider(backend.LibraryProvider):
|
class FunkwhaleLibraryProvider(backend.LibraryProvider):
|
||||||
root_directory = models.Ref.directory(
|
root_directory = models.Ref.directory(uri="funkwhale:directory", name="Funkwhale")
|
||||||
uri='funkwhale:directory',
|
|
||||||
name='Funkwhale'
|
|
||||||
)
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(FunkwhaleLibraryProvider, self).__init__(*args, **kwargs)
|
super(FunkwhaleLibraryProvider, self).__init__(*args, **kwargs)
|
||||||
self.vfs = {'funkwhale:directory': collections.OrderedDict()}
|
self.vfs = {"funkwhale:directory": collections.OrderedDict()}
|
||||||
# self.add_to_vfs(new_folder('Favorites', ['favorites']))
|
self.add_to_vfs(new_folder("Favorites", "favorites"))
|
||||||
# self.add_to_vfs(new_folder('Following', ['following']))
|
# self.add_to_vfs(new_folder('Following', ['following']))
|
||||||
# self.add_to_vfs(new_folder('Sets', ['sets']))
|
# self.add_to_vfs(new_folder('Sets', ['sets']))
|
||||||
# self.add_to_vfs(new_folder('Stream', ['stream']))
|
# self.add_to_vfs(new_folder('Stream', ['stream']))
|
||||||
|
self.cache = Cache()
|
||||||
|
|
||||||
def add_to_vfs(self, _model):
|
def add_to_vfs(self, _model):
|
||||||
self.vfs['funkwhale:directory'][_model.uri] = _model
|
self.vfs["funkwhale:directory"][_model.uri] = _model
|
||||||
|
|
||||||
def list_sets(self):
|
|
||||||
sets_vfs = collections.OrderedDict()
|
|
||||||
for (name, set_id, tracks) in self.backend.remote.get_sets():
|
|
||||||
sets_list = new_folder(name, ['sets', set_id])
|
|
||||||
logger.debug('Adding set %s to vfs' % sets_list.name)
|
|
||||||
sets_vfs[set_id] = sets_list
|
|
||||||
return sets_vfs.values()
|
|
||||||
|
|
||||||
def list_favorites(self):
|
|
||||||
vfs_list = collections.OrderedDict()
|
|
||||||
for track in self.backend.remote.get_likes():
|
|
||||||
logger.debug('Adding liked track %s to vfs' % track.name)
|
|
||||||
vfs_list[track.name] = models.Ref.track(
|
|
||||||
uri=track.uri, name=track.name)
|
|
||||||
return vfs_list.values()
|
|
||||||
|
|
||||||
def list_user_follows(self):
|
|
||||||
sets_vfs = collections.OrderedDict()
|
|
||||||
for (name, user_id) in self.backend.remote.get_followings():
|
|
||||||
sets_list = new_folder(name, ['following', user_id])
|
|
||||||
logger.debug('Adding set %s to vfs' % sets_list.name)
|
|
||||||
sets_vfs[user_id] = sets_list
|
|
||||||
return sets_vfs.values()
|
|
||||||
|
|
||||||
def tracklist_to_vfs(self, track_list):
|
|
||||||
vfs_list = collections.OrderedDict()
|
|
||||||
for temp_track in track_list:
|
|
||||||
if not isinstance(temp_track, Track):
|
|
||||||
temp_track = self.backend.remote.parse_track(temp_track)
|
|
||||||
if hasattr(temp_track, 'uri'):
|
|
||||||
vfs_list[temp_track.name] = models.Ref.track(
|
|
||||||
uri=temp_track.uri,
|
|
||||||
name=temp_track.name
|
|
||||||
)
|
|
||||||
return vfs_list.values()
|
|
||||||
|
|
||||||
def browse(self, uri):
|
def browse(self, uri):
|
||||||
if not self.vfs.get(uri):
|
if not self.vfs.get(uri):
|
||||||
(req_type, res_id) = re.match(r'.*:(\w*)(?:/(\d*))?', uri).groups()
|
if uri.startswith("funkwhale:directory:"):
|
||||||
|
uri = uri.replace("funkwhale:directory:", "", 1)
|
||||||
if 'favorites' == req_type:
|
parts = uri.split(":")
|
||||||
return self.list_favorites()
|
remaining = parts[1:] if len(parts) > 1 else []
|
||||||
|
print("PARTS", parts, remaining)
|
||||||
|
handler = getattr(self, "browse_%s" % parts[0])
|
||||||
|
return handler(remaining)
|
||||||
|
|
||||||
# root directory
|
# root directory
|
||||||
return self.vfs.get(uri, {}).values()
|
return self.vfs.get(uri, {}).values()
|
||||||
|
|
||||||
|
def browse_favorites(self, remaining):
|
||||||
|
if remaining == []:
|
||||||
|
return [
|
||||||
|
new_folder("Recent", "favorites:recent"),
|
||||||
|
new_folder("By artist", "favorites:by-artist"),
|
||||||
|
]
|
||||||
|
|
||||||
|
if remaining == ["recent"]:
|
||||||
|
payload = self.backend.client.list_tracks(
|
||||||
|
{"favorites": "true", "ordering": "-creation_date", "page_size": 100}
|
||||||
|
)["results"]
|
||||||
|
return [
|
||||||
|
convert_to_track(row, ref=True, cache=self.cache) for row in payload
|
||||||
|
]
|
||||||
|
return []
|
||||||
|
|
||||||
def search(self, query=None, uris=None, exact=False):
|
def search(self, query=None, uris=None, exact=False):
|
||||||
# TODO Support exact search
|
# TODO Support exact search
|
||||||
if not query:
|
if not query:
|
||||||
return
|
return
|
||||||
|
|
||||||
if 'uri' in query:
|
|
||||||
search_query = ''.join(query['uri'])
|
|
||||||
url = urlparse(search_query)
|
|
||||||
if 'soundcloud.com' in url.netloc:
|
|
||||||
logger.info('Resolving SoundCloud for: %s', search_query)
|
|
||||||
return SearchResult(
|
|
||||||
uri='soundcloud:search',
|
|
||||||
tracks=self.backend.remote.resolve_url(search_query)
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
search_query = simplify_search_query(query)
|
search_query = simplify_search_query(query)
|
||||||
logger.info('Searching Funkwhale for: %s', search_query)
|
logger.info("Searching Funkwhale for: %s", search_query)
|
||||||
raw_results = self.backend.remote.http_client.search(search_query)
|
raw_results = self.backend.client.search(search_query)
|
||||||
artists = [convert_to_artist(row) for row in raw_results['artists']]
|
artists = [convert_to_artist(row) for row in raw_results["artists"]]
|
||||||
albums = [convert_to_album(row) for row in raw_results['albums']]
|
albums = [convert_to_album(row) for row in raw_results["albums"]]
|
||||||
tracks = [convert_to_track(row) for row in raw_results['tracks']]
|
tracks = [convert_to_track(row) for row in raw_results["tracks"]]
|
||||||
|
|
||||||
return models.SearchResult(
|
return models.SearchResult(
|
||||||
uri='funkwhale:search',
|
uri="funkwhale:search", tracks=tracks, albums=albums, artists=artists
|
||||||
tracks=tracks,
|
|
||||||
albums=albums,
|
|
||||||
artists=artists,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def lookup(self, uri):
|
def lookup(self, uri):
|
||||||
if 'fw:' in uri:
|
print("CACHE", self.cache, uri)
|
||||||
uri = uri.replace('fw:', '')
|
from_cache = self.cache.get(uri)
|
||||||
|
if from_cache:
|
||||||
|
try:
|
||||||
|
len(from_cache)
|
||||||
|
return from_cache
|
||||||
|
except TypeError:
|
||||||
|
return [from_cache]
|
||||||
|
|
||||||
|
if "fw:" in uri:
|
||||||
|
uri = uri.replace("fw:", "")
|
||||||
return self.backend.remote.resolve_url(uri)
|
return self.backend.remote.resolve_url(uri)
|
||||||
|
|
||||||
client = self.backend.remote.http_client
|
client = self.backend.client
|
||||||
config = {
|
config = {
|
||||||
'track': lambda id: [client.get_track(id)],
|
"track": lambda id: [client.get_track(id)],
|
||||||
'album': lambda id: client.list_tracks({'album': id})['results'],
|
"album": lambda id: client.list_tracks({"album": id})["results"],
|
||||||
'artist': lambda id: client.list_tracks({'artist': id})['results'],
|
"artist": lambda id: client.list_tracks({"artist": id})["results"],
|
||||||
}
|
}
|
||||||
|
|
||||||
type, id = parse_uri(uri)
|
type, id = parse_uri(uri)
|
||||||
payload = config[type](id)
|
payload = config[type](id)
|
||||||
|
return [convert_to_track(row, cache=self.cache) for row in payload]
|
||||||
return [convert_to_track(row) for row in payload]
|
|
||||||
|
|
||||||
|
|
||||||
def parse_uri(uri):
|
def parse_uri(uri):
|
||||||
uri = uri.replace('funkwhale:', '')
|
uri = uri.replace("funkwhale:", "", 1)
|
||||||
parts = uri.split(':')
|
parts = uri.split(":")
|
||||||
type = parts[0].rstrip('s')
|
type = parts[0].rstrip("s")
|
||||||
id = int(parts[1])
|
id = int(parts[1])
|
||||||
return type, id
|
return type, id
|
||||||
|
|
||||||
|
|
||||||
|
def cast_to_ref(f):
|
||||||
|
def inner(payload, ref=False, cache=None):
|
||||||
|
result = f(payload)
|
||||||
|
if cache is not None:
|
||||||
|
cache.set(result.uri, result)
|
||||||
|
if ref:
|
||||||
|
return to_ref(result)
|
||||||
|
return result
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
@cast_to_ref
|
||||||
def convert_to_artist(payload):
|
def convert_to_artist(payload):
|
||||||
return models.Artist(
|
return models.Artist(
|
||||||
uri='funkwhale:artists:%s' % (payload['id'],),
|
uri="funkwhale:artists:%s" % (payload["id"],),
|
||||||
name=payload['name'],
|
name=payload["name"],
|
||||||
sortname=payload['name'],
|
sortname=payload["name"],
|
||||||
musicbrainz_id=payload['mbid'],
|
musicbrainz_id=payload["mbid"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cast_to_ref
|
||||||
def convert_to_album(payload):
|
def convert_to_album(payload):
|
||||||
artist = convert_to_artist(payload['artist'])
|
artist = convert_to_artist(payload["artist"])
|
||||||
image = payload['cover']['original'] if payload['cover'] else None
|
image = payload["cover"]["original"] if payload["cover"] else None
|
||||||
|
|
||||||
return models.Album(
|
return models.Album(
|
||||||
uri='funkwhale:albums:%s' % (payload['id'],),
|
uri="funkwhale:albums:%s" % (payload["id"],),
|
||||||
name=payload['title'],
|
name=payload["title"],
|
||||||
musicbrainz_id=payload['mbid'],
|
musicbrainz_id=payload["mbid"],
|
||||||
images=[image] if image else [],
|
images=[image] if image else [],
|
||||||
artists=[artist],
|
artists=[artist],
|
||||||
date=payload['release_date'],
|
date=payload["release_date"],
|
||||||
num_tracks=len(payload.get('tracks', [])),
|
num_tracks=len(payload.get("tracks", [])),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cast_to_ref
|
||||||
def convert_to_track(payload):
|
def convert_to_track(payload):
|
||||||
artist = convert_to_artist(payload['artist'])
|
artist = convert_to_artist(payload["artist"])
|
||||||
album = convert_to_album(payload['album'])
|
album = convert_to_album(payload["album"])
|
||||||
return models.Track(
|
return models.Track(
|
||||||
uri='funkwhale:tracks:%s' % (payload['id'],),
|
uri="funkwhale:tracks:%s" % (payload["id"],),
|
||||||
name=payload['title'],
|
name=payload["title"],
|
||||||
musicbrainz_id=payload['mbid'],
|
musicbrainz_id=payload["mbid"],
|
||||||
artists=[artist],
|
artists=[artist],
|
||||||
album=album,
|
album=album,
|
||||||
date=payload['album']['release_date'],
|
date=payload["album"]["release_date"],
|
||||||
bitrate=(payload['bitrate'] or 0) / 1000,
|
bitrate=(payload["bitrate"] or 0) / 1000,
|
||||||
length=(payload['duration'] or 0) * 1000,
|
length=(payload["duration"] or 0) * 1000,
|
||||||
track_no=payload['position'],
|
track_no=payload["position"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def to_ref(obj):
|
||||||
|
getter = getattr(models.Ref, obj.__class__.__name__.lower())
|
||||||
|
return getter(uri=obj.uri, name=obj.name)
|
||||||
|
|
|
@ -35,6 +35,7 @@ test =
|
||||||
pytest-cov
|
pytest-cov
|
||||||
requests-mock
|
requests-mock
|
||||||
pytest-mock
|
pytest-mock
|
||||||
|
factory_boy
|
||||||
|
|
||||||
dev =
|
dev =
|
||||||
ipython
|
ipython
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import mopidy_funkwhale.actor
|
||||||
|
import mopidy_funkwhale.client
|
||||||
|
import mopidy_funkwhale.library
|
||||||
|
|
||||||
|
FUNKWHALE_URL = "https://test.funkwhale"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def config():
|
||||||
|
return {
|
||||||
|
"funkwhale": {"url": FUNKWHALE_URL, "username": "user", "password": "passw0rd"},
|
||||||
|
"proxy": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def backend(config):
|
||||||
|
return mopidy_funkwhale.actor.FunkwhaleBackend(config=config, audio=None)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def session(backend):
|
||||||
|
return mopidy_funkwhale.client.get_requests_session(
|
||||||
|
FUNKWHALE_URL, {}, "test/something"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def client(backend, session):
|
||||||
|
return backend.client
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def library(backend):
|
||||||
|
return backend.library
|
|
@ -0,0 +1,54 @@
|
||||||
|
import random
|
||||||
|
|
||||||
|
from mopidy import models
|
||||||
|
|
||||||
|
import factory
|
||||||
|
|
||||||
|
|
||||||
|
class ArtistJSONFactory(factory.Factory):
|
||||||
|
id = factory.Sequence(int)
|
||||||
|
mbid = factory.Faker("uuid4")
|
||||||
|
name = factory.Faker("name")
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = dict
|
||||||
|
|
||||||
|
|
||||||
|
class CoverJSONFactory(factory.Factory):
|
||||||
|
original = factory.Faker("url")
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = dict
|
||||||
|
|
||||||
|
|
||||||
|
class AlbumJSONFactory(factory.Factory):
|
||||||
|
id = factory.Sequence(int)
|
||||||
|
mbid = factory.Faker("uuid4")
|
||||||
|
title = factory.Faker("name")
|
||||||
|
tracks = factory.Iterator([range(i) for i in range(1, 30)])
|
||||||
|
artist = factory.SubFactory(ArtistJSONFactory)
|
||||||
|
release_date = factory.Faker("date")
|
||||||
|
cover = factory.SubFactory(CoverJSONFactory)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = dict
|
||||||
|
|
||||||
|
|
||||||
|
class TrackJSONFactory(factory.Factory):
|
||||||
|
id = factory.Sequence(int)
|
||||||
|
mbid = factory.Faker("uuid4")
|
||||||
|
title = factory.Faker("name")
|
||||||
|
position = factory.Faker("pyint")
|
||||||
|
duration = factory.Faker("pyint")
|
||||||
|
creation_date = factory.Faker("date")
|
||||||
|
bitrate = factory.Iterator([i * 1000 for i in (128, 256, 360)])
|
||||||
|
artist = factory.SubFactory(ArtistJSONFactory)
|
||||||
|
album = factory.SubFactory(AlbumJSONFactory)
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = dict
|
||||||
|
|
||||||
|
|
||||||
|
class ArtistFactory(factory.Factory):
|
||||||
|
class Meta:
|
||||||
|
model = models.Artist
|
|
@ -1,42 +1,26 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import mopidy_funkwhale.client
|
|
||||||
|
|
||||||
FUNKWHALE_URL = 'https://test.funkwhale'
|
|
||||||
FUNKWHALE_API_URL = FUNKWHALE_URL + '/api/v1/'
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def session():
|
|
||||||
return mopidy_funkwhale.client.get_requests_session(
|
|
||||||
FUNKWHALE_URL,
|
|
||||||
{},
|
|
||||||
'test/something'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def client(session):
|
|
||||||
return mopidy_funkwhale.client.APIClient(session)
|
|
||||||
|
|
||||||
|
|
||||||
def test_client_search(client, requests_mock):
|
def test_client_search(client, requests_mock):
|
||||||
requests_mock.get(FUNKWHALE_API_URL + 'search?query=myquery', json={'hello': 'world'})
|
requests_mock.get(
|
||||||
|
client.session.url_base + "search?query=myquery", json={"hello": "world"}
|
||||||
|
)
|
||||||
|
|
||||||
result = client.search('myquery')
|
result = client.search("myquery")
|
||||||
assert result == {'hello': 'world'}
|
assert result == {"hello": "world"}
|
||||||
|
|
||||||
|
|
||||||
def test_client_get_track(client, requests_mock):
|
def test_client_get_track(client, requests_mock):
|
||||||
requests_mock.get(FUNKWHALE_API_URL + 'tracks/12/', json={'hello': 'world'})
|
requests_mock.get(client.session.url_base + "tracks/12/", json={"hello": "world"})
|
||||||
|
|
||||||
result = client.get_track(12)
|
result = client.get_track(12)
|
||||||
assert result == {'hello': 'world'}
|
assert result == {"hello": "world"}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_client_list_tracks(client, requests_mock):
|
def test_client_list_tracks(client, requests_mock):
|
||||||
requests_mock.get(FUNKWHALE_API_URL + 'tracks/?artist=12', json={'hello': 'world'})
|
requests_mock.get(
|
||||||
|
client.session.url_base + "tracks/?artist=12", json={"hello": "world"}
|
||||||
|
)
|
||||||
|
|
||||||
result = client.list_tracks({'artist': 12})
|
result = client.list_tracks({"artist": 12})
|
||||||
assert result == {'hello': 'world'}
|
assert result == {"hello": "world"}
|
||||||
|
|
|
@ -8,17 +8,17 @@ def test_get_default_config():
|
||||||
|
|
||||||
config = ext.get_default_config()
|
config = ext.get_default_config()
|
||||||
|
|
||||||
assert '[funkwhale]' in config
|
assert "[funkwhale]" in config
|
||||||
assert 'enabled = true' in config
|
assert "enabled = true" in config
|
||||||
assert 'url = https://demo.funkwhale.audio' in config
|
assert "url = https://demo.funkwhale.audio" in config
|
||||||
assert 'username = demo' in config
|
assert "username = demo" in config
|
||||||
assert 'password = demo' in config
|
assert "password = demo" in config
|
||||||
|
|
||||||
|
|
||||||
def test_get_config_schema():
|
def test_get_config_schema():
|
||||||
ext = mopidy_funkwhale.Extension()
|
ext = mopidy_funkwhale.Extension()
|
||||||
|
|
||||||
schema = ext.get_config_schema()
|
schema = ext.get_config_schema()
|
||||||
assert 'url' in schema
|
assert "url" in schema
|
||||||
assert 'username' in schema
|
assert "username" in schema
|
||||||
assert 'password' in schema
|
assert "password" in schema
|
||||||
|
|
|
@ -1,24 +1,23 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from mopidy import models
|
from mopidy import models
|
||||||
|
|
||||||
import mopidy_funkwhale.library
|
import mopidy_funkwhale.library
|
||||||
|
|
||||||
|
from . import factories
|
||||||
|
|
||||||
|
|
||||||
def test_convert_artist_to_model():
|
def test_convert_artist_to_model():
|
||||||
payload = {
|
payload = {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"}
|
||||||
'id': 42,
|
|
||||||
'mbid': str(uuid.uuid4()),
|
|
||||||
'name': "Test artist",
|
|
||||||
}
|
|
||||||
|
|
||||||
result = mopidy_funkwhale.library.convert_to_artist(payload)
|
result = mopidy_funkwhale.library.convert_to_artist(payload)
|
||||||
|
|
||||||
assert type(result) == models.Artist
|
assert type(result) == models.Artist
|
||||||
assert result.musicbrainz_id == payload['mbid']
|
assert result.musicbrainz_id == payload["mbid"]
|
||||||
assert result.uri == 'funkwhale:artists:%s' % (payload['id'],)
|
assert result.uri == "funkwhale:artists:%s" % (payload["id"],)
|
||||||
assert result.name == payload['name']
|
assert result.name == payload["name"]
|
||||||
assert result.sortname == payload['name']
|
assert result.sortname == payload["name"]
|
||||||
|
|
||||||
|
|
||||||
def test_convert_album_to_model():
|
def test_convert_album_to_model():
|
||||||
|
@ -27,11 +26,7 @@ def test_convert_album_to_model():
|
||||||
"tracks": [1, 2, 3, 4],
|
"tracks": [1, 2, 3, 4],
|
||||||
"mbid": str(uuid.uuid4()),
|
"mbid": str(uuid.uuid4()),
|
||||||
"title": "Test album",
|
"title": "Test album",
|
||||||
"artist": {
|
"artist": {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"},
|
||||||
'id': 42,
|
|
||||||
'mbid': str(uuid.uuid4()),
|
|
||||||
'name': "Test artist",
|
|
||||||
},
|
|
||||||
"release_date": "2017-01-01",
|
"release_date": "2017-01-01",
|
||||||
"cover": {
|
"cover": {
|
||||||
"original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg"
|
"original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg"
|
||||||
|
@ -41,64 +36,129 @@ def test_convert_album_to_model():
|
||||||
result = mopidy_funkwhale.library.convert_to_album(payload)
|
result = mopidy_funkwhale.library.convert_to_album(payload)
|
||||||
|
|
||||||
assert type(result) == models.Album
|
assert type(result) == models.Album
|
||||||
assert result.musicbrainz_id == payload['mbid']
|
assert result.musicbrainz_id == payload["mbid"]
|
||||||
assert result.uri == 'funkwhale:albums:%s' % (payload['id'],)
|
assert result.uri == "funkwhale:albums:%s" % (payload["id"],)
|
||||||
assert result.name == payload['title']
|
assert result.name == payload["title"]
|
||||||
assert result.date == payload['release_date']
|
assert result.date == payload["release_date"]
|
||||||
assert result.num_tracks == len(payload['tracks'])
|
assert result.num_tracks == len(payload["tracks"])
|
||||||
assert result.artists == frozenset([mopidy_funkwhale.library.convert_to_artist(payload['artist'])])
|
assert result.artists == frozenset(
|
||||||
assert result.images == frozenset([payload['cover']['original']])
|
[mopidy_funkwhale.library.convert_to_artist(payload["artist"])]
|
||||||
|
)
|
||||||
|
assert result.images == frozenset([payload["cover"]["original"]])
|
||||||
|
|
||||||
|
|
||||||
def test_convert_album_to_model():
|
def test_convert_album_to_model():
|
||||||
payload = {
|
payload = {
|
||||||
"id": 2,
|
"id": 2,
|
||||||
"title": 'Test track',
|
"title": "Test track",
|
||||||
"mbid": str(uuid.uuid4()),
|
"mbid": str(uuid.uuid4()),
|
||||||
"creation_date": "2017-01-01",
|
"creation_date": "2017-01-01",
|
||||||
"position": 12,
|
"position": 12,
|
||||||
"bitrate": 128000,
|
"bitrate": 128000,
|
||||||
"duration": 120,
|
"duration": 120,
|
||||||
"artist": {
|
"artist": {"id": 43, "mbid": str(uuid.uuid4()), "name": "Test artist 2"},
|
||||||
'id': 43,
|
|
||||||
'mbid': str(uuid.uuid4()),
|
|
||||||
'name': "Test artist 2",
|
|
||||||
},
|
|
||||||
"album": {
|
"album": {
|
||||||
"id": 3,
|
"id": 3,
|
||||||
"tracks": [1, 2, 3, 4],
|
"tracks": [1, 2, 3, 4],
|
||||||
"mbid": str(uuid.uuid4()),
|
"mbid": str(uuid.uuid4()),
|
||||||
"title": "Test album",
|
"title": "Test album",
|
||||||
"artist": {
|
"artist": {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"},
|
||||||
'id': 42,
|
|
||||||
'mbid': str(uuid.uuid4()),
|
|
||||||
'name': "Test artist",
|
|
||||||
},
|
|
||||||
"release_date": "2017-01-01",
|
"release_date": "2017-01-01",
|
||||||
"cover": {
|
"cover": {
|
||||||
"original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg"
|
"original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg"
|
||||||
},
|
},
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
result = mopidy_funkwhale.library.convert_to_track(payload)
|
result = mopidy_funkwhale.library.convert_to_track(payload)
|
||||||
|
|
||||||
assert type(result) == models.Track
|
assert type(result) == models.Track
|
||||||
assert result.musicbrainz_id == payload['mbid']
|
assert result.musicbrainz_id == payload["mbid"]
|
||||||
assert result.uri == 'funkwhale:tracks:%s' % (payload['id'],)
|
assert result.uri == "funkwhale:tracks:%s" % (payload["id"],)
|
||||||
assert result.name == payload['title']
|
assert result.name == payload["title"]
|
||||||
assert result.date == payload['album']['release_date']
|
assert result.date == payload["album"]["release_date"]
|
||||||
assert result.length == payload['duration'] * 1000
|
assert result.length == payload["duration"] * 1000
|
||||||
assert result.bitrate == payload['bitrate'] / 1000
|
assert result.bitrate == payload["bitrate"] / 1000
|
||||||
|
|
||||||
assert result.album == mopidy_funkwhale.library.convert_to_album(payload['album'])
|
assert result.album == mopidy_funkwhale.library.convert_to_album(payload["album"])
|
||||||
assert result.artists == frozenset([mopidy_funkwhale.library.convert_to_artist(payload['artist'])])
|
assert result.artists == frozenset(
|
||||||
|
[mopidy_funkwhale.library.convert_to_artist(payload["artist"])]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('uri, expected', [
|
@pytest.mark.parametrize(
|
||||||
('funkwhale:albums:42', ('album', 42)),
|
"uri, expected",
|
||||||
('funkwhale:tracks:42', ('track', 42)),
|
[
|
||||||
('funkwhale:artists:42', ('artist', 42)),
|
("funkwhale:albums:42", ("album", 42)),
|
||||||
])
|
("funkwhale:tracks:42", ("track", 42)),
|
||||||
|
("funkwhale:artists:42", ("artist", 42)),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_parse_uri(uri, expected):
|
def test_parse_uri(uri, expected):
|
||||||
assert mopidy_funkwhale.library.parse_uri(uri) == expected
|
assert mopidy_funkwhale.library.parse_uri(uri) == expected
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("type", ["track", "album", "artist"])
|
||||||
|
def test_parse_uri(type):
|
||||||
|
obj = getattr(models, type.capitalize())(uri="hello:world", name="Hello")
|
||||||
|
expected = getattr(models.Ref, type)(uri=obj.uri, name=obj.name)
|
||||||
|
assert mopidy_funkwhale.library.to_ref(obj) == expected
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"path, expected_handler,remaining",
|
||||||
|
[
|
||||||
|
("funkwhale:directory:favorites", "browse_favorites", []),
|
||||||
|
("funkwhale:directory:favorites:by-date", "browse_favorites", ["by-date"]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_browse_routing(library, path, expected_handler, mocker, remaining):
|
||||||
|
handler = mocker.patch.object(library, expected_handler, return_value="test")
|
||||||
|
|
||||||
|
assert library.browse(path) == "test"
|
||||||
|
assert handler.called_once_with(remaining)
|
||||||
|
|
||||||
|
|
||||||
|
def test_browse_favorites_root(library):
|
||||||
|
expected = [
|
||||||
|
models.Ref.directory(uri="funkwhale:directory:favorites:recent", name="Recent"),
|
||||||
|
models.Ref.directory(
|
||||||
|
uri="funkwhale:directory:favorites:by-artist", name="By artist"
|
||||||
|
),
|
||||||
|
]
|
||||||
|
assert library.browse_favorites([]) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_browse_favorites_recent(library, client, requests_mock):
|
||||||
|
track = factories.TrackJSONFactory()
|
||||||
|
url = (
|
||||||
|
client.session.url_base
|
||||||
|
+ "tracks/?favorites=true&page_size=100&&ordering=-creation_date"
|
||||||
|
)
|
||||||
|
requests_mock.get(url, json={"results": [track]})
|
||||||
|
|
||||||
|
expected = [mopidy_funkwhale.library.convert_to_track(track, ref=True)]
|
||||||
|
result = library.browse_favorites(["recent"])
|
||||||
|
|
||||||
|
assert result == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_set():
|
||||||
|
cache = mopidy_funkwhale.library.Cache()
|
||||||
|
cache.set("hello:world", "value")
|
||||||
|
assert cache["hello:world"][0] < time.time()
|
||||||
|
assert cache["hello:world"][1] == "value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_get():
|
||||||
|
cache = mopidy_funkwhale.library.Cache()
|
||||||
|
cache["hello:world"] = (time.time(), "value")
|
||||||
|
assert cache.get("hello:world") == "value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_cache_key_too_old():
|
||||||
|
cache = mopidy_funkwhale.library.Cache(max_age=60)
|
||||||
|
t = time.time() - 60
|
||||||
|
cache["hello:world"] = (t, "value")
|
||||||
|
assert cache.get("hello:world") is None
|
||||||
|
assert "hello:world" not in cache
|
||||||
|
|
Loading…
Reference in New Issue