326 lines
10 KiB
Python
326 lines
10 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import collections
|
|
import logging
|
|
import re
|
|
import time
|
|
import urllib
|
|
|
|
from mopidy import backend, models
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_uri(path):
|
|
return "funkwhale:directory:%s" % path
|
|
|
|
|
|
def new_folder(name, path):
|
|
return models.Ref.directory(uri=generate_uri(path), name=name)
|
|
|
|
|
|
def simplify_search_query(query):
|
|
|
|
if isinstance(query, dict):
|
|
r = []
|
|
for v in query.values():
|
|
if isinstance(v, list):
|
|
r.extend(v)
|
|
else:
|
|
r.append(v)
|
|
return " ".join(r)
|
|
if isinstance(query, list):
|
|
return " ".join(query)
|
|
else:
|
|
return query
|
|
|
|
|
|
class Cache(collections.OrderedDict):
|
|
def __init__(self, max_age=0):
|
|
self.max_age = max_age
|
|
super(Cache, self).__init__()
|
|
|
|
def set(self, key, value):
|
|
if self.max_age is None:
|
|
return
|
|
now = time.time()
|
|
self[key] = (now, value)
|
|
|
|
def get(self, key):
|
|
if self.max_age is None:
|
|
return
|
|
value = super(Cache, self).get(key)
|
|
if value is None:
|
|
return
|
|
now = time.time()
|
|
t, v = value
|
|
if self.max_age and t + self.max_age < now:
|
|
# entry is too old, we delete it
|
|
del self[key]
|
|
return None
|
|
return v
|
|
|
|
|
|
class FunkwhaleLibraryProvider(backend.LibraryProvider):
|
|
root_directory = models.Ref.directory(uri="funkwhale:directory", name="Funkwhale")
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(FunkwhaleLibraryProvider, self).__init__(*args, **kwargs)
|
|
self.vfs = {"funkwhale:directory": collections.OrderedDict()}
|
|
self.add_to_vfs(new_folder("Favorites", "favorites"))
|
|
self.add_to_vfs(new_folder("Artists", "artists"))
|
|
# self.add_to_vfs(new_folder('Following', ['following']))
|
|
# self.add_to_vfs(new_folder('Sets', ['sets']))
|
|
# self.add_to_vfs(new_folder('Stream', ['stream']))
|
|
self.cache = Cache(max_age=self.backend.config["funkwhale"]["cache_duration"])
|
|
|
|
def add_to_vfs(self, _model):
|
|
self.vfs["funkwhale:directory"][_model.uri] = _model
|
|
|
|
def browse(self, uri):
|
|
cache_key = uri
|
|
from_cache = self.cache.get(cache_key)
|
|
if from_cache:
|
|
try:
|
|
len(from_cache)
|
|
return from_cache
|
|
except TypeError:
|
|
return [from_cache]
|
|
|
|
if not self.vfs.get(uri):
|
|
if uri.startswith("funkwhale:directory:"):
|
|
uri = uri.replace("funkwhale:directory:", "", 1)
|
|
parts = uri.split(":")
|
|
remaining = parts[1:] if len(parts) > 1 else []
|
|
handler = getattr(self, "browse_%s" % parts[0])
|
|
result, cache = handler(remaining)
|
|
if cache:
|
|
self.cache.set(cache_key, result)
|
|
return result
|
|
|
|
# root directory
|
|
return self.vfs.get(uri, {}).values()
|
|
|
|
def browse_favorites(self, remaining):
|
|
if remaining == []:
|
|
return (
|
|
[
|
|
new_folder("Recent", "favorites:recent"),
|
|
# new_folder("By artist", "favorites:by-artist"),
|
|
],
|
|
False,
|
|
)
|
|
|
|
if remaining == ["recent"]:
|
|
payload = self.backend.client.list_tracks(
|
|
{"favorites": "true", "ordering": "-creation_date", "page_size": 50}
|
|
)
|
|
tracks = [
|
|
convert_to_track(row, ref=True, cache=self.cache)
|
|
for row in self.backend.client.load_all(payload, max=10)
|
|
]
|
|
return tracks, True
|
|
return [], False
|
|
|
|
def browse_albums(self, uri_prefix, remaining):
|
|
if len(remaining) == 2:
|
|
album = remaining[1]
|
|
payload = self.backend.client.list_tracks(
|
|
{
|
|
"ordering": "position",
|
|
"page_size": 50,
|
|
"playable": "true",
|
|
"album": album,
|
|
}
|
|
)
|
|
tracks = [
|
|
convert_to_track(row, ref=True, cache=self.cache)
|
|
for row in self.backend.client.load_all(payload)
|
|
]
|
|
return tracks
|
|
else:
|
|
artist, album = remaining[0], None
|
|
payload = self.backend.client.list_albums(
|
|
{
|
|
"ordering": "title",
|
|
"page_size": 50,
|
|
"playable": "true",
|
|
"artist": artist,
|
|
}
|
|
)
|
|
albums = [
|
|
convert_to_album(row, uri_prefix=uri_prefix, ref=True)
|
|
for row in self.backend.client.load_all(payload)
|
|
]
|
|
return albums
|
|
|
|
def browse_artists(self, remaining):
|
|
logger.debug("Handling artist route: %s", remaining)
|
|
if remaining == []:
|
|
return (
|
|
[
|
|
new_folder("Recent", "artists:recent"),
|
|
new_folder("By name", "artists:by-name"),
|
|
],
|
|
False,
|
|
)
|
|
|
|
root = remaining[0]
|
|
end = remaining[1:]
|
|
albums_uri_prefix = "funkwhale:directory:artists:" + ":".join(
|
|
[str(i) for i in remaining]
|
|
)
|
|
if root == "recent":
|
|
if end:
|
|
# list albums
|
|
return (
|
|
self.browse_albums(uri_prefix=albums_uri_prefix, remaining=end),
|
|
True,
|
|
)
|
|
# list recent artists
|
|
payload = self.backend.client.list_artists(
|
|
{"ordering": "-creation_date", "page_size": 50, "playable": "true"}
|
|
)
|
|
uri_prefix = "funkwhale:directory:artists:recent"
|
|
|
|
artists = [
|
|
convert_to_artist(row, uri_prefix=uri_prefix, ref=True)
|
|
for row in self.backend.client.load_all(payload, max=1)
|
|
]
|
|
return artists, True
|
|
|
|
if root == "by-name":
|
|
if end:
|
|
# list albums
|
|
return (
|
|
self.browse_albums(uri_prefix=albums_uri_prefix, remaining=end),
|
|
True,
|
|
)
|
|
# list recent artists
|
|
payload = self.backend.client.list_artists(
|
|
{"ordering": "name", "page_size": 50, "playable": "true"}
|
|
)
|
|
uri_prefix = "funkwhale:directory:artists:by-name"
|
|
artists = [
|
|
convert_to_artist(row, uri_prefix=uri_prefix, ref=True)
|
|
for row in self.backend.client.load_all(payload)
|
|
]
|
|
return artists, True
|
|
|
|
return [], False
|
|
|
|
def search(self, query=None, uris=None, exact=False):
|
|
# TODO Support exact search
|
|
if not query:
|
|
return
|
|
|
|
else:
|
|
search_query = simplify_search_query(query)
|
|
logger.info("Searching Funkwhale for: %s", search_query)
|
|
raw_results = self.backend.client.search(search_query)
|
|
artists = [convert_to_artist(row) for row in raw_results["artists"]]
|
|
albums = [convert_to_album(row) for row in raw_results["albums"]]
|
|
tracks = [convert_to_track(row) for row in raw_results["tracks"]]
|
|
|
|
return models.SearchResult(
|
|
uri="funkwhale:search", tracks=tracks, albums=albums, artists=artists
|
|
)
|
|
|
|
def lookup(self, uri):
|
|
from_cache = self.cache.get(uri)
|
|
if from_cache:
|
|
try:
|
|
len(from_cache)
|
|
return from_cache
|
|
except TypeError:
|
|
return [from_cache]
|
|
|
|
if "fw:" in uri:
|
|
uri = uri.replace("fw:", "")
|
|
return self.backend.remote.resolve_url(uri)
|
|
|
|
client = self.backend.client
|
|
config = {
|
|
"track": lambda id: [client.get_track(id)],
|
|
"album": lambda id: client.list_tracks({"album": id})["results"],
|
|
"artist": lambda id: client.list_tracks({"artist": id})["results"],
|
|
}
|
|
|
|
type, id = parse_uri(uri)
|
|
payload = config[type](id)
|
|
return [convert_to_track(row, cache=self.cache) for row in payload]
|
|
|
|
|
|
def parse_uri(uri):
|
|
uri = uri.replace("funkwhale:", "", 1)
|
|
parts = uri.split(":")
|
|
type = parts[0].rstrip("s")
|
|
id = int(parts[1])
|
|
return type, id
|
|
|
|
|
|
def cast_to_ref(f):
|
|
def inner(payload, *args, **kwargs):
|
|
ref = kwargs.pop("ref", False)
|
|
cache = kwargs.pop("cache", None)
|
|
result = f(payload, *args, **kwargs)
|
|
if cache is not None:
|
|
cache.set(result.uri, result)
|
|
if ref:
|
|
return to_ref(result)
|
|
return result
|
|
|
|
return inner
|
|
|
|
|
|
@cast_to_ref
|
|
def convert_to_artist(payload, uri_prefix="funkwhale:artists"):
|
|
return models.Artist(
|
|
uri=uri_prefix + ":%s" % payload["id"],
|
|
name=payload["name"],
|
|
sortname=payload["name"],
|
|
musicbrainz_id=payload["mbid"],
|
|
)
|
|
|
|
|
|
@cast_to_ref
|
|
def convert_to_album(payload, uri_prefix="funkwhale:albums"):
|
|
artist = convert_to_artist(payload["artist"])
|
|
image = payload["cover"]["original"] if payload["cover"] else None
|
|
|
|
return models.Album(
|
|
uri=uri_prefix + ":%s" % payload["id"],
|
|
name=payload["title"],
|
|
musicbrainz_id=payload["mbid"],
|
|
images=[image] if image else [],
|
|
artists=[artist],
|
|
date=payload["release_date"],
|
|
num_tracks=len(payload.get("tracks", [])),
|
|
)
|
|
|
|
|
|
@cast_to_ref
|
|
def convert_to_track(payload, uri_prefix="funkwhale:tracks"):
|
|
artist = convert_to_artist(payload["artist"])
|
|
album = convert_to_album(payload["album"])
|
|
try:
|
|
upload = payload["uploads"][0]
|
|
except (KeyError, IndexError):
|
|
upload = {}
|
|
return models.Track(
|
|
uri=uri_prefix + ":%s" % payload["id"],
|
|
name=payload["title"],
|
|
musicbrainz_id=payload["mbid"],
|
|
artists=[artist],
|
|
album=album,
|
|
date=payload["album"]["release_date"],
|
|
bitrate=(upload.get("bitrate") or 0) / 1000,
|
|
length=(upload.get("duration") or 0) * 1000,
|
|
track_no=payload["position"],
|
|
)
|
|
|
|
|
|
def to_ref(obj):
|
|
getter = getattr(models.Ref, obj.__class__.__name__.lower())
|
|
return getter(uri=obj.uri, name=obj.name)
|