123 lines
3.9 KiB
Python
123 lines
3.9 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import logging
|
|
import requests
|
|
|
|
from mopidy import httpclient, exceptions
|
|
|
|
from . import Extension, __version__
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SessionWithUrlBase(requests.Session):
|
|
# In Python 3 you could place `url_base` after `*args`, but not in Python 2.
|
|
def __init__(self, url_base=None, *args, **kwargs):
|
|
super(SessionWithUrlBase, self).__init__(*args, **kwargs)
|
|
self.url_base = url_base
|
|
|
|
def request(self, method, url, **kwargs):
|
|
# Next line of code is here for example purposes only.
|
|
# You really shouldn't just use string concatenation here,
|
|
# take a look at urllib.parse.urljoin instead.
|
|
if url.startswith("http://") or url.startswith("https://"):
|
|
modified_url = url
|
|
else:
|
|
modified_url = self.url_base + url
|
|
|
|
return super(SessionWithUrlBase, self).request(method, modified_url, **kwargs)
|
|
|
|
|
|
def get_requests_session(url, proxy_config, user_agent):
|
|
if not url.endswith("/"):
|
|
url += "/"
|
|
url += "api/v1/"
|
|
|
|
proxy = httpclient.format_proxy(proxy_config)
|
|
full_user_agent = httpclient.format_user_agent(user_agent)
|
|
|
|
session = SessionWithUrlBase(url_base=url)
|
|
session.proxies.update({"http": proxy, "https": proxy})
|
|
session.headers.update({"user-agent": full_user_agent})
|
|
|
|
return session
|
|
|
|
|
|
def login(session, username, password):
|
|
if not username:
|
|
return
|
|
response = session.post("token/", {"username": username, "password": password})
|
|
try:
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError:
|
|
raise exceptions.BackendError("Authentication failed for user %s" % (username,))
|
|
token = response.json()["token"]
|
|
session.headers.update({"Authorization": "JWT %s" % (token,)})
|
|
return token
|
|
|
|
|
|
class APIClient(object):
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.session = get_requests_session(
|
|
config["funkwhale"]["url"],
|
|
proxy_config=config["proxy"],
|
|
user_agent="%s/%s" % (Extension.dist_name, __version__),
|
|
)
|
|
self.username = self.config["funkwhale"]["username"]
|
|
self.token = None
|
|
|
|
def login(self):
|
|
self.username = self.config["funkwhale"]["username"]
|
|
if self.username:
|
|
self.token = login(
|
|
self.session,
|
|
self.config["funkwhale"]["username"],
|
|
self.config["funkwhale"]["password"],
|
|
)
|
|
else:
|
|
self.token = None
|
|
|
|
def search(self, query):
|
|
response = self.session.get("search", params={"query": query})
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def get_track(self, id):
|
|
response = self.session.get("tracks/{}/".format(id))
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def list_tracks(self, filters):
|
|
response = self.session.get("tracks/", params=filters)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def list_artists(self, filters):
|
|
response = self.session.get("artists/", params=filters)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def list_albums(self, filters):
|
|
response = self.session.get("albums/", params=filters)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def load_all(self, first_page, max=0):
|
|
for i in first_page["results"]:
|
|
yield i
|
|
|
|
next_page = first_page.get("next")
|
|
counter = 0
|
|
while next_page:
|
|
logger.info("Fetching next page of result at url: %s", next_page)
|
|
response = self.session.get(next_page)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
for i in payload["results"]:
|
|
yield i
|
|
counter += 1
|
|
next_page = payload.get("next")
|
|
if max and counter >= max:
|
|
next_page = None
|