mirror of https://github.com/papermerge/mglib
Compare commits
26 Commits
Author | SHA1 | Date |
---|---|---|
Eugen Ciur | 00775cef7d | |
Eugen Ciur | bea2a7dd62 | |
Eugen Ciur | e27107ae83 | |
Eugen Ciur | c5a0be464c | |
Eugen Ciur | 6fecc4d60f | |
Eugen Ciur | 21a9ebb57b | |
Eugen Ciur | 2d7c96be4e | |
Eugen Ciur | 90352965e6 | |
Eugen Ciur | e267f7e98f | |
Eugen Ciur | 4a7099a16b | |
Eugen Ciur | 7ddb02dcb5 | |
Eugen Ciur | eb98ef1329 | |
Eugen Ciur | 6f7e8ba0e2 | |
Eugen Ciur | 40f95466c8 | |
Eugen Ciur | 25e973ff79 | |
Eugen Ciur | 1e6d1a10ec | |
Eugen Ciur | 535af7df83 | |
Eugen Ciur | e341100e69 | |
Eugen Ciur | faba619024 | |
Eugen Ciur | 94a72760ca | |
Eugen Ciur | c8b524910d | |
Eugen Ciur | 1b86732056 | |
Eugen Ciur | 06be42542a | |
Eugen Ciur | fe20ddd72b | |
Eugen Ciur | b7ce57b055 | |
Eugen Ciur | fa90e6b0a6 |
2
LICENSE
2
LICENSE
|
@ -1,6 +1,6 @@
|
|||
Copyright 2020 Eugen Ciur <eugen@papermerge.com>
|
||||
|
||||
MgMail is Licensed under Apache License version 2.0
|
||||
MgLib is Licensed under Apache License version 2.0
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this software except in compliance with the License.
|
||||
|
|
18
README.md
18
README.md
|
@ -1,18 +0,0 @@
|
|||
MgLib
|
||||
=======
|
||||
|
||||
Python Package containing modules shared across all [Papermerge Project](https://github.com/ciur/papermerge) project.
|
||||
|
||||
## Installation
|
||||
|
||||
pip install mglib
|
||||
|
||||
|
||||
## Run tests
|
||||
|
||||
python test/run.py
|
||||
|
||||
|
||||
## Requirements
|
||||
|
||||
python >= 3.7
|
|
@ -0,0 +1,20 @@
|
|||
MgLib
|
||||
=======
|
||||
|
||||
Python Package containing modules shared across all `Papermerge Project <https://github.com/ciur/papermerge>`_ project.
|
||||
|
||||
Installation
|
||||
##############
|
||||
|
||||
pip install mglib
|
||||
|
||||
Run tests
|
||||
###########
|
||||
|
||||
python test/run.py
|
||||
|
||||
|
||||
Requirements
|
||||
##############
|
||||
|
||||
python >= 3.7
|
41
changelog.md
41
changelog.md
|
@ -1,6 +1,47 @@
|
|||
|
||||
# Changelog
|
||||
|
||||
## [1.3.9] - 2021-09-19
|
||||
### Added
|
||||
- empty \_s3copy method to meglib.storage.Storage class
|
||||
|
||||
## [1.3.8] - 4 March 2020
|
||||
|
||||
- bug fix: reset version to 0 for newly created documents "from paste"
|
||||
|
||||
## [1.3.5] - 14 December 2020
|
||||
|
||||
### Changed
|
||||
|
||||
- bug fixing of get_versions method
|
||||
|
||||
|
||||
## [1.3.4] - 14 December 2020
|
||||
|
||||
### Changed
|
||||
|
||||
- mglib.storage.get_versions(self, doc_path) method added
|
||||
|
||||
## [1.3.3] - 14 December 2020
|
||||
|
||||
### Changed
|
||||
|
||||
- mglib.path module adjusted to accept version argument. Supports
|
||||
getting/setting path to versioned documents.
|
||||
|
||||
## [1.3.2] - 1 December 2020
|
||||
|
||||
### Changed
|
||||
|
||||
- mglib.pdfinfo.get_pagecount use python magic + file extention to determine correct mime type (and thus page count)
|
||||
|
||||
## [1.3.1] - 1 December 2020
|
||||
|
||||
### Changed
|
||||
|
||||
- pdftk module was replaced with stapler
|
||||
|
||||
|
||||
## [1.2.8] - 24 August 2020
|
||||
|
||||
### Added
|
||||
|
|
|
@ -23,10 +23,6 @@ BINARY_IDENTIFY = "/usr/bin/identify"
|
|||
# Used to extract text from images/PDF files.
|
||||
BINARY_OCR = "/usr/bin/tesseract"
|
||||
|
||||
# Provided by pdftk package
|
||||
# Used to reorder, cut/paste, delete pages withing PDF document
|
||||
BINARY_PDFTK = "/usr/bin/pdftk"
|
||||
|
||||
# Provided by stapler
|
||||
# Used to edit PDF documents
|
||||
BINARY_STAPLER = "~/.local/bin/stapler"
|
||||
|
|
|
@ -32,12 +32,16 @@ class DocumentPath:
|
|||
self.version = version
|
||||
self.pages = "pages"
|
||||
|
||||
def url(self):
|
||||
return f"{self.dirname}{self.file_name}"
|
||||
def url(self, version=None):
|
||||
if version:
|
||||
version = int(version)
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self.url()
|
||||
return f"{self.dirname(version=version)}{self.file_name}"
|
||||
|
||||
def path(self, version=None):
|
||||
if version:
|
||||
version = int(version)
|
||||
return self.url(version=version)
|
||||
|
||||
@property
|
||||
def dirname_docs(self):
|
||||
|
@ -57,21 +61,23 @@ class DocumentPath:
|
|||
|
||||
return _path
|
||||
|
||||
@property
|
||||
def dirname(self):
|
||||
def dirname(self, version=None):
|
||||
|
||||
if version is None:
|
||||
version = self.version
|
||||
|
||||
full_path = (
|
||||
f"{self.aux_dir}/user_{self.user_id}/"
|
||||
f"document_{self.document_id}/"
|
||||
)
|
||||
|
||||
if self.version > 0:
|
||||
full_path = f"{full_path}v{self.version}/"
|
||||
if version > 0:
|
||||
full_path = f"{full_path}v{version}/"
|
||||
|
||||
return full_path
|
||||
|
||||
@property
|
||||
def pages_dirname(self):
|
||||
return f"{self.dirname}{self.pages}/"
|
||||
def pages_dirname(self, version=None):
|
||||
return f"{self.dirname(version=version)}{self.pages}/"
|
||||
|
||||
def __repr__(self):
|
||||
message = (
|
||||
|
@ -144,7 +150,7 @@ class PagePath:
|
|||
@property
|
||||
def ppmroot(self):
|
||||
# returns schema://.../<doc_id>/pages/<page_num>/<step>/page
|
||||
pages_dirname = self.results_document_ep.pages_dirname
|
||||
pages_dirname = self.results_document_ep.pages_dirname()
|
||||
result = (
|
||||
f"{pages_dirname}page_{self.page_num}/"
|
||||
f"{self.step.percent}/page"
|
||||
|
@ -153,7 +159,7 @@ class PagePath:
|
|||
|
||||
@property
|
||||
def pages_dirname(self):
|
||||
return self.document_path.pages_dirname
|
||||
return self.document_path.pages_dirname()
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
|
@ -167,7 +173,7 @@ class PagePath:
|
|||
return self.txt_url()
|
||||
|
||||
def txt_url(self):
|
||||
pages_dirname = self.results_document_ep.pages_dirname
|
||||
pages_dirname = self.results_document_ep.pages_dirname()
|
||||
return f"{pages_dirname}page_{self.page_num}.txt"
|
||||
|
||||
@property
|
||||
|
@ -193,7 +199,7 @@ class PagePath:
|
|||
fmt_num = "{num:d}"
|
||||
elif self.page_count > 9 and self.page_count < 100:
|
||||
fmt_num = "{num:02d}"
|
||||
elif self.page_count > 100:
|
||||
elif self.page_count >= 100:
|
||||
fmt_num = "{num:003d}"
|
||||
|
||||
return fmt_num.format(
|
||||
|
|
|
@ -64,22 +64,42 @@ def get_pagecount(filepath):
|
|||
if os.path.isdir(filepath):
|
||||
raise ValueError("Filepath %s is a directory!" % filepath)
|
||||
|
||||
base, ext = os.path.splitext(filepath)
|
||||
mime_type = from_file(filepath, mime=True)
|
||||
|
||||
# pure images (png, jpeg) have only one page :)
|
||||
|
||||
if mime_type in ['image/png', 'image/jpeg', 'image/jpg']:
|
||||
# whatever png/jpg image is there - it is
|
||||
# considered by default one page document.
|
||||
return 1
|
||||
|
||||
# In case of REST API upload (via PUT + form multipart)
|
||||
# django saves temporary file as application/octet-stream
|
||||
# Checking extentions is an extra method of finding out correct
|
||||
# mime type
|
||||
if ext and ext.lower() in ('.jpeg', '.png', '.jpg'):
|
||||
return 1
|
||||
|
||||
if mime_type == 'image/tiff':
|
||||
return get_tiff_pagecount(filepath)
|
||||
|
||||
# In case of REST API upload (via PUT + form multipart)
|
||||
# django saves temporary file as application/octet-stream
|
||||
# Checking extentions is an extra method of finding out correct
|
||||
# mime type
|
||||
if ext and ext.lower() in ('.tiff', ):
|
||||
return get_tiff_pagecount(filepath)
|
||||
|
||||
if mime_type != 'application/pdf':
|
||||
raise FileTypeNotSupported(
|
||||
"Only jpeg, png, pdf and tiff are handled by this"
|
||||
" method"
|
||||
)
|
||||
# In case of REST API upload (via PUT + form multipart)
|
||||
# django saves temporary file as application/octet-stream
|
||||
# Checking extentions is an extra method of finding out correct
|
||||
# mime type
|
||||
if ext and ext.lower() != '.pdf':
|
||||
raise FileTypeNotSupported(
|
||||
"Only jpeg, png, pdf and tiff are handled by this"
|
||||
" method"
|
||||
)
|
||||
# pdfinfo "${PDFFILE}" | grep Pages
|
||||
cmd = [
|
||||
settings.BINARY_PDFINFO,
|
||||
|
|
357
mglib/pdftk.py
357
mglib/pdftk.py
|
@ -1,357 +0,0 @@
|
|||
import logging
|
||||
|
||||
from mglib.runcmd import run
|
||||
from mglib.pdfinfo import get_pagecount
|
||||
|
||||
from .conf import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
#
|
||||
# Utilities around pdftk command line tool
|
||||
#
|
||||
# https://www.pdflabs.com/docs/pdftk-man-page/
|
||||
#
|
||||
|
||||
|
||||
def cat_ranges_for_reorder(page_count, new_order):
|
||||
"""
|
||||
Returns a list of integers. Each number in the list
|
||||
is correctly positioned (newly ordered) page.
|
||||
|
||||
Examples:
|
||||
|
||||
If in document with 4 pages first and second pages were
|
||||
swapped, then returned list will be:
|
||||
|
||||
[2, 1, 3, 4]
|
||||
|
||||
If first page was swapped with last one (also 4 paegs document)
|
||||
result list will look like:
|
||||
|
||||
[4, 2, 3, 1]
|
||||
"""
|
||||
if len(new_order) != page_count:
|
||||
raise ValueError("Not enough pages specified")
|
||||
results = []
|
||||
# key = page_num
|
||||
# value = page_order
|
||||
page_map = {}
|
||||
|
||||
for item in new_order:
|
||||
k = int(item['page_order'])
|
||||
v = int(item['page_num'])
|
||||
page_map[k] = v
|
||||
|
||||
for number in range(1, page_count + 1):
|
||||
results.append(
|
||||
page_map[number]
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def cat_ranges_for_delete(page_count, page_numbers):
|
||||
"""
|
||||
Returns a list of integers. Each number in the list
|
||||
is the number of page which will 'stay' in document.
|
||||
In other words, it returns a list with not deleted pages.
|
||||
|
||||
Examples:
|
||||
|
||||
|
||||
If document has 22 pages (page_count=22) and page number 21 is to be
|
||||
deleted (i.e page_numbers = [21]) will return
|
||||
|
||||
[1, 2, 3, 4, ..., 19, 20, 22]
|
||||
|
||||
If page number 1 is to be deleted:
|
||||
|
||||
[2, 3, 4, ..., 22] list will be returned.
|
||||
|
||||
If page number is 22 is to be deleted:
|
||||
|
||||
[1, 2, 3,..., 21] will be returned.
|
||||
|
||||
With page_numbers=[1, 7, 10] and page_count=22 result
|
||||
will be:
|
||||
|
||||
(2, 3, 4, 5, 6, 8, 9, 11, 12 , 13, ..., 22)
|
||||
|
||||
|
||||
page_numbers is a list of page numbers (starting with 1).
|
||||
"""
|
||||
results = []
|
||||
|
||||
for check in page_numbers:
|
||||
if not isinstance(check, int):
|
||||
err_msg = "page_numbers must be a list of ints"
|
||||
raise ValueError(err_msg)
|
||||
|
||||
for number in range(1, page_count + 1):
|
||||
if number not in page_numbers:
|
||||
results.append(number)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def split_ranges(total, after=False, before=False):
|
||||
"""
|
||||
Given a range 1, 2, ..., total (page numbers of a doc).
|
||||
Split it in two lists.
|
||||
Example:
|
||||
Input: total = 9, after=1, before=False
|
||||
Output: list1 = [1]; list2 = [2, 3, 4, ..., 9].
|
||||
|
||||
Input: total = 9; after=False, before=1
|
||||
Output: list1 = [], list2 = [1, 2, 3, 4, ..., 9]
|
||||
|
||||
Input: total = 5; after=4; before=False
|
||||
Output: list1 = [1, 2, 3, 4] list2 = [5]
|
||||
|
||||
Input: total = 5; after=False; before=False;
|
||||
Output: list1 = [1, 2, 3, 4, 5], list2 = []
|
||||
(it means, by default, all pages are inserted at the end of the doc)
|
||||
"""
|
||||
if after and not before:
|
||||
if not type(after) == int:
|
||||
raise ValueError(
|
||||
"argument 'after' is supposed to be an int"
|
||||
)
|
||||
list1 = list(range(1, after + 1))
|
||||
list2 = list(range(after + 1, total + 1))
|
||||
return list1, list2
|
||||
|
||||
if not after and before:
|
||||
if not type(before) == int:
|
||||
raise ValueError(
|
||||
"argument 'before' is supposed to be an int"
|
||||
)
|
||||
list1 = list(range(1, before))
|
||||
list2 = list(range(before, total + 1))
|
||||
return list1, list2
|
||||
|
||||
list1 = list(range(1, total + 1))
|
||||
list2 = []
|
||||
|
||||
return list1, list2
|
||||
|
||||
|
||||
def paste_pages_into_existing_doc(
|
||||
src,
|
||||
dst,
|
||||
data_list,
|
||||
after_page_number=False,
|
||||
before_page_number=False
|
||||
):
|
||||
page_count = get_pagecount(src)
|
||||
list1, list2 = split_ranges(
|
||||
total=page_count,
|
||||
after=after_page_number,
|
||||
before=before_page_number
|
||||
)
|
||||
# notice missing A
|
||||
# Letter A is assignent to current folder and
|
||||
# pages from list1 and list2
|
||||
letters = "BCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
letters_2_doc_map = []
|
||||
letters_pages = []
|
||||
letters_pages_before = []
|
||||
letters_pages_after = []
|
||||
|
||||
letters_2_doc_map.append(
|
||||
f"A={src}"
|
||||
)
|
||||
|
||||
for idx in range(0, len(data_list)):
|
||||
letter = letters[idx]
|
||||
src = data_list[idx]['src']
|
||||
pages = data_list[idx]['page_nums']
|
||||
|
||||
letters_2_doc_map.append(
|
||||
f"{letter}={src}"
|
||||
)
|
||||
for p in pages:
|
||||
letters_pages.append(
|
||||
f"{letter}{p}"
|
||||
)
|
||||
|
||||
for p in list1:
|
||||
letters_pages_before.append(
|
||||
f"A{p}"
|
||||
)
|
||||
|
||||
for p in list2:
|
||||
letters_pages_after.append(
|
||||
f"A{p}"
|
||||
)
|
||||
|
||||
cmd = [
|
||||
settings.BINARY_PDFTK,
|
||||
]
|
||||
# add A=doc1_path, B=doc2_path
|
||||
cmd.extend(letters_2_doc_map)
|
||||
|
||||
cmd.append("cat")
|
||||
|
||||
# existing doc pages (may be empty)
|
||||
cmd.extend(letters_pages_before)
|
||||
# newly inserted pages
|
||||
cmd.extend(letters_pages)
|
||||
# existing doc pages (may be empty)
|
||||
cmd.extend(letters_pages_after)
|
||||
|
||||
cmd.append("output")
|
||||
|
||||
cmd.append(dst)
|
||||
|
||||
run(cmd)
|
||||
|
||||
|
||||
def paste_pages(
|
||||
src,
|
||||
dst,
|
||||
data_list,
|
||||
dst_doc_is_new=True,
|
||||
after_page_number=False,
|
||||
before_page_number=False
|
||||
):
|
||||
"""
|
||||
dest_doc_ep = endpoint of the doc where newly created
|
||||
file will be placed.
|
||||
src_doc_ep_list is a list of following format:
|
||||
[
|
||||
{
|
||||
'doc_ep': doc_ep,
|
||||
'page_nums': [page_num_1, page_num_2, page_num_3]
|
||||
},
|
||||
{
|
||||
'doc_ep': doc_ep,
|
||||
'page_nums': [page_num_1, page_num_2, page_num_3]
|
||||
},
|
||||
...
|
||||
]
|
||||
src_doc_ep_list is a list of documents where pages
|
||||
(with numbers page_num_1...) will be paste from.
|
||||
|
||||
dst_doc_is_new = True well.. destination document was just created,
|
||||
we are pasting here cutted pages into some folder as new document.
|
||||
|
||||
In this case 'after' and 'before' arguments are ignored
|
||||
|
||||
dst_doc_is_new = False, pasting pages into exiting document.
|
||||
If before_page_number > 0 - paste pages before page number
|
||||
'before_page_number'
|
||||
If after_page_number > 0 - paste pages after page number
|
||||
'after_page_number'
|
||||
|
||||
before_page_number argument has priority over after_page_number.
|
||||
|
||||
If both before_page_number and after_page_number are < 0 - just paste
|
||||
pages at the end of the document.
|
||||
"""
|
||||
if not dst_doc_is_new:
|
||||
return paste_pages_into_existing_doc(
|
||||
src=src,
|
||||
dst=dst,
|
||||
data_list=data_list,
|
||||
after_page_number=after_page_number,
|
||||
before_page_number=before_page_number
|
||||
)
|
||||
letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
letters_2_doc_map = []
|
||||
letters_pages = []
|
||||
|
||||
for idx in range(0, len(data_list)):
|
||||
letter = letters[idx]
|
||||
src = data_list[idx]['src']
|
||||
pages = data_list[idx]['page_nums']
|
||||
|
||||
letters_2_doc_map.append(
|
||||
f"{letter}={src}"
|
||||
)
|
||||
for p in pages:
|
||||
letters_pages.append(
|
||||
f"{letter}{p}"
|
||||
)
|
||||
|
||||
cmd = [
|
||||
settings.BINARY_PDFTK,
|
||||
]
|
||||
# add A=doc1_path, B=doc2_path
|
||||
cmd.extend(letters_2_doc_map)
|
||||
|
||||
cmd.append("cat")
|
||||
|
||||
cmd.extend(letters_pages)
|
||||
|
||||
cmd.append("output")
|
||||
|
||||
cmd.append(dst)
|
||||
|
||||
run(cmd)
|
||||
|
||||
|
||||
def reorder_pages(
|
||||
src, dst, new_order
|
||||
):
|
||||
"""
|
||||
new_order is a list of following format:
|
||||
|
||||
[
|
||||
{'page_num': 2, page_order: 1},
|
||||
{'page_num': 1, page_order: 2},
|
||||
{'page_num': 3, page_order: 3},
|
||||
{'page_num': 4, page_order: 4},
|
||||
]
|
||||
Example above means that in current document of 4 pages,
|
||||
first page was swapped with second one.
|
||||
page_num = older page order
|
||||
page_order = current page order
|
||||
So in human language, each hash is read:
|
||||
<page_num> now should be <page_order>
|
||||
"""
|
||||
page_count = get_pagecount(src)
|
||||
|
||||
cat_ranges = cat_ranges_for_reorder(
|
||||
page_count=page_count,
|
||||
new_order=new_order
|
||||
)
|
||||
|
||||
cmd = [
|
||||
settings.BINARY_PDFTK,
|
||||
src,
|
||||
"cat"
|
||||
]
|
||||
for page in cat_ranges:
|
||||
cmd.append(
|
||||
str(page)
|
||||
)
|
||||
|
||||
cmd.append("output")
|
||||
cmd.append(dst)
|
||||
run(cmd)
|
||||
|
||||
|
||||
def delete_pages(src, dst, page_numbers):
|
||||
page_count = get_pagecount(src)
|
||||
|
||||
cat_ranges = cat_ranges_for_delete(
|
||||
page_count,
|
||||
page_numbers
|
||||
)
|
||||
|
||||
cmd = [
|
||||
settings.BINARY_PDFTK,
|
||||
src,
|
||||
"cat"
|
||||
]
|
||||
for page in cat_ranges:
|
||||
cmd.append(
|
||||
str(page)
|
||||
)
|
||||
|
||||
cmd.append("output")
|
||||
cmd.append(dst)
|
||||
|
||||
run(cmd)
|
|
@ -1,5 +1,43 @@
|
|||
class Step:
|
||||
|
||||
# Q: What is ``Step`` and why it was a bad decision to introduce it?
|
||||
#
|
||||
# A: ``Step`` class is closely related to zooming in/zooming out
|
||||
# a specific page in the document in the frontend (javascript code).
|
||||
#
|
||||
# When user opens the document in document viewer, he/she actually
|
||||
# sees an image with text over it (text overlay). Text overlay is
|
||||
# created from hocr data. Very important point here, is that
|
||||
# text hocr data corresponds to (extracted, format jpeg) image of the page
|
||||
# of VERY SAME width/height. Again, hocr file and respective image file
|
||||
# of the page MUST HAVE SAME WIDTH AND HEIGHT.
|
||||
#
|
||||
# Each step is meant to be a specific zoom value of the page. Thus, step
|
||||
# 2, which corresonds to LIST[2] % = 75 % of the page initial logical size
|
||||
# of WIDTH_100p = 1240.
|
||||
# When user zooms in/zooms out - a new hocr file is downloaded
|
||||
# corresponding to that zoom step. As you may guess, user can zoom only
|
||||
# 125%, 100%, 75% and 50%. Value of 10% corresponds to thumbnail of the
|
||||
# document and does not count as 'real' step.
|
||||
#
|
||||
# Instead of doing this step thingy, it would have been better to drop
|
||||
# the entire step concept. Much better solution for zoom in/zoom out would
|
||||
# have been to download one SVG file for each page (instead of hocr) and
|
||||
# SVG file of respective page should contain embedded image
|
||||
# (binary jpeg; yes SVG format allows embedding of binary formats!) and
|
||||
# correctly mapped text overlay (built from hocr file). User later
|
||||
# can zoom in/zoom out using SVG transforations in frontend!
|
||||
#
|
||||
# The good things about SVG solutions are:
|
||||
#
|
||||
# * there will be 4X less OCR required (corresponding to
|
||||
# hOCR of each step minus thumbnail/10% step)
|
||||
# * will simplify front-end code as SVG (= hocr + jpeg) will be
|
||||
# generated on the on server side
|
||||
# * eliminate conept of Step entirely
|
||||
# (there will be only one SVG file per page)
|
||||
# * increase front-end and back-end performance as only one file SVG file
|
||||
# will be sent back and forth (from backend to frontend)
|
||||
#
|
||||
# width of a document when displayed as 100%.
|
||||
WIDTH_100p = 1240
|
||||
PERCENT = 100
|
||||
|
|
148
mglib/storage.py
148
mglib/storage.py
|
@ -4,7 +4,7 @@ import shutil
|
|||
from os import listdir
|
||||
from os.path import isdir, join
|
||||
|
||||
from mglib import pdftk
|
||||
from mglib import stapler
|
||||
from mglib.path import DocumentPath, PagePath
|
||||
from mglib.step import Steps
|
||||
from mglib.utils import get_assigns_after_delete, safe_to_delete
|
||||
|
@ -18,7 +18,7 @@ class Storage:
|
|||
on local host filesystem
|
||||
"""
|
||||
|
||||
def __init__(self, location=None):
|
||||
def __init__(self, location=None, **kwargs):
|
||||
# by default, this will be something like
|
||||
# settings.MEDIA_ROOT
|
||||
self._location = location
|
||||
|
@ -27,6 +27,15 @@ class Storage:
|
|||
def location(self):
|
||||
return self._location
|
||||
|
||||
def upload(self, doc_path_url, **kwargs):
|
||||
pass
|
||||
|
||||
def download(self, doc_path_url, **kwargs):
|
||||
pass
|
||||
|
||||
def _s3copy(self, src, dst):
|
||||
pass
|
||||
|
||||
def make_sure_path_exists(self, filepath):
|
||||
logger.debug(f"make_sure_path_exists {filepath}")
|
||||
dirname = os.path.dirname(filepath)
|
||||
|
@ -35,6 +44,39 @@ class Storage:
|
|||
exist_ok=True
|
||||
)
|
||||
|
||||
def get_versions(self, doc_path):
|
||||
"""
|
||||
Returns a list of (all) ordered versions
|
||||
of specific doc_path. Versions
|
||||
start with 0. Examples of return values:
|
||||
|
||||
- [0, 1, 2, 3] = 4 versions of the document
|
||||
- [ 0 ] = only one version (original)
|
||||
|
||||
To count versions it just counts number of subfolders
|
||||
in specific document folder. Versions are
|
||||
stored in subfolders named v1, v2, v3, ...
|
||||
"""
|
||||
abs_dirname_docs = self.path(
|
||||
doc_path.dirname_docs
|
||||
)
|
||||
try:
|
||||
only_dirs = [
|
||||
fi for fi in listdir(abs_dirname_docs) if isdir(
|
||||
join(abs_dirname_docs, fi)
|
||||
)
|
||||
]
|
||||
except FileNotFoundError:
|
||||
# in tests, document folders are not always created.
|
||||
# If no document folder is found, just return [ 0 ]
|
||||
# i.e that document has only one single version and it
|
||||
# is the latest one.
|
||||
return [0]
|
||||
|
||||
dirs_count = len(only_dirs)
|
||||
|
||||
return list(range(0, dirs_count + 1))
|
||||
|
||||
def get_pagecount(self, doc_path):
|
||||
"""
|
||||
Returns total number of pages for this doc_path.
|
||||
|
@ -44,7 +86,7 @@ class Storage:
|
|||
doc_path_pointing_to_results = DocumentPath.copy_from(
|
||||
doc_path, aux_dir="results"
|
||||
)
|
||||
pages_dir = self.abspath(doc_path_pointing_to_results.pages_dirname)
|
||||
pages_dir = self.abspath(doc_path_pointing_to_results.pages_dirname())
|
||||
|
||||
only_dirs = [
|
||||
fi for fi in listdir(pages_dir) if isdir(join(pages_dir, fi))
|
||||
|
@ -98,7 +140,7 @@ class Storage:
|
|||
if os.path.exists(abs_dirname_results):
|
||||
os.rmdir(abs_dirname_results)
|
||||
|
||||
def copy_doc(self, src, dst):
|
||||
def copy_doc(self, src: DocumentPath, dst: DocumentPath):
|
||||
"""
|
||||
copy given file src file path to destination
|
||||
as absolute doc_path
|
||||
|
@ -117,7 +159,7 @@ class Storage:
|
|||
f"copy_doc: {src} to {dst}"
|
||||
)
|
||||
shutil.copyfile(
|
||||
src,
|
||||
self.abspath(src),
|
||||
self.abspath(dst)
|
||||
)
|
||||
|
||||
|
@ -126,24 +168,60 @@ class Storage:
|
|||
self.path(_path)
|
||||
)
|
||||
|
||||
def copy_page(self, src_page_path, dst_page_path):
|
||||
err_msg = "copy_page accepts only PageEp instances"
|
||||
def copy_page_txt(self, src_page_path, dst_page_path):
|
||||
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.txt_url())
|
||||
)
|
||||
|
||||
src_txt = self.abspath(src_page_path.txt_url())
|
||||
dst_txt = self.abspath(dst_page_path.txt_url())
|
||||
|
||||
logger.debug(f"copy src_txt={src_txt} dst_txt={dst_txt}")
|
||||
shutil.copy(src_txt, dst_txt)
|
||||
|
||||
def copy_page_img(self, src_page_path, dst_page_path):
|
||||
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.img_url())
|
||||
)
|
||||
|
||||
src_img = self.abspath(src_page_path.img_url())
|
||||
dst_img = self.abspath(dst_page_path.img_url())
|
||||
logger.debug(f"copy src_img={src_img} dst_img={dst_img}")
|
||||
shutil.copy(src_img, dst_img)
|
||||
|
||||
def copy_page_hocr(self, src_page_path, dst_page_path):
|
||||
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.hocr_url())
|
||||
)
|
||||
|
||||
src_hocr = self.abspath(src_page_path.hocr_url())
|
||||
dst_hocr = self.abspath(dst_page_path.hocr_url())
|
||||
logger.debug(f"copy src_hocr={src_hocr} dst_hocr={dst_hocr}")
|
||||
shutil.copy(src_hocr, dst_hocr)
|
||||
|
||||
def copy_page(self, src_page_path, dst_page_path):
|
||||
"""
|
||||
Copies page data from source to destination.
|
||||
|
||||
Page data are files with following extentions:
|
||||
* txt
|
||||
* hocr
|
||||
* jpeg
|
||||
they are located in media root of respective application.
|
||||
"""
|
||||
for inst in [src_page_path, dst_page_path]:
|
||||
if not isinstance(inst, PagePath):
|
||||
raise ValueError(err_msg)
|
||||
raise ValueError("copy_page accepts only PagePath instances")
|
||||
|
||||
# copy .txt file
|
||||
if self.exists(src_page_path.txt_url()):
|
||||
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.txt_url())
|
||||
self.copy_page_txt(
|
||||
src_page_path=src_page_path,
|
||||
dst_page_path=dst_page_path
|
||||
)
|
||||
|
||||
src_txt = self.abspath(src_page_path.txt_url())
|
||||
dst_txt = self.abspath(dst_page_path.txt_url())
|
||||
logger.debug(f"copy src_txt={src_txt} dst_txt={dst_txt}")
|
||||
shutil.copy(src_txt, dst_txt)
|
||||
else:
|
||||
logger.debug(
|
||||
f"txt does not exits {src_page_path.txt_url()}"
|
||||
|
@ -151,28 +229,20 @@ class Storage:
|
|||
|
||||
# hocr
|
||||
if self.exists(src_page_path.hocr_url()):
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.hocr_url())
|
||||
self.copy_page_hocr(
|
||||
src_page_path=src_page_path,
|
||||
dst_page_path=dst_page_path
|
||||
)
|
||||
|
||||
src_hocr = self.abspath(src_page_path.hocr_url())
|
||||
dst_hocr = self.abspath(dst_page_path.hocr_url())
|
||||
logger.debug(f"copy src_hocr={src_hocr} dst_hocr={dst_hocr}")
|
||||
shutil.copy(src_hocr, dst_hocr)
|
||||
else:
|
||||
logger.debug(
|
||||
f"hocr does not exits {src_page_path.hocr_url()}"
|
||||
)
|
||||
|
||||
if src_page_path.img_url():
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(dst_page_path.img_url())
|
||||
self.copy_page_img(
|
||||
src_page_path=src_page_path,
|
||||
dst_page_path=dst_page_path
|
||||
)
|
||||
|
||||
src_img = self.abspath(src_page_path.img_url())
|
||||
dst_img = self.abspath(dst_page_path.img_url())
|
||||
logger.debug(f"copy src_img={src_img} dst_img={dst_img}")
|
||||
shutil.copy(src_img, dst_img)
|
||||
else:
|
||||
logger.debug(
|
||||
f"img does not exits {src_page_path.img_url()}"
|
||||
|
@ -209,7 +279,7 @@ class Storage:
|
|||
self.abspath(dst_doc_path)
|
||||
)
|
||||
|
||||
pdftk.reorder_pages(
|
||||
stapler.reorder_pages(
|
||||
src=self.abspath(src_doc_path),
|
||||
dst=self.abspath(dst_doc_path),
|
||||
new_order=new_order
|
||||
|
@ -269,7 +339,7 @@ class Storage:
|
|||
self.make_sure_path_exists(
|
||||
self.abspath(dst_doc_path)
|
||||
)
|
||||
pdftk.delete_pages(
|
||||
stapler.delete_pages(
|
||||
self.abspath(src_doc_path),
|
||||
self.abspath(dst_doc_path),
|
||||
page_numbers
|
||||
|
@ -324,15 +394,23 @@ class Storage:
|
|||
from src_doc_path. Both dest and src are instances of
|
||||
mglib.path.DocumentPath
|
||||
"""
|
||||
next_version = 0
|
||||
if dest_doc_is_new:
|
||||
# document is new, start version with 0
|
||||
next_version = 0
|
||||
else:
|
||||
# destination document is not new, increment its version
|
||||
next_version = dest_doc_path.version + 1
|
||||
|
||||
next_ver_dp = DocumentPath.copy_from(
|
||||
dest_doc_path,
|
||||
version=dest_doc_path.version + 1
|
||||
version=next_version
|
||||
)
|
||||
self.make_sure_path_exists(
|
||||
self.abspath(next_ver_dp)
|
||||
)
|
||||
|
||||
pdftk.paste_pages(
|
||||
stapler.paste_pages(
|
||||
src=self.abspath(dest_doc_path),
|
||||
dst=self.abspath(next_ver_dp),
|
||||
data_list=data_list,
|
||||
|
@ -381,7 +459,7 @@ class Storage:
|
|||
)
|
||||
dest_page_num += 1
|
||||
|
||||
return dest_doc_path.version + 1
|
||||
return next_version
|
||||
|
||||
|
||||
class FileSystemStorage(Storage):
|
||||
|
|
|
@ -7,16 +7,34 @@ from .conf import settings
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def convert_tiff2pdf(doc_url):
|
||||
def pdfname_from_tiffname(doc_url):
|
||||
"""
|
||||
Given tiff document url, will return
|
||||
respective pdf file name. Returned
|
||||
file name can be use used as destination
|
||||
for tiff2pdf tool.
|
||||
|
||||
logger.debug(f"convert_tiff2pdf for {doc_url}")
|
||||
Returns a tuple (new_doc_url, new_filename).
|
||||
new_doc_url - is new absolute path to the pdf file
|
||||
new_filename - is new pdf filename
|
||||
"""
|
||||
# basename is filename + ext (no path)
|
||||
|
||||
basename = os.path.basename(doc_url)
|
||||
base_root, base_ext = os.path.splitext(basename)
|
||||
root, ext = os.path.splitext(doc_url)
|
||||
new_doc_url = f"{root}.pdf"
|
||||
|
||||
return new_doc_url, f"{base_root}.pdf"
|
||||
|
||||
|
||||
def convert_tiff2pdf(doc_url):
|
||||
|
||||
logger.debug(f"convert_tiff2pdf for {doc_url}")
|
||||
|
||||
new_doc_url, new_filename = pdfname_from_tiffname(
|
||||
doc_url
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"tiff2pdf source={doc_url} dest={new_doc_url}"
|
||||
)
|
||||
|
@ -30,4 +48,4 @@ def convert_tiff2pdf(doc_url):
|
|||
run(cmd)
|
||||
|
||||
# returns new filename
|
||||
return f"{base_root}.pdf"
|
||||
return new_filename
|
||||
|
|
|
@ -43,7 +43,7 @@ def safe_to_delete(place):
|
|||
for root, dirs, files in os.walk(place):
|
||||
for name in files:
|
||||
base, ext = os.path.splitext(name)
|
||||
if ext not in SAFE_EXTENSIONS:
|
||||
if ext.lower() not in SAFE_EXTENSIONS:
|
||||
logger.warning(
|
||||
f"Trying to delete unsefe location: "
|
||||
f"extention={ext} not found in {SAFE_EXTENSIONS}"
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
[metadata]
|
||||
name = mglib
|
||||
version = 1.3.9
|
||||
description = Common code used across all Papermerge project utilities
|
||||
long_description = file: README.rst
|
||||
url = https://www.papermerge.com/
|
||||
author = Eugen Ciur
|
||||
author_email = eugen@papermerge.com
|
||||
keywords= common, package, shared, papermerge, pdf, ocr, dms
|
||||
license = Apache 2.0 License
|
||||
classifiers =
|
||||
Programming Language :: Python :: 3
|
||||
Programming Language :: Python :: 3 :: Only
|
||||
Programming Language :: Python :: 3.7
|
||||
License :: OSI Approved :: Apache Software License
|
||||
Operating System :: OS Independent
|
19
setup.py
19
setup.py
|
@ -1,25 +1,6 @@
|
|||
from setuptools import find_packages, setup
|
||||
|
||||
with open("README.md", "r") as fh:
|
||||
long_description = fh.read()
|
||||
|
||||
|
||||
setup(
|
||||
name="mglib",
|
||||
version="1.3.0",
|
||||
author="Eugen Ciur",
|
||||
author_email="eugen@papermerge.com",
|
||||
url="https://github.com/papermerge/mglib",
|
||||
description="Common code used across all Papermerge project utilities",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
license="Apache 2.0 License",
|
||||
keywords="common, package, shared, papermerge, pdf, ocr, dms",
|
||||
packages=find_packages(),
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
python_requires='>=3.7',
|
||||
)
|
||||
|
|
|
@ -21,3 +21,16 @@ class TestConvert(unittest.TestCase):
|
|||
self.assertTrue(
|
||||
mime_type.is_pdf()
|
||||
)
|
||||
|
||||
def test_get_mime_type(self):
|
||||
|
||||
file_path = os.path.join(
|
||||
DATA_DIR,
|
||||
"berlin.pdf"
|
||||
)
|
||||
mime_type = mime.Mime(filepath=file_path)
|
||||
|
||||
self.assertEquals(
|
||||
mime_type.guess(),
|
||||
"application/pdf"
|
||||
)
|
||||
|
|
|
@ -19,6 +19,47 @@ class TestDocumentPath(unittest.TestCase):
|
|||
"docs/user_1/document_3/x.pdf"
|
||||
)
|
||||
|
||||
def test_document_url_with_another_version(self):
|
||||
|
||||
doc_ep = DocumentPath(
|
||||
user_id=1,
|
||||
document_id=15,
|
||||
file_name="x.pdf"
|
||||
)
|
||||
self.assertEqual(
|
||||
doc_ep.url(version=3),
|
||||
"docs/user_1/document_15/v3/x.pdf"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
doc_ep.url(version=2),
|
||||
"docs/user_1/document_15/v2/x.pdf"
|
||||
)
|
||||
|
||||
def test_document_url_none_vs_0(self):
|
||||
doc_ep = DocumentPath(
|
||||
user_id=1,
|
||||
document_id=15,
|
||||
file_name="x.pdf"
|
||||
)
|
||||
doc_ep.inc_version() # current version = 1
|
||||
doc_ep.inc_version() # current version = 2
|
||||
doc_ep.inc_version() # current version = 3
|
||||
|
||||
self.assertEqual(
|
||||
# with version == None, latest version of the document
|
||||
# will be returned, which is 3
|
||||
doc_ep.url(version=None),
|
||||
"docs/user_1/document_15/v3/x.pdf"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
# with version == 0, version 0 will be provided
|
||||
# i.e. version=0 returns original doc.
|
||||
doc_ep.url(version=0),
|
||||
"docs/user_1/document_15/x.pdf"
|
||||
)
|
||||
|
||||
def test_inc_version(self):
|
||||
"""
|
||||
Document endpoints are now versioned.
|
||||
|
@ -48,6 +89,13 @@ class TestDocumentPath(unittest.TestCase):
|
|||
"docs/user_1/document_3/v2/x.pdf"
|
||||
)
|
||||
|
||||
# however, explicit version can be forced
|
||||
# by providing an argument to url method.
|
||||
self.assertEqual(
|
||||
doc_ep.url(version=1),
|
||||
"docs/user_1/document_3/v1/x.pdf"
|
||||
)
|
||||
|
||||
def test_dirname(self):
|
||||
ep = DocumentPath(
|
||||
user_id=1,
|
||||
|
@ -56,7 +104,7 @@ class TestDocumentPath(unittest.TestCase):
|
|||
file_name="x.pdf"
|
||||
)
|
||||
self.assertEqual(
|
||||
ep.dirname,
|
||||
ep.dirname(),
|
||||
"results/user_1/document_3/"
|
||||
)
|
||||
|
||||
|
@ -68,7 +116,7 @@ class TestDocumentPath(unittest.TestCase):
|
|||
file_name="x.pdf"
|
||||
)
|
||||
self.assertEqual(
|
||||
ep.pages_dirname,
|
||||
ep.pages_dirname(),
|
||||
"results/user_1/document_3/pages/"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,145 +0,0 @@
|
|||
import os
|
||||
import unittest
|
||||
from unittest import mock
|
||||
from mglib import pdftk
|
||||
from mglib.conf import settings
|
||||
from mglib.runcmd import run
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
DATA_DIR = os.path.join(BASE_DIR, "data")
|
||||
|
||||
|
||||
class TestPdfLib(unittest.TestCase):
|
||||
def test_ranges_for_reorder(self):
|
||||
actual = pdftk.cat_ranges_for_reorder(4, [
|
||||
{"page_order": 1, "page_num": 4},
|
||||
{"page_order": 2, "page_num": 3},
|
||||
{"page_order": 3, "page_num": 2},
|
||||
{"page_order": 4, "page_num": 1}
|
||||
])
|
||||
expected = [4,3,2,1]
|
||||
assert expected == actual
|
||||
|
||||
self.assertRaises(ValueError, pdftk.cat_ranges_for_reorder, 2, [])
|
||||
self.assertRaises(KeyError, pdftk.cat_ranges_for_reorder, 2, [
|
||||
{"page_order": 3, "page_num": 4},
|
||||
{"page_order": 5, "page_num": 6}
|
||||
])
|
||||
|
||||
def test_delete_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.delete_pages(input_file, output_file, [1])
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, input_file, "cat", "2", "output", output_file]
|
||||
)
|
||||
|
||||
def test_cat_ranges_for_delete(self):
|
||||
page_count = 22
|
||||
page_numbers = range(1, 23)
|
||||
|
||||
actual = pdftk.cat_ranges_for_delete(page_count, [21])
|
||||
expected = list(page_numbers)
|
||||
expected.remove(21)
|
||||
assert actual == expected
|
||||
|
||||
actual = pdftk.cat_ranges_for_delete(page_count, [1])
|
||||
expected = list(page_numbers)
|
||||
expected.remove(1)
|
||||
assert actual == expected
|
||||
|
||||
actual = pdftk.cat_ranges_for_delete(page_count, [1, 7, 10])
|
||||
expected = list(page_numbers)
|
||||
expected.remove(1)
|
||||
expected.remove(7)
|
||||
expected.remove(10)
|
||||
assert actual == expected
|
||||
|
||||
self.assertRaises(ValueError, pdftk.cat_ranges_for_delete, page_count, ["1"])
|
||||
|
||||
def test_split_ranges(self):
|
||||
page_count = 9
|
||||
page_numbers = list(range(1, 10))
|
||||
|
||||
self.assertRaises(ValueError, pdftk.split_ranges, 9, after="a", before=False)
|
||||
self.assertRaises(ValueError, pdftk.split_ranges, 9, after=False, before=True)
|
||||
|
||||
actual1, actual2 = pdftk.split_ranges(page_count, 1, False)
|
||||
expected1 = [1]
|
||||
expected2 = [2, 3, 4, 5, 6, 7, 8, 9]
|
||||
assert actual1 == expected1
|
||||
assert actual2 == expected2
|
||||
|
||||
actual1, actual2 = pdftk.split_ranges(page_count, False, 2)
|
||||
expected1 = [1]
|
||||
expected2 = [2, 3, 4, 5, 6, 7, 8, 9]
|
||||
assert actual1 == expected1
|
||||
assert actual2 == expected2
|
||||
|
||||
actual1, actual2 = pdftk.split_ranges(page_count)
|
||||
expected1 = list(range(1, page_count + 1))
|
||||
expected2 = []
|
||||
assert actual1 == expected1
|
||||
assert actual2 == expected2
|
||||
|
||||
def test_reorder_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
new_order = [
|
||||
{'page_num': 2, 'page_order': 1},
|
||||
{'page_num': 1, 'page_order': 2},
|
||||
]
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.reorder_pages(input_file, output_file, new_order)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, input_file, "cat", "2", "1", "output", output_file]
|
||||
)
|
||||
|
||||
def test_paste_pages_into_existing_doc(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
datalist = []
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.paste_pages_into_existing_doc(input_file, output_file, datalist)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, "A=" + input_file, "cat", "A1", "A2", "output", output_file]
|
||||
)
|
||||
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.paste_pages_into_existing_doc(input_file, output_file, datalist, 1)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, "A=" + input_file, "B=" + input_file, "cat", "A1", "B3",
|
||||
"B4", "A2", "output", output_file]
|
||||
)
|
||||
def test_paste_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
datalist = []
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.paste_pages(input_file, output_file, datalist, False)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, "A=" + input_file, "cat", "A1", "A2", "output", output_file]
|
||||
)
|
||||
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
|
||||
with mock.patch("mglib.pdftk.run") as run_func:
|
||||
pdftk.paste_pages(input_file, output_file, datalist)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_PDFTK, "A=" + input_file, "cat", "A3", "A4",
|
||||
"output", output_file]
|
||||
)
|
|
@ -3,7 +3,6 @@ import unittest
|
|||
from unittest import mock
|
||||
from mglib import stapler
|
||||
from mglib.conf import settings
|
||||
from mglib.runcmd import run
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
@ -17,15 +16,15 @@ class TestPdfLib(unittest.TestCase):
|
|||
{"page_order": 2, "page_num": 3},
|
||||
{"page_order": 3, "page_num": 2},
|
||||
{"page_order": 4, "page_num": 1}
|
||||
])
|
||||
expected = [4,3,2,1]
|
||||
])
|
||||
expected = [4, 3, 2, 1]
|
||||
assert expected == actual
|
||||
|
||||
self.assertRaises(ValueError, stapler.cat_ranges_for_reorder, 2, [])
|
||||
self.assertRaises(KeyError, stapler.cat_ranges_for_reorder, 2, [
|
||||
{"page_order": 3, "page_num": 4},
|
||||
{"page_order": 5, "page_num": 6}
|
||||
])
|
||||
])
|
||||
|
||||
def test_delete_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
|
@ -38,13 +37,22 @@ class TestPdfLib(unittest.TestCase):
|
|||
[settings.BINARY_STAPLER, "del", input_file, "1", output_file]
|
||||
)
|
||||
|
||||
|
||||
def test_split_ranges(self):
|
||||
page_count = 9
|
||||
page_numbers = list(range(1, 10))
|
||||
|
||||
self.assertRaises(ValueError, stapler.split_ranges, 9, after="a", before=False)
|
||||
self.assertRaises(ValueError, stapler.split_ranges, 9, after=False, before=True)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
stapler.split_ranges,
|
||||
9,
|
||||
after="a",
|
||||
before=False
|
||||
)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
stapler.split_ranges,
|
||||
9, after=False,
|
||||
before=True
|
||||
)
|
||||
|
||||
actual1, actual2 = stapler.split_ranges(page_count, 1, False)
|
||||
expected1 = [1]
|
||||
|
@ -67,54 +75,81 @@ class TestPdfLib(unittest.TestCase):
|
|||
def test_reorder_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
new_order = [
|
||||
{'page_num': 2, 'page_order': 1},
|
||||
{'page_num': 1, 'page_order': 2},
|
||||
]
|
||||
new_order = [
|
||||
{'page_num': 2, 'page_order': 1},
|
||||
{'page_num': 1, 'page_order': 2},
|
||||
]
|
||||
|
||||
with mock.patch("mglib.stapler.run") as run_func:
|
||||
stapler.reorder_pages(input_file, output_file, new_order)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_STAPLER, "sel", input_file, "2", "1", output_file]
|
||||
[
|
||||
settings.BINARY_STAPLER,
|
||||
"sel",
|
||||
input_file,
|
||||
"2",
|
||||
"1",
|
||||
output_file
|
||||
]
|
||||
)
|
||||
|
||||
def test_paste_pages_into_existing_doc(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
datalist = []
|
||||
datalist = []
|
||||
|
||||
with mock.patch("mglib.stapler.run") as run_func:
|
||||
stapler.paste_pages_into_existing_doc(input_file, output_file, datalist)
|
||||
stapler.paste_pages_into_existing_doc(
|
||||
input_file, output_file, datalist
|
||||
)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_STAPLER, "sel", "A=" + input_file, "A1", "A2", output_file]
|
||||
[
|
||||
settings.BINARY_STAPLER,
|
||||
"sel", "A=" + input_file, "A1", "A2", output_file
|
||||
]
|
||||
)
|
||||
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
|
||||
with mock.patch("mglib.stapler.run") as run_func:
|
||||
stapler.paste_pages_into_existing_doc(input_file, output_file, datalist, 1)
|
||||
stapler.paste_pages_into_existing_doc(
|
||||
input_file,
|
||||
output_file,
|
||||
datalist,
|
||||
1
|
||||
)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_STAPLER, "sel", "A=" + input_file, "B=" + input_file, "A1", "B3",
|
||||
"B4", "A2", output_file]
|
||||
[
|
||||
settings.BINARY_STAPLER,
|
||||
"sel", "A=" + input_file,
|
||||
"B=" + input_file, "A1", "B3",
|
||||
"B4", "A2", output_file
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def test_paste_pages(self):
|
||||
input_file = os.path.join(DATA_DIR, "berlin.pdf")
|
||||
output_file = os.path.join(DATA_DIR, "berlin2.pdf")
|
||||
datalist = []
|
||||
datalist = []
|
||||
|
||||
with mock.patch("mglib.stapler.run") as run_func:
|
||||
stapler.paste_pages(input_file, output_file, datalist, False)
|
||||
run_func.assert_called()
|
||||
run_func.assert_called_with(
|
||||
[settings.BINARY_STAPLER, "sel", "A=" + input_file, "A1", "A2", output_file]
|
||||
[
|
||||
settings.BINARY_STAPLER,
|
||||
"sel",
|
||||
"A=" + input_file,
|
||||
"A1",
|
||||
"A2",
|
||||
output_file
|
||||
]
|
||||
)
|
||||
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
datalist = [{"src": input_file, "page_nums": "34"}]
|
||||
|
||||
with mock.patch("mglib.stapler.run") as run_func:
|
||||
stapler.paste_pages(input_file, output_file, datalist)
|
||||
|
|
|
@ -41,3 +41,49 @@ class TestStorage(unittest.TestCase):
|
|||
f1.exists()
|
||||
)
|
||||
|
||||
def test_get_versions_1(self):
|
||||
storage = FileSystemStorage(location=MEDIA_ROOT)
|
||||
|
||||
with TemporaryNode(MEDIA_ROOT) as media_root:
|
||||
docs = media_root.add_folder("docs")
|
||||
res = media_root.add_folder("results")
|
||||
f1 = docs.add_folder("user_1/document_2")
|
||||
f1.add_file("doku.pdf")
|
||||
# simulate 2 versions of the document.
|
||||
f1.add_folder("v1")
|
||||
f1.add_folder("v2")
|
||||
res.add_folder("user_1/document_2/pages")
|
||||
|
||||
doc_path = DocumentPath(
|
||||
user_id=1,
|
||||
document_id=2,
|
||||
file_name='doku.pdf',
|
||||
version=2
|
||||
)
|
||||
versions = storage.get_versions(doc_path)
|
||||
|
||||
self.assertEqual(
|
||||
versions, [0, 1, 2]
|
||||
)
|
||||
|
||||
def test_get_versions_2(self):
|
||||
storage = FileSystemStorage(location=MEDIA_ROOT)
|
||||
|
||||
with TemporaryNode(MEDIA_ROOT) as media_root:
|
||||
docs = media_root.add_folder("docs")
|
||||
f1 = docs.add_folder("user_1/document_2")
|
||||
f1.add_file("doku.pdf")
|
||||
|
||||
doc_path = DocumentPath(
|
||||
user_id=1,
|
||||
document_id=2,
|
||||
file_name='doku.pdf',
|
||||
version=2
|
||||
)
|
||||
versions = storage.get_versions(doc_path)
|
||||
|
||||
# document has only one version - the latest
|
||||
self.assertEqual(
|
||||
versions, [0]
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue