Module fpdf.util
Expand source code
import gc, os, warnings
from datetime import datetime, timezone
from numbers import Number
from tracemalloc import get_traced_memory, is_tracing
from typing import Iterable, Tuple, Union
# Size, in MiB, of one Pillow memory block; get_pillow_allocated_memory() multiplies
# the number of blocks in use by this value to report Pillow memory usage.
# default block size from src/libImaging/Storage.c:
PIL_MEM_BLOCK_SIZE_IN_MIB = 16
def buffer_subst(buffer, placeholder, value):
    """
    Substitute the first occurrence of a placeholder in a binary buffer,
    without changing the size of that buffer.

    Args:
        buffer (bytes, bytearray): the binary content to patch
        placeholder (str): the text to look for, encoded before the search
        value (str): the replacement text, must be as long as `placeholder`

    Returns:
        A new buffer of the same length, with the first match replaced.
        If the placeholder is not found, the content is returned unchanged.

    Raises:
        AssertionError: if `placeholder` and `value` have different lengths,
            or if the substitution changed the length of the buffer
    """
    expected_size = len(buffer)
    assert len(placeholder) == len(value), f"placeholder={placeholder} value={value}"
    needle = placeholder.encode()
    substitute = value.encode()
    patched = buffer.replace(needle, substitute, 1)
    # Same character count does not guarantee same encoded size, hence this second check:
    assert len(patched) == expected_size
    return patched
def format_date(date: datetime, with_tz=False) -> str:
    """
    Format a datetime as a PDF date string, enclosed in parentheses.

    Args:
        date (datetime): the date & time to format
        with_tz (bool): if True, append the relationship of `date` to UTC;
            `date` must then be timezone-aware

    Returns:
        str: `(D:YYYYMMDDHHmmSS)` when `with_tz` is false,
            `(D:YYYYMMDDHHmmSSZ00'00')` when the timezone is `timezone.utc`,
            and `(D:YYYYMMDDHHmmSS+HH'mm')` or `(D:YYYYMMDDHHmmSS-HH'mm')` otherwise

    Raises:
        AssertionError: if `with_tz` is true and `date` has no `tzinfo`
    """
    timestamp = f"D:{date:%Y%m%d%H%M%S}"
    if not with_tz:
        return enclose_in_parens(timestamp)
    assert date.tzinfo
    if date.tzinfo == timezone.utc:
        # The fields following "Z" are the offset from UT, which is zero for UTC:
        # they must NOT be the hour & minute of the date itself.
        str_date = f"{timestamp}Z00'00'"
    else:
        # %z renders the offset as +HHMM / -HHMM:
        # apostrophes are inserted after the hours and after the minutes.
        str_date = f"{timestamp}{date:%z}"
        str_date = str_date[:-2] + "'" + str_date[-2:] + "'"
    return enclose_in_parens(str_date)
def enclose_in_parens(s):
    """
    Format a text string: escape it with escape_parens() and wrap it in parentheses.

    Args:
        s (str): the text to format

    Returns:
        str: the escaped text between "(" and ")", or an empty string if `s` is falsy

    Raises:
        AssertionError: if `s` is truthy but is not a str
    """
    if not s:
        return ""
    assert isinstance(s, str)
    return "(" + escape_parens(s) + ")"
def escape_parens(s):
    """
    Add a backslash character before every backslash, ")" and "(",
    and replace every carriage return by the two characters "\\r".

    Args:
        s (str, bytes): the content to escape; anything that is not a str
            is processed with bytes patterns

    Returns:
        The escaped content, of the same type as the input.
    """
    if isinstance(s, str):
        substitutions = (
            ("\\", "\\\\"),
            (")", "\\)"),
            ("(", "\\("),
            ("\r", "\\r"),
        )
    else:
        substitutions = (
            (b"\\", b"\\\\"),
            (b")", b"\\)"),
            (b"(", b"\\("),
            (b"\r", b"\\r"),
        )
    escaped = s
    # Backslashes must be handled first, so that the ones added
    # by the following substitutions are not doubled:
    for raw, replacement in substitutions:
        escaped = escaped.replace(raw, replacement)
    return escaped
# shortcut to bytes conversion (b prefix)
def b(s):
    """
    Convert a string or an integer to bytes.

    Args:
        s (str, int): a string, encoded as latin1,
            or an integer, converted to a single byte

    Returns:
        bytes

    Raises:
        ValueError: if `s` is neither a str nor an int
    """
    if isinstance(s, int):
        return bytes((s,))  # http://bugs.python.org/issue4588
    if not isinstance(s, str):
        raise ValueError(f"Invalid input: {s}")
    return s.encode("latin1")
def get_scale_factor(unit: Union[str, Number]) -> float:
    """
    Get how many pts are in a unit. (k)

    Args:
        unit (str, float, int): Any of "pt", "mm", "cm", "in", or a number.
            A number is returned as is, converted to float.

    Returns:
        float: The number of points in that unit (assuming 72dpi)

    Raises:
        ValueError: if the unit is neither a number nor one of the supported names
    """
    if isinstance(unit, Number):
        return float(unit)
    points_per_unit = (
        ("pt", 1),
        ("mm", 72 / 25.4),  # 25.4 mm in one inch
        ("cm", 72 / 2.54),
        ("in", 72.0),
    )
    for name, factor in points_per_unit:
        if unit == name:
            return factor
    raise ValueError(f"Incorrect unit: {unit}")
def convert_unit(
    to_convert: Union[float, int, Iterable[Union[float, int, Iterable]]],
    old_unit: Union[str, Number],
    new_unit: Union[str, Number],
) -> Union[float, tuple]:
    """
    Convert a number or sequence of numbers from one unit to another.

    If either unit is a number it will be treated as the number of points per unit. So 72 would mean 1 inch.

    Args:
        to_convert (float, int, Iterable): The number / list of numbers, or points, to convert
        old_unit (str, float, int): A unit accepted by fpdf.FPDF or a number
        new_unit (str, float, int): A unit accepted by fpdf.FPDF or a number

    Returns:
        (float, tuple): to_convert converted from old_unit to new_unit or a tuple of the same

    Raises:
        ValueError: if one of the units is rejected by get_scale_factor()
    """
    ratio = get_scale_factor(new_unit) / get_scale_factor(old_unit)
    if not isinstance(to_convert, Iterable):
        return to_convert / ratio
    # Recurse into every item (items may themselves be iterables):
    # using 1 as the old unit makes the nested call reuse the very same ratio.
    return tuple(convert_unit(item, 1, ratio) for item in to_convert)
################################################################################
################### Utility functions to track memory usage ####################
################################################################################
def print_mem_usage(prefix):
    """
    Print on the standard output the report built by get_mem_usage().

    Args:
        prefix: label placed at the start of the report line
    """
    report = get_mem_usage(prefix)
    print(report)
def get_mem_usage(prefix) -> str:
    """
    Build a one-line report of the current memory usage.

    A garbage collection is triggered first. The report then combines
    the process RSS, the memory allocated by Pillow, and allocator statistics:
    those of tracemalloc when it is tracing, else those of PyMalloc.

    Args:
        prefix: label placed at the start of the line, left-aligned & padded to 40 characters

    Returns:
        str: the report line
    """
    gc.collect()
    process_rss = get_process_rss()
    # get_process_heap_and_stack_sizes() & get_gc_managed_objs_total_size()
    # are also available, but are not included in this report.
    pillow_memory = get_pillow_allocated_memory()
    stats_getter = (
        get_tracemalloc_traced_memory
        if is_tracing()
        else get_pymalloc_allocated_over_total_size
    )
    allocator_stats = stats_getter()
    return f"{prefix:<40} Malloc stats: {allocator_stats} | Pillow: {pillow_memory} | Process RSS: {process_rss}"
def get_process_rss() -> str:
    """
    Get the value returned by get_process_rss_as_mib() as a human-readable string.

    Returns:
        str: the value with one decimal followed by " MiB",
            or "<unavailable>" if no (or a zero) value was obtained
    """
    mib = get_process_rss_as_mib()
    return f"{mib:.1f} MiB" if mib else "<unavailable>"
def get_process_rss_as_mib() -> Union[Number, None]:
    """
    Compute the RSS of the current process from /proc/<pid>/statm.
    Inspired by psutil source code.

    Returns:
        The second field of the statm file multiplied by the system page size,
        expressed in MiB, or None if the statm file does not exist.
    """
    statm_path = f"/proc/{os.getpid()}/statm"
    try:
        with open(statm_path, encoding="utf8") as statm:
            fields = statm.readline().split()
            # fields[1] is a number of pages (presumably the resident set, cf. proc(5)):
            size_in_bytes = int(fields[1]) * os.sysconf("SC_PAGE_SIZE")
            return size_in_bytes / 1024 / 1024
    except FileNotFoundError:  # This file only exists under Linux
        return None
def get_process_heap_and_stack_sizes() -> Tuple[str, str]:
    """
    Get the sizes of the heap and of the stack of the current process,
    based on the address ranges listed in /proc/<pid>/maps.

    Returns:
        (str, str): heap size and stack size, each formatted with one decimal
            followed by " MiB", or "<unavailable>" when the maps file does not exist
            or has no "[heap]" / "[stack]" entry
    """
    unavailable = "<unavailable>"
    try:
        with open(f"/proc/{os.getpid()}/maps", encoding="utf8") as maps_file:
            maps_lines = list(maps_file)
    except FileNotFoundError:  # This file only exists under Linux
        return unavailable, unavailable
    sizes = {"[heap]": unavailable, "[stack]": unavailable}
    for line in maps_lines:
        fields = line.split()
        if not fields:  # a blank line has neither an address range nor a path
            continue
        path = fields[-1]
        if path not in sizes:
            continue
        # The first field is an hexadecimal address range: <start>-<end>
        addr_start, addr_end = (int(addr, 16) for addr in fields[0].split("-"))
        sizes[path] = f"{(addr_end - addr_start) / 1024 / 1024:.1f} MiB"
    return sizes["[heap]"], sizes["[stack]"]
def get_pymalloc_allocated_over_total_size() -> str:
    """
    Get PyMalloc stats from sys._debugmallocstats()
    From experiments, not very reliable

    Requires the optional pymemtrace package.

    Returns:
        str: "<allocated> / <total> MiB", both values with one decimal,
            or "<unavailable>" if pymemtrace cannot be imported (a warning is then emitted)
            or if the expected lines are missing from the stats
    """
    try:
        # pylint: disable=import-outside-toplevel
        from pymemtrace.debug_malloc_stats import get_debugmallocstats
    except ImportError:
        warnings.warn("pymemtrace could not be imported - Run: pip install pymemtrace")
        return "<unavailable>"
    allocated, total = None, None
    for line in get_debugmallocstats().decode().splitlines():
        # In both lines of interest, the value is the last word, with "," as thousands separator:
        if line.startswith("Total"):
            total = int(line.split()[-1].replace(",", ""))
        elif line.startswith("# bytes in allocated blocks"):
            allocated = int(line.split()[-1].replace(",", ""))
    if allocated is None or total is None:
        # Better than reporting meaningless figures:
        return "<unavailable>"
    return f"{allocated / 1024 / 1024:.1f} / {total / 1024 / 1024:.1f} MiB"
def get_gc_managed_objs_total_size() -> str:
    """
    Sum the sizes of all the objects listed by pympler.
    From experiments, not very reliable.

    Requires the optional pympler package.

    Returns:
        str: the total size with one decimal followed by " MiB",
            or "<unavailable>" if pympler cannot be imported (a warning is then emitted)
    """
    try:
        # pylint: disable=import-outside-toplevel
        from pympler.muppy import get_objects, getsizeof

        total_in_bytes = sum(map(getsizeof, get_objects()))
        total_in_mib = total_in_bytes / 1024 / 1024
        return f"{total_in_mib:.1f} MiB"
    except ImportError:
        warnings.warn("pympler could not be imported - Run: pip install pympler")
        return "<unavailable>"
def get_tracemalloc_traced_memory() -> str:
    """
    Format the current & peak sizes reported by tracemalloc.get_traced_memory().
    Requires python -X tracemalloc

    Returns:
        str: "<current> (peak=<peak>) MiB", both values with one decimal
    """
    traced = get_traced_memory()
    current_in_mib = traced[0] / 1024 / 1024
    peak_in_mib = traced[1] / 1024 / 1024
    return f"{current_in_mib:.1f} (peak={peak_in_mib:.1f}) MiB"
def get_pillow_allocated_memory() -> str:
    """
    Get the amount of memory held by the blocks that Pillow currently has in use.

    The count of blocks in use (allocated minus freed, as reported by
    Image.core.get_stats()) is multiplied by PIL_MEM_BLOCK_SIZE_IN_MIB.

    Returns:
        str: the amount with one decimal followed by " MiB"
    """
    # pylint: disable=c-extension-no-member,import-outside-toplevel
    from PIL import Image

    core_stats = Image.core.get_stats()
    live_blocks = core_stats["allocated_blocks"] - core_stats["freed_blocks"]
    live_mib = live_blocks * PIL_MEM_BLOCK_SIZE_IN_MIB
    return f"{live_mib:.1f} MiB"
Functions
def b(s)
-
Expand source code
def b(s): if isinstance(s, str): return s.encode("latin1") if isinstance(s, int): return bytes([s]) # http://bugs.python.org/issue4588 raise ValueError(f"Invalid input: {s}")
def buffer_subst(buffer, placeholder, value)
-
Expand source code
def buffer_subst(buffer, placeholder, value): buffer_size = len(buffer) assert len(placeholder) == len(value), f"placeholder={placeholder} value={value}" buffer = buffer.replace(placeholder.encode(), value.encode(), 1) assert len(buffer) == buffer_size return buffer
def convert_unit(to_convert: Union[float, int, Iterable[Union[float, int, Iterable]]], old_unit: Union[str, numbers.Number], new_unit: Union[str, numbers.Number]) ‑> Union[float, tuple]
-
Convert a number or sequence of numbers from one unit to another.
If either unit is a number it will be treated as the number of points per unit. So 72 would mean 1 inch.
Args: to_convert (float, int, Iterable): The number / list of numbers, or points, to convert old_unit (str, float, int): A unit accepted by fpdf.FPDF or a number new_unit (str, float, int): A unit accepted by fpdf.FPDF or a number
Returns
(float, tuple): to_convert converted from old_unit to new_unit or a tuple of the same
Expand source code
def convert_unit( to_convert: Union[float, int, Iterable[Union[float, int, Iterable]]], old_unit: Union[str, Number], new_unit: Union[str, Number], ) -> Union[float, tuple]: """ Convert a number or sequence of numbers from one unit to another. If either unit is a number it will be treated as the number of points per unit. So 72 would mean 1 inch. Args: to_convert (float, int, Iterable): The number / list of numbers, or points, to convert old_unit (str, float, int): A unit accepted by fpdf.FPDF or a number new_unit (str, float, int): A unit accepted by fpdf.FPDF or a number Returns: (float, tuple): to_convert converted from old_unit to new_unit or a tuple of the same """ unit_conversion_factor = get_scale_factor(new_unit) / get_scale_factor(old_unit) if isinstance(to_convert, Iterable): return tuple( map(lambda i: convert_unit(i, 1, unit_conversion_factor), to_convert) ) return to_convert / unit_conversion_factor
def enclose_in_parens(s)
-
Format a text string: escape it and enclose it in parentheses. An empty string is returned as an empty string.
Expand source code
def enclose_in_parens(s): """Format a text string""" if s: assert isinstance(s, str) return f"({escape_parens(s)})" return ""
def escape_parens(s)
-
Add a backslash character before every backslash, ( and ), and replace every carriage return with the two characters \r
Expand source code
def escape_parens(s): """Add a backslash character before , ( and )""" if isinstance(s, str): return ( s.replace("\\", "\\\\") .replace(")", "\\)") .replace("(", "\\(") .replace("\r", "\\r") ) return ( s.replace(b"\\", b"\\\\") .replace(b")", b"\\)") .replace(b"(", b"\\(") .replace(b"\r", b"\\r") )
def format_date(date: datetime.datetime, with_tz=False) ‑> str
-
Expand source code
def format_date(date: datetime, with_tz=False) -> str: if with_tz: assert date.tzinfo if date.tzinfo == timezone.utc: str_date = f"D:{date:%Y%m%d%H%M%SZ%H'%M'}" else: str_date = f"D:{date:%Y%m%d%H%M%S%z}" str_date = str_date[:-2] + "'" + str_date[-2:] + "'" return enclose_in_parens(str_date) return enclose_in_parens(f"D:{date:%Y%m%d%H%M%S}")
def get_gc_managed_objs_total_size() ‑> str
-
From experiments, not very reliable
Expand source code
def get_gc_managed_objs_total_size() -> str: "From experiments, not very reliable" try: # pylint: disable=import-outside-toplevel from pympler.muppy import get_objects, getsizeof objs_total_size = sum(getsizeof(obj) for obj in get_objects()) return f"{objs_total_size / 1024 / 1024:.1f} MiB" except ImportError: warnings.warn("pympler could not be imported - Run: pip install pympler") return "<unavailable>"
def get_mem_usage(prefix) ‑> str
-
Expand source code
def get_mem_usage(prefix) -> str: _collected_count = gc.collect() rss = get_process_rss() # heap_size, stack_size = get_process_heap_and_stack_sizes() # objs_size_sum = get_gc_managed_objs_total_size() pillow = get_pillow_allocated_memory() if is_tracing(): malloc_stats = get_tracemalloc_traced_memory() else: malloc_stats = get_pymalloc_allocated_over_total_size() return f"{prefix:<40} Malloc stats: {malloc_stats} | Pillow: {pillow} | Process RSS: {rss}"
def get_pillow_allocated_memory() ‑> str
-
Expand source code
def get_pillow_allocated_memory() -> str: # pylint: disable=c-extension-no-member,import-outside-toplevel from PIL import Image stats = Image.core.get_stats() blocks_in_use = stats["allocated_blocks"] - stats["freed_blocks"] return f"{blocks_in_use * PIL_MEM_BLOCK_SIZE_IN_MIB:.1f} MiB"
def get_process_heap_and_stack_sizes() ‑> Tuple[str]
-
Expand source code
def get_process_heap_and_stack_sizes() -> Tuple[str]: heap_size_in_mib, stack_size_in_mib = "<unavailable>", "<unavailable>" pid = os.getpid() try: with open(f"/proc/{pid}/maps", encoding="utf8") as maps_file: maps_lines = list(maps_file) except FileNotFoundError: # This file only exists under Linux return heap_size_in_mib, stack_size_in_mib for line in maps_lines: line = line.split() addr_range, path = line[0], line[-1] addr_start, addr_end = addr_range.split("-") addr_start, addr_end = int(addr_start, 16), int(addr_end, 16) size = addr_end - addr_start if path == "[heap]": heap_size_in_mib = f"{size / 1024 / 1024:.1f} MiB" elif path == "[stack]": stack_size_in_mib = f"{size / 1024 / 1024:.1f} MiB" return heap_size_in_mib, stack_size_in_mib
def get_process_rss() ‑> str
-
Expand source code
def get_process_rss() -> str: rss_as_mib = get_process_rss_as_mib() if rss_as_mib: return f"{rss_as_mib:.1f} MiB" return "<unavailable>"
def get_process_rss_as_mib() ‑> Optional[numbers.Number]
-
Inspired by psutil source code
Expand source code
def get_process_rss_as_mib() -> Union[Number, None]: "Inspired by psutil source code" pid = os.getpid() try: with open(f"/proc/{pid}/statm", encoding="utf8") as statm: return ( int(statm.readline().split()[1]) * os.sysconf("SC_PAGE_SIZE") / 1024 / 1024 ) except FileNotFoundError: # This file only exists under Linux return None
def get_pymalloc_allocated_over_total_size() ‑> Tuple[str]
-
Get PyMalloc stats from sys._debugmallocstats(). From experiments, not very reliable.
Expand source code
def get_pymalloc_allocated_over_total_size() -> Tuple[str]: """ Get PyMalloc stats from sys._debugmallocstats() From experiments, not very reliable """ try: # pylint: disable=import-outside-toplevel from pymemtrace.debug_malloc_stats import get_debugmallocstats allocated, total = -1, -1 for line in get_debugmallocstats().decode().splitlines(): if line.startswith("Total"): total = int(line.split()[-1].replace(",", "")) elif line.startswith("# bytes in allocated blocks"): allocated = int(line.split()[-1].replace(",", "")) return f"{allocated / 1024 / 1024:.1f} / {total / 1024 / 1024:.1f} MiB" except ImportError: warnings.warn("pymemtrace could not be imported - Run: pip install pymemtrace") return "<unavailable>"
def get_scale_factor(unit: Union[str, numbers.Number]) ‑> float
-
Get how many pts are in a unit. (k)
Args
unit
:str, float, int
- Any of "pt", "mm", "cm", "in", or a number.
Returns
float
- The number of points in that unit (assuming 72dpi)
Raises
ValueError
Expand source code
def get_scale_factor(unit: Union[str, Number]) -> float: """ Get how many pts are in a unit. (k) Args: unit (str, float, int): Any of "pt", "mm", "cm", "in", or a number. Returns: float: The number of points in that unit (assuming 72dpi) Raises: ValueError """ if isinstance(unit, Number): return float(unit) if unit == "pt": return 1 if unit == "mm": return 72 / 25.4 if unit == "cm": return 72 / 2.54 if unit == "in": return 72.0 raise ValueError(f"Incorrect unit: {unit}")
def get_tracemalloc_traced_memory() ‑> str
-
Requires python -X tracemalloc
Expand source code
def get_tracemalloc_traced_memory() -> str: "Requires python -X tracemalloc" current, peak = get_traced_memory() return f"{current / 1024 / 1024:.1f} (peak={peak / 1024 / 1024:.1f}) MiB"
def print_mem_usage(prefix)
-
Expand source code
def print_mem_usage(prefix): print(get_mem_usage(prefix))