From 32f58fe8edaac5d65393ffd1abec2e15392000ae Mon Sep 17 00:00:00 2001
From: Eugen Ciur
Date: Mon, 4 May 2020 08:10:15 +0200
Subject: [PATCH] Endpoint moved from pmworker into mglib

---
 changelog.md      |  12 ++
 mglib/endpoint.py | 381 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 393 insertions(+)
 create mode 100644 changelog.md
 create mode 100644 mglib/endpoint.py

diff --git a/changelog.md b/changelog.md
new file mode 100644
index 0000000..7f3731c
--- /dev/null
+++ b/changelog.md
@@ -0,0 +1,12 @@
+# Changelog
+
+## [1.1.0] - work in progress
+
+### Added
+
+ - Endpoint module moved in (from pmworker)
+
+## [1.0.0] - 25 Apr 2020
+
+### Hello, mglib!
+
diff --git a/mglib/endpoint.py b/mglib/endpoint.py
new file mode 100644
index 0000000..7c66acd
--- /dev/null
+++ b/mglib/endpoint.py
@@ -0,0 +1,381 @@
+import os
+import logging
+import boto3
+from botocore.errorfactory import ClientError
+
+
+logger = logging.getLogger(__name__)
+
+
+def get_keyname(s3_url):
+    """
+    Extracts the key part from an S3 URL:
+        s3_url = s3://my-bucket/some/path/to/x.pdf
+        result = some/path/to/x.pdf
+    """
+    s3 = s3_url.replace('//', '/')
+    scheme, bucket, *rest = s3.split('/')
+    return '/'.join(rest)
+
+
+def get_bucketname(s3_url):
+    """
+    Extracts the bucket part from an S3 URL:
+        s3_url = s3://my-bucket/some/path/to/x.pdf
+        result = my-bucket
+    """
+    s3 = s3_url.replace('//', '/')
+    scheme, bucket, *rest = s3.split('/')
+    return bucket
+
+
+def s3_key_exists(endpoint_url):
+    """Returns True if the S3 object referenced by endpoint_url exists."""
+    bucketname = get_bucketname(endpoint_url)
+    keyname = get_keyname(endpoint_url)
+    s3_client = boto3.client('s3')
+
+    try:
+        s3_client.head_object(Bucket=bucketname, Key=keyname)
+    except ClientError:
+        logger.debug(f"Endpoint s3:/{bucketname}/{keyname} does not exist.")
+        return False
+
+    return True
+
+
+class Endpoint:
+    """
+    Endpoint is either a remote or a local root storage
+    from/to which files are downloaded/uploaded.
+
+    In case of S3 storage:
+
+        ep_root = Endpoint("s3:/my-bucket")
+        ep_root.bucketname == 'my-bucket'
+
+    In case of local storage:
+
+        ep_root = Endpoint("local:/var/media/files")
+        ep_root.dirname == '/var/media/files/'
+    """
+    S3 = 's3'
+    LOCAL = 'local'
+
+    def __init__(self, url):
+        if not url or len(url) < 1:
+            raise ValueError("Invalid url")
+
+        self.url = url
+        self.local_tmp_ref = None
+
+    @property
+    def is_s3(self):
+        return self.scheme == Endpoint.S3
+
+    @property
+    def scheme(self):
+        if ":" in self.url:
+            s, path = self.url.split(':')
+        else:
+            s = Endpoint.LOCAL
+
+        return s
+
+    @property
+    def is_local(self):
+        return self.scheme == Endpoint.LOCAL
+
+    @property
+    def bucketname(self):
+        # First part is the scheme, then the bucket name.
+        if not self.is_s3:
+            raise ValueError("How come? Bucketname applies only to S3.")
+
+        return self.url.split('/')[1]
+
+    @property
+    def dirname(self):
+        if not self.is_local:
+            raise ValueError("How come? dirname applies only to local.")
+
+        parts = self.url.split('/')[1:]
+        joined = '/'.join(parts)
+
+        # Always return the directory with a trailing slash.
+        if self.url.endswith('/'):
+            return f"/{joined}"
+        else:
+            return f"/{joined}/"
+
+    def __str__(self):
+        return "Endpoint(%s)" % self.url
+
+    def __repr__(self):
+        return "Endpoint(%s)" % self.url
+
+
+class DocumentEp:
+    """
+    Document Endpoint:
+
+        <schema>://<root>/<aux_dir>/user_<user_id>/document_<doc_id>/v<version>/<file_name>
+
+    If version is 0, it is not included in the endpoint path.
+    Document's version is incremented every time a pdftk operation
+    runs on it (when pages are deleted, reordered or pasted).
+    """
+
+    def __init__(
+        self,
+        remote_endpoint,
+        local_endpoint,
+        user_id,
+        document_id,
+        file_name,
+        aux_dir="docs",
+        version=0
+    ):
+        self.remote_endpoint = remote_endpoint
+        self.local_endpoint = local_endpoint
+        self.user_id = user_id
+        self.document_id = document_id
+        self.file_name = file_name
+        self.aux_dir = aux_dir
+        # by default, document has version 0
+        self.version = version
+        self.pages = "pages"
+
+    def url(self, ep=Endpoint.LOCAL):
+        full_path = None
+
+        if ep == Endpoint.S3:
+            full_path = (
+                f"s3:/{self.bucketname}/"
+                f"{self.key}"
+            )
+        else:
+            full_path = f"{self.dirname}{self.file_name}"
+
+        return full_path
+
+    @property
+    def dirname(self):
+        root_dir = f"{self.local_endpoint.dirname}"
+
+        full_path = (
+            f"{root_dir}"
+            f"{self.aux_dir}/user_{self.user_id}/"
+            f"document_{self.document_id}/"
+        )
+
+        if self.version > 0:
+            full_path = f"{full_path}v{self.version}/"
+
+        return full_path
+
+    @property
+    def pages_dirname(self):
+        return f"{self.dirname}{self.pages}/"
+
+    @property
+    def bucketname(self):
+        return self.remote_endpoint.bucketname
+
+    @property
+    def key(self):
+        root_dir = f"{self.aux_dir}"
+
+        full_path = (
+            f"{root_dir}/user_{self.user_id}/"
+            f"document_{self.document_id}/"
+        )
+
+        if self.version > 0:
+            full_path = f"{full_path}v{self.version}/{self.file_name}"
+        else:
+            full_path = f"{full_path}{self.file_name}"
+
+        return full_path
+
+    def __repr__(self):
+        message = (
+            f"DocumentEp(version={self.version},"
+            f"remote_endpoint={self.remote_endpoint},"
+            f"local_endpoint={self.local_endpoint},"
+            f"user_id={self.user_id},"
+            f"document_id={self.document_id},"
+            f"file_name={self.file_name})"
+        )
+        return message
+
+    def inc_version(self):
+        self.version = self.version + 1
+
+    def exists(self, ep=Endpoint.LOCAL):
+        result = False
+
+        if ep == Endpoint.LOCAL:
+            result = os.path.exists(self.url(ep=Endpoint.LOCAL))
+        else:
+            result = s3_key_exists(self.url(ep=Endpoint.S3))
+
+        return result
+
+    @staticmethod
+    def copy_from(doc_ep, aux_dir):
+        return DocumentEp(
+            remote_endpoint=doc_ep.remote_endpoint,
+            local_endpoint=doc_ep.local_endpoint,
+            user_id=doc_ep.user_id,
+            document_id=doc_ep.document_id,
+            file_name=doc_ep.file_name,
+            version=doc_ep.version,
+            aux_dir=aux_dir
+        )
+
+
+class PageEp:
+    """
+    schema://.../<doc_id>/pages/<page_num>/<step>/page-<number>.jpg
+    """
+
+    def __init__(
+        self,
+        document_ep,
+        page_num,
+        page_count,
+        step=None
+    ):
+        if not isinstance(page_num, int):
+            msg_err = f"PageEp.page_num must be an int. Got {page_num}."
+            raise ValueError(msg_err)
+
+        self.document_ep = document_ep
+        self.results_document_ep = DocumentEp.copy_from(
+            document_ep,
+            aux_dir="results"
+        )
+        self.page_count = page_count
+        self.page_num = page_num
+        self.step = step
+        self.pages = self.document_ep.pages
+
+    @property
+    def ppmroot(self):
+        # returns schema://.../<doc_id>/pages/<page_num>/<step>/page
+        pages_dirname = self.results_document_ep.pages_dirname
+        result = (
+            f"{pages_dirname}page_{self.page_num}/"
+            f"{self.step.percent}/page"
+        )
+        return result
+
+    @property
+    def pages_dirname(self):
+        return self.document_ep.pages_dirname
+
+    def exists(self, ep=Endpoint.LOCAL):
+        return self.txt_exists(ep)
+
+    def url(self, ep=Endpoint.LOCAL):
+        return self.txt_url(ep)
+
+    def txt_url(self, ep=Endpoint.LOCAL):
+        result = None
+
+        if ep == Endpoint.LOCAL:
+            pages_dirname = self.results_document_ep.pages_dirname
+            result = f"{pages_dirname}page_{self.page_num}.txt"
+        else:
+            aux_dir = self.results_document_ep.aux_dir
+            user_id = self.results_document_ep.user_id
+            document_id = self.results_document_ep.document_id
+
+            result = (
+                f"s3:/{self.results_document_ep.remote_endpoint.bucketname}/"
+                f"{aux_dir}/user_{user_id}/"
+                f"document_{document_id}/{self.pages}/page_{self.page_num}.txt"
+            )
+
+        return result
+
+    def txt_exists(self, ep=Endpoint.LOCAL):
+        result = False
+
+        if ep == Endpoint.LOCAL:
+            result = os.path.exists(self.txt_url(ep=ep))
+        else:
+            result = s3_key_exists(self.txt_url(ep=Endpoint.S3))
+
+        return result
+
+    @property
+    def bucketname(self):
+        return self.results_document_ep.remote_endpoint.bucketname
+
+    def hocr_url(self, ep=Endpoint.LOCAL):
+        url = None
+        if ep == Endpoint.LOCAL:
+            url = f"{self.ppmroot}-{self.ppmtopdf_formated_number}.hocr"
+        else:
+            aux_dir = self.results_document_ep.aux_dir
+            user_id = self.results_document_ep.user_id
+            document_id = self.results_document_ep.document_id
+
+            url = (
+                f"s3:/{self.results_document_ep.remote_endpoint.bucketname}/"
+                f"{aux_dir}/user_{user_id}/"
+                f"document_{document_id}/{self.pages}/page_{self.page_num}/"
+                f"{self.step.percent}/"
+                f"page-{self.ppmtopdf_formated_number}.hocr"
+            )
+
+        return url
+
+    def hocr_exists(self, ep=Endpoint.LOCAL):
+        result = False
+
+        if ep == Endpoint.LOCAL:
+            result = os.path.exists(self.hocr_url(ep=ep))
+        elif ep == Endpoint.S3:
+            endpoint_url = self.hocr_url(ep=Endpoint.S3)
+            result = s3_key_exists(endpoint_url)
+
+        return result
+
+    @property
+    def ppmtopdf_formated_number(self):
+        # Zero-pad the page number to the width used for this
+        # document's total page count (1, 2 or 3 digits).
+        if self.page_count <= 9:
+            fmt_num = "{num:d}"
+        elif self.page_count < 100:
+            fmt_num = "{num:02d}"
+        else:
+            fmt_num = "{num:03d}"
+
+        return fmt_num.format(
+            num=int(self.page_num)
+        )
+
+    def img_exists(self, ep=Endpoint.LOCAL):
+        result = False
+
+        if ep == Endpoint.LOCAL:
+            result = os.path.exists(self.img_url(ep=ep))
+
+        return result
+
+    def img_url(self, ep=Endpoint.LOCAL):
+        url = None
+        if ep == Endpoint.LOCAL:
+            url = f"{self.ppmroot}-{self.ppmtopdf_formated_number}.jpg"
+        else:
+            aux_dir = self.results_document_ep.aux_dir
+            user_id = self.results_document_ep.user_id
+            document_id = self.results_document_ep.document_id
+            url = (
+                f"s3:/{self.results_document_ep.remote_endpoint.bucketname}/"
+                f"{aux_dir}/user_{user_id}/"
+                f"document_{document_id}/{self.pages}/page_{self.page_num}/"
+                f"{self.step.percent}/"
+                f"page-{self.ppmtopdf_formated_number}.jpg"
+            )
+
+        return url
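
A minimal usage sketch of the moved endpoint module, for illustration only and
not part of the patch itself. It assumes a local media root under
/var/media/files and a hypothetical FakeStep stand-in for the step object,
of which PageEp only reads the `percent` attribute.

    from mglib.endpoint import Endpoint, DocumentEp, PageEp


    class FakeStep:
        # Stand-in for the real step object; PageEp only reads `percent`.
        percent = 100


    remote = Endpoint("s3:/my-bucket")
    local = Endpoint("local:/var/media/files")

    doc_ep = DocumentEp(
        remote_endpoint=remote,
        local_endpoint=local,
        user_id=1,
        document_id=42,
        file_name="x.pdf",
    )

    # Local and S3 locations of the document itself (version 0, aux_dir="docs").
    print(doc_ep.url())                # /var/media/files/docs/user_1/document_42/x.pdf
    print(doc_ep.url(ep=Endpoint.S3))  # s3:/my-bucket/docs/user_1/document_42/x.pdf

    # Per-page artifacts (txt/jpg/hocr) live under the "results" aux dir.
    page_ep = PageEp(document_ep=doc_ep, page_num=1, page_count=3, step=FakeStep())
    print(page_ep.txt_url())  # /var/media/files/results/user_1/document_42/pages/page_1.txt
    print(page_ep.img_url())  # .../results/user_1/document_42/pages/page_1/100/page-1.jpg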