diff --git a/mopidy_funkwhale/__init__.py b/mopidy_funkwhale/__init__.py index aba4d1b..7a553cf 100644 --- a/mopidy_funkwhale/__init__.py +++ b/mopidy_funkwhale/__init__.py @@ -9,39 +9,40 @@ import os __author__ = """Eliot Berriot""" -__email__ = 'contact+funkwhale@eliotberriot.com' -__version__ = '0.1.0' +__email__ = "contact+funkwhale@eliotberriot.com" +__version__ = "0.1.0" logger = logging.getLogger(__name__) class Extension(mopidy.ext.Extension): - dist_name = 'Mopidy-Funkwhale' - ext_name = 'funkwhale' + dist_name = "Mopidy-Funkwhale" + ext_name = "funkwhale" version = __version__ def get_default_config(self): - conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf') + conf_file = os.path.join(os.path.dirname(__file__), "ext.conf") return mopidy.config.read(conf_file) def get_config_schema(self): schema = super(Extension, self).get_config_schema() - schema['url'] = mopidy.config.String() - schema['username'] = mopidy.config.String(optional=True) - schema['password'] = mopidy.config.Secret(optional=True) + schema["url"] = mopidy.config.String() + schema["username"] = mopidy.config.String(optional=True) + schema["password"] = mopidy.config.Secret(optional=True) return schema def validate_config(self, config): - if not config.getboolean('funkwhale', 'enabled'): + if not config.getboolean("funkwhale", "enabled"): return - username = config.getstring('funkwhale', 'username') - password = config.getstring('funkwhale', 'password') + username = config.getstring("funkwhale", "username") + password = config.getstring("funkwhale", "password") if any([username, password]) and not all([username, password]): raise mopidy.ext.ExtensionError( - 'You need to provide username and password to authenticate with the funkwhale backend' + "You need to provide username and password to authenticate with the funkwhale backend" ) def setup(self, registry): from . import actor - registry.add('backend', actor.FunkwhaleBackend) + + registry.add("backend", actor.FunkwhaleBackend) diff --git a/mopidy_funkwhale/actor.py b/mopidy_funkwhale/actor.py index 39afd47..a2b221c 100644 --- a/mopidy_funkwhale/actor.py +++ b/mopidy_funkwhale/actor.py @@ -14,34 +14,37 @@ logger = logging.getLogger(__name__) class FunkwhaleBackend(pykka.ThreadingActor, backend.Backend): - def __init__(self, config, audio): super(FunkwhaleBackend, self).__init__() self.config = config - self.remote = client.FunkwhaleClient(config) + self.client = client.APIClient(config) self.library = library.FunkwhaleLibraryProvider(backend=self) self.playback = FunkwhalePlaybackProvider(audio=audio, backend=self) - self.uri_schemes = ['funkwhale', 'fw'] + self.uri_schemes = ["funkwhale", "fw"] def on_start(self): - username = self.remote.username - if username is not None: - logger.info('Logged in to Funkwhale as "%s" on "%s"', username, self.config['funkwhale']['url']) + if self.client.username is not None: + self.client.login() + logger.info( + 'Logged in to Funkwhale as "%s" on "%s"', + self.client.username, + self.config["funkwhale"]["url"], + ) else: - logger.info('Using "%s" anonymously', self.config['funkwhale']['url']) + logger.info('Using "%s" anonymously', self.config["funkwhale"]["url"]) class FunkwhalePlaybackProvider(backend.PlaybackProvider): def translate_uri(self, uri): _, id = library.parse_uri(uri) - track = self.backend.remote.http_client.get_track(id) + track = self.backend.client.get_track(id) if track is None: return None - url = track['listen_url'] - if url.startswith('/'): - url = self.backend.config['funkwhale']['url'] + url - if self.backend.remote.token: - url += '?jwt=' + self.backend.remote.token + url = track["listen_url"] + if url.startswith("/"): + url = self.backend.config["funkwhale"]["url"] + url + if self.backend.client.token: + url += "?jwt=" + self.backend.client.token return url diff --git a/mopidy_funkwhale/client.py b/mopidy_funkwhale/client.py index 9b685ac..8c2cd44 100644 --- a/mopidy_funkwhale/client.py +++ b/mopidy_funkwhale/client.py @@ -23,68 +23,66 @@ class SessionWithUrlBase(requests.Session): def get_requests_session(url, proxy_config, user_agent): - if not url.endswith('/'): - url += '/' - url += 'api/v1/' + if not url.endswith("/"): + url += "/" + url += "api/v1/" proxy = httpclient.format_proxy(proxy_config) full_user_agent = httpclient.format_user_agent(user_agent) session = SessionWithUrlBase(url_base=url) - session.proxies.update({'http': proxy, 'https': proxy}) - session.headers.update({'user-agent': full_user_agent}) + session.proxies.update({"http": proxy, "https": proxy}) + session.headers.update({"user-agent": full_user_agent}) return session def login(session, username, password): - response = session.post('token/', {'username': username, 'password': password}) + if not username: + return + response = session.post("token/", {"username": username, "password": password}) try: response.raise_for_status() except requests.exceptions.HTTPError: - raise exceptions.BackendError('Authentication failed for user %s' % (username,)) - token = response.json()['token'] - session.headers.update({'Authorization': 'JWT %s' % (token,)}) + raise exceptions.BackendError("Authentication failed for user %s" % (username,)) + token = response.json()["token"] + session.headers.update({"Authorization": "JWT %s" % (token,)}) return token + class APIClient(object): - def __init__(self, session): - self.session = session + def __init__(self, config): + self.config = config + self.session = get_requests_session( + config["funkwhale"]["url"], + proxy_config=config["proxy"], + user_agent="%s/%s" % (Extension.dist_name, __version__), + ) + self.username = self.config["funkwhale"]["username"] + self.token = None + + def login(self): + self.username = self.config["funkwhale"]["username"] + if self.username: + self.token = login( + self.session, + self.config["funkwhale"]["username"], + self.config["funkwhale"]["password"], + ) + else: + self.token = None def search(self, query): - response = self.session.get('search', params={'query': query}) + response = self.session.get("search", params={"query": query}) response.raise_for_status() return response.json() def get_track(self, id): - response = self.session.get('tracks/{}/'.format(id)) + response = self.session.get("tracks/{}/".format(id)) response.raise_for_status() return response.json() def list_tracks(self, filters): - response = self.session.get('tracks/', params=filters) + response = self.session.get("tracks/", params=filters) response.raise_for_status() return response.json() - - -class FunkwhaleClient(object): - - def __init__(self, config): - super(FunkwhaleClient, self).__init__() - self.page_size = config['funkwhale'].get('page_size', 50) - self.http_client = APIClient( - session=get_requests_session( - config['funkwhale']['url'], - proxy_config=config['proxy'], - user_agent='%s/%s' % ( - Extension.dist_name, - __version__), - ) - ) - self.username = config['funkwhale']['username'] - self.token = None - if config['funkwhale']['username']: - self.token = login( - self.http_client.session, - config['funkwhale']['username'], - config['funkwhale']['password']) diff --git a/mopidy_funkwhale/library.py b/mopidy_funkwhale/library.py index ad7d46e..1e7b123 100644 --- a/mopidy_funkwhale/library.py +++ b/mopidy_funkwhale/library.py @@ -3,12 +3,21 @@ from __future__ import unicode_literals import collections import logging import re +import time +import urllib from mopidy import backend, models logger = logging.getLogger(__name__) +def generate_uri(path): + return "funkwhale:directory:%s" % path + + +def new_folder(name, path): + return models.Ref.directory(uri=generate_uri(path), name=name) + def simplify_search_query(query): @@ -19,165 +28,185 @@ def simplify_search_query(query): r.extend(v) else: r.append(v) - return ' '.join(r) + return " ".join(r) if isinstance(query, list): - return ' '.join(query) + return " ".join(query) else: return query +class Cache(collections.OrderedDict): + def __init__(self, max_age=0): + self.max_age = max_age + super(Cache, self).__init__() + + def set(self, key, value): + now = time.time() + self[key] = (now, value) + + def get(self, key): + value = super(Cache, self).get(key) + if value is None: + return + now = time.time() + t, v = value + if self.max_age and t + self.max_age < now: + # entry is too old, we delete it + del self[key] + return None + return v + + class FunkwhaleLibraryProvider(backend.LibraryProvider): - root_directory = models.Ref.directory( - uri='funkwhale:directory', - name='Funkwhale' - ) + root_directory = models.Ref.directory(uri="funkwhale:directory", name="Funkwhale") def __init__(self, *args, **kwargs): super(FunkwhaleLibraryProvider, self).__init__(*args, **kwargs) - self.vfs = {'funkwhale:directory': collections.OrderedDict()} - # self.add_to_vfs(new_folder('Favorites', ['favorites'])) + self.vfs = {"funkwhale:directory": collections.OrderedDict()} + self.add_to_vfs(new_folder("Favorites", "favorites")) # self.add_to_vfs(new_folder('Following', ['following'])) # self.add_to_vfs(new_folder('Sets', ['sets'])) # self.add_to_vfs(new_folder('Stream', ['stream'])) + self.cache = Cache() def add_to_vfs(self, _model): - self.vfs['funkwhale:directory'][_model.uri] = _model - - def list_sets(self): - sets_vfs = collections.OrderedDict() - for (name, set_id, tracks) in self.backend.remote.get_sets(): - sets_list = new_folder(name, ['sets', set_id]) - logger.debug('Adding set %s to vfs' % sets_list.name) - sets_vfs[set_id] = sets_list - return sets_vfs.values() - - def list_favorites(self): - vfs_list = collections.OrderedDict() - for track in self.backend.remote.get_likes(): - logger.debug('Adding liked track %s to vfs' % track.name) - vfs_list[track.name] = models.Ref.track( - uri=track.uri, name=track.name) - return vfs_list.values() - - def list_user_follows(self): - sets_vfs = collections.OrderedDict() - for (name, user_id) in self.backend.remote.get_followings(): - sets_list = new_folder(name, ['following', user_id]) - logger.debug('Adding set %s to vfs' % sets_list.name) - sets_vfs[user_id] = sets_list - return sets_vfs.values() - - def tracklist_to_vfs(self, track_list): - vfs_list = collections.OrderedDict() - for temp_track in track_list: - if not isinstance(temp_track, Track): - temp_track = self.backend.remote.parse_track(temp_track) - if hasattr(temp_track, 'uri'): - vfs_list[temp_track.name] = models.Ref.track( - uri=temp_track.uri, - name=temp_track.name - ) - return vfs_list.values() + self.vfs["funkwhale:directory"][_model.uri] = _model def browse(self, uri): if not self.vfs.get(uri): - (req_type, res_id) = re.match(r'.*:(\w*)(?:/(\d*))?', uri).groups() - - if 'favorites' == req_type: - return self.list_favorites() + if uri.startswith("funkwhale:directory:"): + uri = uri.replace("funkwhale:directory:", "", 1) + parts = uri.split(":") + remaining = parts[1:] if len(parts) > 1 else [] + print("PARTS", parts, remaining) + handler = getattr(self, "browse_%s" % parts[0]) + return handler(remaining) # root directory return self.vfs.get(uri, {}).values() + def browse_favorites(self, remaining): + if remaining == []: + return [ + new_folder("Recent", "favorites:recent"), + new_folder("By artist", "favorites:by-artist"), + ] + + if remaining == ["recent"]: + payload = self.backend.client.list_tracks( + {"favorites": "true", "ordering": "-creation_date", "page_size": 100} + )["results"] + return [ + convert_to_track(row, ref=True, cache=self.cache) for row in payload + ] + return [] + def search(self, query=None, uris=None, exact=False): # TODO Support exact search if not query: return - if 'uri' in query: - search_query = ''.join(query['uri']) - url = urlparse(search_query) - if 'soundcloud.com' in url.netloc: - logger.info('Resolving SoundCloud for: %s', search_query) - return SearchResult( - uri='soundcloud:search', - tracks=self.backend.remote.resolve_url(search_query) - ) else: search_query = simplify_search_query(query) - logger.info('Searching Funkwhale for: %s', search_query) - raw_results = self.backend.remote.http_client.search(search_query) - artists = [convert_to_artist(row) for row in raw_results['artists']] - albums = [convert_to_album(row) for row in raw_results['albums']] - tracks = [convert_to_track(row) for row in raw_results['tracks']] + logger.info("Searching Funkwhale for: %s", search_query) + raw_results = self.backend.client.search(search_query) + artists = [convert_to_artist(row) for row in raw_results["artists"]] + albums = [convert_to_album(row) for row in raw_results["albums"]] + tracks = [convert_to_track(row) for row in raw_results["tracks"]] return models.SearchResult( - uri='funkwhale:search', - tracks=tracks, - albums=albums, - artists=artists, + uri="funkwhale:search", tracks=tracks, albums=albums, artists=artists ) def lookup(self, uri): - if 'fw:' in uri: - uri = uri.replace('fw:', '') + print("CACHE", self.cache, uri) + from_cache = self.cache.get(uri) + if from_cache: + try: + len(from_cache) + return from_cache + except TypeError: + return [from_cache] + + if "fw:" in uri: + uri = uri.replace("fw:", "") return self.backend.remote.resolve_url(uri) - client = self.backend.remote.http_client + client = self.backend.client config = { - 'track': lambda id: [client.get_track(id)], - 'album': lambda id: client.list_tracks({'album': id})['results'], - 'artist': lambda id: client.list_tracks({'artist': id})['results'], + "track": lambda id: [client.get_track(id)], + "album": lambda id: client.list_tracks({"album": id})["results"], + "artist": lambda id: client.list_tracks({"artist": id})["results"], } + type, id = parse_uri(uri) payload = config[type](id) - - return [convert_to_track(row) for row in payload] + return [convert_to_track(row, cache=self.cache) for row in payload] def parse_uri(uri): - uri = uri.replace('funkwhale:', '') - parts = uri.split(':') - type = parts[0].rstrip('s') + uri = uri.replace("funkwhale:", "", 1) + parts = uri.split(":") + type = parts[0].rstrip("s") id = int(parts[1]) return type, id +def cast_to_ref(f): + def inner(payload, ref=False, cache=None): + result = f(payload) + if cache is not None: + cache.set(result.uri, result) + if ref: + return to_ref(result) + return result + + return inner + + +@cast_to_ref def convert_to_artist(payload): return models.Artist( - uri='funkwhale:artists:%s' % (payload['id'],), - name=payload['name'], - sortname=payload['name'], - musicbrainz_id=payload['mbid'], + uri="funkwhale:artists:%s" % (payload["id"],), + name=payload["name"], + sortname=payload["name"], + musicbrainz_id=payload["mbid"], ) +@cast_to_ref def convert_to_album(payload): - artist = convert_to_artist(payload['artist']) - image = payload['cover']['original'] if payload['cover'] else None + artist = convert_to_artist(payload["artist"]) + image = payload["cover"]["original"] if payload["cover"] else None return models.Album( - uri='funkwhale:albums:%s' % (payload['id'],), - name=payload['title'], - musicbrainz_id=payload['mbid'], + uri="funkwhale:albums:%s" % (payload["id"],), + name=payload["title"], + musicbrainz_id=payload["mbid"], images=[image] if image else [], artists=[artist], - date=payload['release_date'], - num_tracks=len(payload.get('tracks', [])), + date=payload["release_date"], + num_tracks=len(payload.get("tracks", [])), ) +@cast_to_ref def convert_to_track(payload): - artist = convert_to_artist(payload['artist']) - album = convert_to_album(payload['album']) + artist = convert_to_artist(payload["artist"]) + album = convert_to_album(payload["album"]) return models.Track( - uri='funkwhale:tracks:%s' % (payload['id'],), - name=payload['title'], - musicbrainz_id=payload['mbid'], + uri="funkwhale:tracks:%s" % (payload["id"],), + name=payload["title"], + musicbrainz_id=payload["mbid"], artists=[artist], album=album, - date=payload['album']['release_date'], - bitrate=(payload['bitrate'] or 0) / 1000, - length=(payload['duration'] or 0) * 1000, - track_no=payload['position'], + date=payload["album"]["release_date"], + bitrate=(payload["bitrate"] or 0) / 1000, + length=(payload["duration"] or 0) * 1000, + track_no=payload["position"], ) + + +def to_ref(obj): + getter = getattr(models.Ref, obj.__class__.__name__.lower()) + return getter(uri=obj.uri, name=obj.name) diff --git a/setup.cfg b/setup.cfg index e7e56a1..0f1a184 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,7 @@ test = pytest-cov requests-mock pytest-mock + factory_boy dev = ipython diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..03bd95a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,37 @@ +import pytest + +import mopidy_funkwhale.actor +import mopidy_funkwhale.client +import mopidy_funkwhale.library + +FUNKWHALE_URL = "https://test.funkwhale" + + +@pytest.fixture() +def config(): + return { + "funkwhale": {"url": FUNKWHALE_URL, "username": "user", "password": "passw0rd"}, + "proxy": {}, + } + + +@pytest.fixture +def backend(config): + return mopidy_funkwhale.actor.FunkwhaleBackend(config=config, audio=None) + + +@pytest.fixture() +def session(backend): + return mopidy_funkwhale.client.get_requests_session( + FUNKWHALE_URL, {}, "test/something" + ) + + +@pytest.fixture() +def client(backend, session): + return backend.client + + +@pytest.fixture +def library(backend): + return backend.library diff --git a/tests/factories.py b/tests/factories.py new file mode 100644 index 0000000..44ccb97 --- /dev/null +++ b/tests/factories.py @@ -0,0 +1,54 @@ +import random + +from mopidy import models + +import factory + + +class ArtistJSONFactory(factory.Factory): + id = factory.Sequence(int) + mbid = factory.Faker("uuid4") + name = factory.Faker("name") + + class Meta: + model = dict + + +class CoverJSONFactory(factory.Factory): + original = factory.Faker("url") + + class Meta: + model = dict + + +class AlbumJSONFactory(factory.Factory): + id = factory.Sequence(int) + mbid = factory.Faker("uuid4") + title = factory.Faker("name") + tracks = factory.Iterator([range(i) for i in range(1, 30)]) + artist = factory.SubFactory(ArtistJSONFactory) + release_date = factory.Faker("date") + cover = factory.SubFactory(CoverJSONFactory) + + class Meta: + model = dict + + +class TrackJSONFactory(factory.Factory): + id = factory.Sequence(int) + mbid = factory.Faker("uuid4") + title = factory.Faker("name") + position = factory.Faker("pyint") + duration = factory.Faker("pyint") + creation_date = factory.Faker("date") + bitrate = factory.Iterator([i * 1000 for i in (128, 256, 360)]) + artist = factory.SubFactory(ArtistJSONFactory) + album = factory.SubFactory(AlbumJSONFactory) + + class Meta: + model = dict + + +class ArtistFactory(factory.Factory): + class Meta: + model = models.Artist diff --git a/tests/test_client.py b/tests/test_client.py index 1d04162..0765d36 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,42 +1,26 @@ import pytest -import mopidy_funkwhale.client - -FUNKWHALE_URL = 'https://test.funkwhale' -FUNKWHALE_API_URL = FUNKWHALE_URL + '/api/v1/' - - -@pytest.fixture() -def session(): - return mopidy_funkwhale.client.get_requests_session( - FUNKWHALE_URL, - {}, - 'test/something' - ) - - -@pytest.fixture() -def client(session): - return mopidy_funkwhale.client.APIClient(session) - def test_client_search(client, requests_mock): - requests_mock.get(FUNKWHALE_API_URL + 'search?query=myquery', json={'hello': 'world'}) + requests_mock.get( + client.session.url_base + "search?query=myquery", json={"hello": "world"} + ) - result = client.search('myquery') - assert result == {'hello': 'world'} + result = client.search("myquery") + assert result == {"hello": "world"} def test_client_get_track(client, requests_mock): - requests_mock.get(FUNKWHALE_API_URL + 'tracks/12/', json={'hello': 'world'}) + requests_mock.get(client.session.url_base + "tracks/12/", json={"hello": "world"}) result = client.get_track(12) - assert result == {'hello': 'world'} - + assert result == {"hello": "world"} def test_client_list_tracks(client, requests_mock): - requests_mock.get(FUNKWHALE_API_URL + 'tracks/?artist=12', json={'hello': 'world'}) + requests_mock.get( + client.session.url_base + "tracks/?artist=12", json={"hello": "world"} + ) - result = client.list_tracks({'artist': 12}) - assert result == {'hello': 'world'} + result = client.list_tracks({"artist": 12}) + assert result == {"hello": "world"} diff --git a/tests/test_extension.py b/tests/test_extension.py index bbf61e9..ab5e37f 100644 --- a/tests/test_extension.py +++ b/tests/test_extension.py @@ -8,17 +8,17 @@ def test_get_default_config(): config = ext.get_default_config() - assert '[funkwhale]' in config - assert 'enabled = true' in config - assert 'url = https://demo.funkwhale.audio' in config - assert 'username = demo' in config - assert 'password = demo' in config + assert "[funkwhale]" in config + assert "enabled = true" in config + assert "url = https://demo.funkwhale.audio" in config + assert "username = demo" in config + assert "password = demo" in config def test_get_config_schema(): ext = mopidy_funkwhale.Extension() schema = ext.get_config_schema() - assert 'url' in schema - assert 'username' in schema - assert 'password' in schema + assert "url" in schema + assert "username" in schema + assert "password" in schema diff --git a/tests/test_library.py b/tests/test_library.py index 7632b13..781d1b5 100644 --- a/tests/test_library.py +++ b/tests/test_library.py @@ -1,24 +1,23 @@ import pytest +import time import uuid from mopidy import models import mopidy_funkwhale.library +from . import factories + def test_convert_artist_to_model(): - payload = { - 'id': 42, - 'mbid': str(uuid.uuid4()), - 'name': "Test artist", - } + payload = {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"} result = mopidy_funkwhale.library.convert_to_artist(payload) assert type(result) == models.Artist - assert result.musicbrainz_id == payload['mbid'] - assert result.uri == 'funkwhale:artists:%s' % (payload['id'],) - assert result.name == payload['name'] - assert result.sortname == payload['name'] + assert result.musicbrainz_id == payload["mbid"] + assert result.uri == "funkwhale:artists:%s" % (payload["id"],) + assert result.name == payload["name"] + assert result.sortname == payload["name"] def test_convert_album_to_model(): @@ -27,11 +26,7 @@ def test_convert_album_to_model(): "tracks": [1, 2, 3, 4], "mbid": str(uuid.uuid4()), "title": "Test album", - "artist": { - 'id': 42, - 'mbid': str(uuid.uuid4()), - 'name': "Test artist", - }, + "artist": {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"}, "release_date": "2017-01-01", "cover": { "original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg" @@ -41,64 +36,129 @@ def test_convert_album_to_model(): result = mopidy_funkwhale.library.convert_to_album(payload) assert type(result) == models.Album - assert result.musicbrainz_id == payload['mbid'] - assert result.uri == 'funkwhale:albums:%s' % (payload['id'],) - assert result.name == payload['title'] - assert result.date == payload['release_date'] - assert result.num_tracks == len(payload['tracks']) - assert result.artists == frozenset([mopidy_funkwhale.library.convert_to_artist(payload['artist'])]) - assert result.images == frozenset([payload['cover']['original']]) + assert result.musicbrainz_id == payload["mbid"] + assert result.uri == "funkwhale:albums:%s" % (payload["id"],) + assert result.name == payload["title"] + assert result.date == payload["release_date"] + assert result.num_tracks == len(payload["tracks"]) + assert result.artists == frozenset( + [mopidy_funkwhale.library.convert_to_artist(payload["artist"])] + ) + assert result.images == frozenset([payload["cover"]["original"]]) def test_convert_album_to_model(): payload = { "id": 2, - "title": 'Test track', + "title": "Test track", "mbid": str(uuid.uuid4()), "creation_date": "2017-01-01", "position": 12, "bitrate": 128000, "duration": 120, - "artist": { - 'id': 43, - 'mbid': str(uuid.uuid4()), - 'name': "Test artist 2", - }, + "artist": {"id": 43, "mbid": str(uuid.uuid4()), "name": "Test artist 2"}, "album": { "id": 3, "tracks": [1, 2, 3, 4], "mbid": str(uuid.uuid4()), "title": "Test album", - "artist": { - 'id': 42, - 'mbid': str(uuid.uuid4()), - 'name': "Test artist", - }, + "artist": {"id": 42, "mbid": str(uuid.uuid4()), "name": "Test artist"}, "release_date": "2017-01-01", "cover": { "original": "/media/albums/covers/2018/10/03/b4e94b07e-da27-4df4-ae2a-d924a9448544.jpg" }, - } + }, } result = mopidy_funkwhale.library.convert_to_track(payload) assert type(result) == models.Track - assert result.musicbrainz_id == payload['mbid'] - assert result.uri == 'funkwhale:tracks:%s' % (payload['id'],) - assert result.name == payload['title'] - assert result.date == payload['album']['release_date'] - assert result.length == payload['duration'] * 1000 - assert result.bitrate == payload['bitrate'] / 1000 + assert result.musicbrainz_id == payload["mbid"] + assert result.uri == "funkwhale:tracks:%s" % (payload["id"],) + assert result.name == payload["title"] + assert result.date == payload["album"]["release_date"] + assert result.length == payload["duration"] * 1000 + assert result.bitrate == payload["bitrate"] / 1000 - assert result.album == mopidy_funkwhale.library.convert_to_album(payload['album']) - assert result.artists == frozenset([mopidy_funkwhale.library.convert_to_artist(payload['artist'])]) + assert result.album == mopidy_funkwhale.library.convert_to_album(payload["album"]) + assert result.artists == frozenset( + [mopidy_funkwhale.library.convert_to_artist(payload["artist"])] + ) -@pytest.mark.parametrize('uri, expected', [ - ('funkwhale:albums:42', ('album', 42)), - ('funkwhale:tracks:42', ('track', 42)), - ('funkwhale:artists:42', ('artist', 42)), -]) +@pytest.mark.parametrize( + "uri, expected", + [ + ("funkwhale:albums:42", ("album", 42)), + ("funkwhale:tracks:42", ("track", 42)), + ("funkwhale:artists:42", ("artist", 42)), + ], +) def test_parse_uri(uri, expected): assert mopidy_funkwhale.library.parse_uri(uri) == expected + + +@pytest.mark.parametrize("type", ["track", "album", "artist"]) +def test_parse_uri(type): + obj = getattr(models, type.capitalize())(uri="hello:world", name="Hello") + expected = getattr(models.Ref, type)(uri=obj.uri, name=obj.name) + assert mopidy_funkwhale.library.to_ref(obj) == expected + + +@pytest.mark.parametrize( + "path, expected_handler,remaining", + [ + ("funkwhale:directory:favorites", "browse_favorites", []), + ("funkwhale:directory:favorites:by-date", "browse_favorites", ["by-date"]), + ], +) +def test_browse_routing(library, path, expected_handler, mocker, remaining): + handler = mocker.patch.object(library, expected_handler, return_value="test") + + assert library.browse(path) == "test" + assert handler.called_once_with(remaining) + + +def test_browse_favorites_root(library): + expected = [ + models.Ref.directory(uri="funkwhale:directory:favorites:recent", name="Recent"), + models.Ref.directory( + uri="funkwhale:directory:favorites:by-artist", name="By artist" + ), + ] + assert library.browse_favorites([]) == expected + + +def test_browse_favorites_recent(library, client, requests_mock): + track = factories.TrackJSONFactory() + url = ( + client.session.url_base + + "tracks/?favorites=true&page_size=100&&ordering=-creation_date" + ) + requests_mock.get(url, json={"results": [track]}) + + expected = [mopidy_funkwhale.library.convert_to_track(track, ref=True)] + result = library.browse_favorites(["recent"]) + + assert result == expected + + +def test_cache_set(): + cache = mopidy_funkwhale.library.Cache() + cache.set("hello:world", "value") + assert cache["hello:world"][0] < time.time() + assert cache["hello:world"][1] == "value" + + +def test_cache_get(): + cache = mopidy_funkwhale.library.Cache() + cache["hello:world"] = (time.time(), "value") + assert cache.get("hello:world") == "value" + + +def test_cache_key_too_old(): + cache = mopidy_funkwhale.library.Cache(max_age=60) + t = time.time() - 60 + cache["hello:world"] = (t, "value") + assert cache.get("hello:world") is None + assert "hello:world" not in cache