camelot-py/camelot/core.py

823 lines
26 KiB
Python

# -*- coding: utf-8 -*-
import os
import sqlite3
import zipfile
import tempfile
from itertools import chain
from operator import itemgetter
import numpy as np
import pandas as pd
from cv2 import cv2
from .utils import (
build_file_path_in_temp_dir,
compute_accuracy,
compute_whitespace,
export_pdf_as_png
)
# minimum number of vertical textline intersections for a textedge
# to be considered valid
TEXTEDGE_REQUIRED_ELEMENTS = 4
# padding added to table area on the left, right and bottom
TABLE_AREA_PADDING = 10
class TextEdge(object):
"""Defines a text edge coordinates relative to a left-bottom
origin. (PDF coordinate space)
Parameters
----------
x : float
x-coordinate of the text edge.
y0 : float
y-coordinate of bottommost point.
y1 : float
y-coordinate of topmost point.
align : string, optional (default: 'left')
{'left', 'right', 'middle'}
Attributes
----------
intersections: int
Number of intersections with horizontal text rows.
is_valid: bool
A text edge is valid if it intersects with at least
TEXTEDGE_REQUIRED_ELEMENTS horizontal text rows.
"""
def __init__(self, x, y0, y1, align="left"):
self.x = x
self.y0 = y0
self.y1 = y1
self.align = align
self.intersections = 0
self.is_valid = False
def __repr__(self):
return "<TextEdge x={} y0={} y1={} align={} valid={}>".format(
round(self.x, 2),
round(self.y0, 2),
round(self.y1, 2),
self.align,
self.is_valid,
)
def update_coords(self, x, y0, edge_tol=50):
"""Updates the text edge's x and bottom y coordinates and sets
the is_valid attribute.
"""
if np.isclose(self.y0, y0, atol=edge_tol):
self.x = (self.intersections * self.x + x) / \
float(self.intersections + 1)
self.y0 = y0
self.intersections += 1
# a textedge is valid only if it extends uninterrupted
# over a required number of textlines
if self.intersections > TEXTEDGE_REQUIRED_ELEMENTS:
self.is_valid = True
class TextEdges(object):
"""Defines a dict of left, right and middle text edges found on
the PDF page. The dict has three keys based on the alignments,
and each key's value is a list of camelot.core.TextEdge objects.
"""
def __init__(self, edge_tol=50):
self.edge_tol = edge_tol
self._textedges = {"left": [], "right": [], "middle": []}
@staticmethod
def get_x_coord(textline, align):
"""Returns the x coordinate of a text row based on the
specified alignment.
"""
x_left = textline.x0
x_right = textline.x1
x_middle = x_left + (x_right - x_left) / 2.0
x_coord = {"left": x_left, "middle": x_middle, "right": x_right}
return x_coord[align]
def find(self, x_coord, align):
"""Returns the index of an existing text edge using
the specified x coordinate and alignment.
"""
for i, te in enumerate(self._textedges[align]):
if np.isclose(te.x, x_coord, atol=0.5):
return i
return None
def add(self, textline, align):
"""Adds a new text edge to the current dict.
"""
x = self.get_x_coord(textline, align)
y0 = textline.y0
y1 = textline.y1
te = TextEdge(x, y0, y1, align=align)
self._textedges[align].append(te)
def update(self, textline):
"""Updates an existing text edge in the current dict.
"""
for align in ["left", "right", "middle"]:
x_coord = self.get_x_coord(textline, align)
idx = self.find(x_coord, align)
if idx is None:
self.add(textline, align)
else:
self._textedges[align][idx].update_coords(
x_coord, textline.y0, edge_tol=self.edge_tol
)
def generate(self, textlines):
"""Generates the text edges dict based on horizontal text
rows.
"""
for tl in textlines:
if len(tl.get_text().strip()) > 1: # TODO: hacky
self.update(tl)
def get_relevant(self):
"""Returns the list of relevant text edges (all share the same
alignment) based on which list intersects horizontal text rows
the most.
"""
intersections_sum = {
"left": sum(
te.intersections for te in self._textedges["left"]
if te.is_valid
),
"right": sum(
te.intersections for te in self._textedges["right"]
if te.is_valid
),
"middle": sum(
te.intersections for te in self._textedges["middle"]
if te.is_valid
),
}
# TODO: naive
# get vertical textedges that intersect maximum number of
# times with horizontal textlines
relevant_align = max(intersections_sum.items(), key=itemgetter(1))[0]
return list(filter(
lambda te: te.is_valid,
self._textedges[relevant_align])
)
def get_table_areas(self, textlines, relevant_textedges):
"""Returns a dict of interesting table areas on the PDF page
calculated using relevant text edges.
"""
def pad(area, average_row_height):
x0 = area[0] - TABLE_AREA_PADDING
y0 = area[1] - TABLE_AREA_PADDING
x1 = area[2] + TABLE_AREA_PADDING
# add a constant since table headers can be relatively up
y1 = area[3] + average_row_height * 5
return (x0, y0, x1, y1)
# sort relevant textedges in reading order
relevant_textedges.sort(key=lambda te: (-te.y0, te.x))
table_areas = {}
for te in relevant_textedges:
if not table_areas:
table_areas[(te.x, te.y0, te.x, te.y1)] = None
else:
found = None
for area in table_areas:
# check for overlap
if te.y1 >= area[1] and te.y0 <= area[3]:
found = area
break
if found is None:
table_areas[(te.x, te.y0, te.x, te.y1)] = None
else:
table_areas.pop(found)
updated_area = (
found[0],
min(te.y0, found[1]),
max(found[2], te.x),
max(found[3], te.y1),
)
table_areas[updated_area] = None
# extend table areas based on textlines that overlap
# vertically. it's possible that these textlines were
# eliminated during textedges generation since numbers and
# chars/words/sentences are often aligned differently.
# drawback: table areas that have paragraphs on their sides
# will include the paragraphs too.
sum_textline_height = 0
for tl in textlines:
sum_textline_height += tl.y1 - tl.y0
found = None
for area in table_areas:
# check for overlap
if tl.y0 >= area[1] and tl.y1 <= area[3]:
found = area
break
if found is not None:
table_areas.pop(found)
updated_area = (
min(tl.x0, found[0]),
min(tl.y0, found[1]),
max(found[2], tl.x1),
max(found[3], tl.y1),
)
table_areas[updated_area] = None
average_textline_height = sum_textline_height / \
float(len(textlines))
# add some padding to table areas
table_areas_padded = {}
for area in table_areas:
table_areas_padded[pad(area, average_textline_height)] = None
return table_areas_padded
class Cell(object):
"""Defines a cell in a table with coordinates relative to a
left-bottom origin. (PDF coordinate space)
Parameters
----------
x1 : float
x-coordinate of left-bottom point.
y1 : float
y-coordinate of left-bottom point.
x2 : float
x-coordinate of right-top point.
y2 : float
y-coordinate of right-top point.
Attributes
----------
lb : tuple
Tuple representing left-bottom coordinates.
lt : tuple
Tuple representing left-top coordinates.
rb : tuple
Tuple representing right-bottom coordinates.
rt : tuple
Tuple representing right-top coordinates.
left : bool
Whether or not cell is bounded on the left.
right : bool
Whether or not cell is bounded on the right.
top : bool
Whether or not cell is bounded on the top.
bottom : bool
Whether or not cell is bounded on the bottom.
hspan : bool
Whether or not cell spans horizontally.
vspan : bool
Whether or not cell spans vertically.
text : string
Text assigned to cell.
"""
def __init__(self, x1, y1, x2, y2):
self.x1 = x1
self.y1 = y1
self.x2 = x2
self.y2 = y2
self.lb = (x1, y1)
self.lt = (x1, y2)
self.rb = (x2, y1)
self.rt = (x2, y2)
self.left = False
self.right = False
self.top = False
self.bottom = False
self.hspan = False
self.vspan = False
self._text = ""
def __repr__(self):
return "<Cell x1={} y1={} x2={} y2={}>".format(
round(self.x1, 2),
round(self.y1, 2),
round(self.x2, 2),
round(self.y2, 2)
)
@property
def text(self):
return self._text
@text.setter
def text(self, t):
self._text = "".join([self._text, t])
@property
def bound(self):
"""The number of sides on which the cell is bounded.
"""
return self.top + self.bottom + self.left + self.right
class Table(object):
"""Defines a table with coordinates relative to a left-bottom
origin. (PDF coordinate space)
Parameters
----------
cols : list
List of tuples representing column x-coordinates in increasing
order.
rows : list
List of tuples representing row y-coordinates in decreasing
order.
Attributes
----------
df : :class:`pandas.DataFrame`
shape : tuple
Shape of the table.
accuracy : float
Accuracy with which text was assigned to the cell.
whitespace : float
Percentage of whitespace in the table.
filename : str
Path of the original PDF
order : int
Table number on PDF page.
page : int
PDF page number.
"""
def __init__(self, cols, rows):
self.cols = cols
self.rows = rows
self.cells = [
[Cell(c[0], r[1], c[1], r[0]) for c in cols] for r in rows
]
self.df = None
self.shape = (0, 0)
self.accuracy = 0
self.whitespace = 0
self.filename = None
self.order = None
self.page = None
self.flavor = None # Flavor of the parser that generated the table
self.pdf_size = None # Dimensions of the original PDF page
self.debug_info = None # Field holding debug data
self._image = None
self._image_path = None # Temporary file to hold an image of the pdf
def __repr__(self):
return "<{} shape={}>".format(self.__class__.__name__, self.shape)
def __lt__(self, other):
if self.page == other.page:
if self.order < other.order:
return True
if self.page < other.page:
return True
@property
def data(self):
"""Returns two-dimensional list of strings in table.
"""
d = []
for row in self.cells:
d.append([cell.text.strip() for cell in row])
return d
@property
def parsing_report(self):
"""Returns a parsing report with %accuracy, %whitespace,
table number on page and page number.
"""
# pretty?
report = {
"accuracy": round(self.accuracy, 2),
"whitespace": round(self.whitespace, 2),
"order": self.order,
"page": self.page,
}
return report
def record_metadata(self, parser):
"""Record data about the origin of the table
"""
self.flavor = parser.id
self.filename = parser.filename
self.debug_info = parser.debug_info
data = self.data
self.df = pd.DataFrame(data)
self.shape = self.df.shape
self.whitespace = compute_whitespace(data)
self.pdf_size = (parser.pdf_width, parser.pdf_height)
def get_pdf_image(self):
"""Compute pdf image and cache it
"""
if self._image is None:
if self._image_path is None:
self._image_path = build_file_path_in_temp_dir(
os.path.basename(self.filename),
".png"
)
export_pdf_as_png(self.filename, self._image_path)
self._image = cv2.imread(self._image_path)
return self._image
def set_all_edges(self):
"""Sets all table edges to True.
"""
for row in self.cells:
for cell in row:
cell.left = cell.right = cell.top = cell.bottom = True
return self
def set_edges(self, vertical, horizontal, joint_tol=2):
"""Sets a cell's edges to True depending on whether the cell's
coordinates overlap with the line's coordinates within a
tolerance.
Parameters
----------
vertical : list
List of detected vertical lines.
horizontal : list
List of detected horizontal lines.
"""
for v in vertical:
# find closest x coord
# iterate over y coords and find closest start and end points
i = [
i
for i, t in enumerate(self.cols)
if np.isclose(v[0], t[0], atol=joint_tol)
]
j = [
j
for j, t in enumerate(self.rows)
if np.isclose(v[3], t[0], atol=joint_tol)
]
k = [
k
for k, t in enumerate(self.rows)
if np.isclose(v[1], t[0], atol=joint_tol)
]
if not j:
continue
J = j[0]
if i == [0]: # only left edge
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[J][L].left = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].left = True
J += 1
elif i == []: # only right edge
L = len(self.cols) - 1
if k:
K = k[0]
while J < K:
self.cells[J][L].right = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].right = True
J += 1
else: # both left and right edges
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[J][L].left = True
self.cells[J][L - 1].right = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].left = True
self.cells[J][L - 1].right = True
J += 1
for h in horizontal:
# find closest y coord
# iterate over x coords and find closest start and end points
i = [
i
for i, t in enumerate(self.rows)
if np.isclose(h[1], t[0], atol=joint_tol)
]
j = [
j
for j, t in enumerate(self.cols)
if np.isclose(h[0], t[0], atol=joint_tol)
]
k = [
k
for k, t in enumerate(self.cols)
if np.isclose(h[2], t[0], atol=joint_tol)
]
if not j:
continue
J = j[0]
if i == [0]: # only top edge
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[L][J].top = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].top = True
J += 1
elif i == []: # only bottom edge
L = len(self.rows) - 1
if k:
K = k[0]
while J < K:
self.cells[L][J].bottom = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].bottom = True
J += 1
else: # both top and bottom edges
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[L][J].top = True
self.cells[L - 1][J].bottom = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].top = True
self.cells[L - 1][J].bottom = True
J += 1
return self
def set_border(self):
"""Sets table border edges to True.
"""
for r in range(len(self.rows)):
self.cells[r][0].left = True
self.cells[r][len(self.cols) - 1].right = True
for c in range(len(self.cols)):
self.cells[0][c].top = True
self.cells[len(self.rows) - 1][c].bottom = True
return self
def set_span(self):
"""Sets a cell's hspan or vspan attribute to True depending
on whether the cell spans horizontally or vertically.
"""
for row in self.cells:
for cell in row:
left = cell.left
right = cell.right
top = cell.top
bottom = cell.bottom
if cell.bound == 4:
continue
elif cell.bound == 3:
if not left and (right and top and bottom):
cell.hspan = True
elif not right and (left and top and bottom):
cell.hspan = True
elif not top and (left and right and bottom):
cell.vspan = True
elif not bottom and (left and right and top):
cell.vspan = True
elif cell.bound == 2:
if left and right and (not top and not bottom):
cell.vspan = True
elif top and bottom and (not left and not right):
cell.hspan = True
elif cell.bound in [0, 1]:
cell.vspan = True
cell.hspan = True
return self
def to_csv(self, path, **kwargs):
"""Writes Table to a comma-separated values (csv) file.
For kwargs, check :meth:`pandas.DataFrame.to_csv`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"encoding": "utf-8", "index": False, "header": False,
"quoting": 1}
kw.update(kwargs)
self.df.to_csv(path, **kw)
def to_json(self, path, **kwargs):
"""Writes Table to a JSON file.
For kwargs, check :meth:`pandas.DataFrame.to_json`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"orient": "records"}
kw.update(kwargs)
json_string = self.df.to_json(**kw)
with open(path, "w") as f:
f.write(json_string)
def to_excel(self, path, **kwargs):
"""Writes Table to an Excel file.
For kwargs, check :meth:`pandas.DataFrame.to_excel`.
Parameters
----------
path : str
Output filepath.
"""
kw = {
"sheet_name": "page-{}-table-{}".format(self.page, self.order),
"encoding": "utf-8",
}
kw.update(kwargs)
# pylint: disable=abstract-class-instantiated
writer = pd.ExcelWriter(path)
self.df.to_excel(writer, **kw)
writer.save()
def to_html(self, path, **kwargs):
"""Writes Table to an HTML file.
For kwargs, check :meth:`pandas.DataFrame.to_html`.
Parameters
----------
path : str
Output filepath.
"""
html_string = self.df.to_html(**kwargs)
with open(path, "w") as f:
f.write(html_string)
def to_sqlite(self, path, **kwargs):
"""Writes Table to sqlite database.
For kwargs, check :meth:`pandas.DataFrame.to_sql`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"if_exists": "replace", "index": False}
kw.update(kwargs)
conn = sqlite3.connect(path)
table_name = "page-{}-table-{}".format(self.page, self.order)
self.df.to_sql(table_name, conn, **kw)
conn.commit()
conn.close()
class TableList(object):
"""Defines a list of camelot.core.Table objects. Each table can
be accessed using its index.
Attributes
----------
n : int
Number of tables in the list.
"""
def __init__(self, tables):
self._tables = tables
def __repr__(self):
return "<{} n={}>".format(self.__class__.__name__, self.n)
def __len__(self):
return len(self._tables)
def __getitem__(self, idx):
return self._tables[idx]
@staticmethod
def _format_func(table, f):
return getattr(table, "to_{}".format(f))
@property
def n(self):
return len(self)
def _write_file(self, f=None, **kwargs):
dirname = kwargs.get("dirname")
root = kwargs.get("root")
ext = kwargs.get("ext")
for table in self._tables:
filename = os.path.join(
"{}-page-{}-table-{}{}".format(root, table.page, table.order,
ext)
)
filepath = os.path.join(dirname, filename)
to_format = self._format_func(table, f)
to_format(filepath)
def _compress_dir(self, **kwargs):
path = kwargs.get("path")
dirname = kwargs.get("dirname")
root = kwargs.get("root")
ext = kwargs.get("ext")
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
for table in self._tables:
filename = os.path.join(
"{}-page-{}-table-{}{}".format(root,
table.page,
table.order,
ext)
)
filepath = os.path.join(dirname, filename)
z.write(filepath, os.path.basename(filepath))
def export(self, path, f="csv", compress=False):
"""Exports the list of tables to specified file format.
Parameters
----------
path : str
Output filepath.
f : str
File format. Can be csv, json, excel, html and sqlite.
compress : bool
Whether or not to add files to a ZIP archive.
"""
dirname = os.path.dirname(path)
basename = os.path.basename(path)
root, ext = os.path.splitext(basename)
if compress:
dirname = tempfile.mkdtemp()
kwargs = {"path": path, "dirname": dirname, "root": root, "ext": ext}
if f in ["csv", "json", "html"]:
self._write_file(f=f, **kwargs)
if compress:
self._compress_dir(**kwargs)
elif f == "excel":
filepath = os.path.join(dirname, basename)
# pylint: disable=abstract-class-instantiated
writer = pd.ExcelWriter(filepath)
for table in self._tables:
sheet_name = "page-{}-table-{}".format(table.page, table.order)
table.df.to_excel(writer, sheet_name=sheet_name,
encoding="utf-8")
writer.save()
if compress:
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
z.write(filepath, os.path.basename(filepath))
elif f == "sqlite":
filepath = os.path.join(dirname, basename)
for table in self._tables:
table.to_sqlite(filepath)
if compress:
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
z.write(filepath, os.path.basename(filepath))