camelot-py/camelot/core.py

895 lines
29 KiB
Python

# -*- coding: utf-8 -*-
import os
import sqlite3
import zipfile
import tempfile
from itertools import chain
from operator import itemgetter
import numpy as np
import pandas as pd
# minimum number of vertical textline intersections for a textedge
# to be considered valid
TEXTEDGE_REQUIRED_ELEMENTS = 4
# maximum number of columns over which a header can spread
MAX_COL_SPREAD_IN_HEADER = 3
class TextEdge(object):
"""Defines a text edge coordinates relative to a left-bottom
origin. (PDF coordinate space)
Parameters
----------
x : float
x-coordinate of the text edge.
y0 : float
y-coordinate of bottommost point.
y1 : float
y-coordinate of topmost point.
align : string, optional (default: 'left')
{'left', 'right', 'middle'}
Attributes
----------
intersections: int
Number of intersections with horizontal text rows.
is_valid: bool
A text edge is valid if it intersections with at least
TEXTEDGE_REQUIRED_ELEMENTS horizontal text rows.
"""
def __init__(self, x, y0, y1, align="left"):
self.x = x
self.y0 = y0
self.y1 = y1
self.align = align
self.intersections = 0
self.is_valid = False
def __repr__(self):
return "<TextEdge x={} y0={} y1={} align={} valid={}>".format(
round(self.x, 2),
round(self.y0, 2),
round(self.y1, 2),
self.align,
self.is_valid,
)
def update_coords(self, x, y0, edge_tol=50):
"""Updates the text edge's x and bottom y coordinates and sets
the is_valid attribute.
"""
if np.isclose(self.y0, y0, atol=edge_tol):
self.x = (self.intersections * self.x + x) / \
float(self.intersections + 1)
self.y0 = y0
self.intersections += 1
# a textedge is valid only if it extends uninterrupted
# over a required number of textlines
if self.intersections > TEXTEDGE_REQUIRED_ELEMENTS:
self.is_valid = True
class TextEdges(object):
"""Defines a dict of left, right and middle text edges found on
the PDF page. The dict has three keys based on the alignments,
and each key's value is a list of camelot.core.TextEdge objects.
"""
def __init__(self, edge_tol=50):
self.edge_tol = edge_tol
self._textedges = {"left": [], "right": [], "middle": []}
@staticmethod
def get_x_coord(textline, align):
"""Returns the x coordinate of a text row based on the
specified alignment.
"""
x_left = textline.x0
x_right = textline.x1
x_middle = x_left + (x_right - x_left) / 2.0
x_coord = {"left": x_left, "middle": x_middle, "right": x_right}
return x_coord[align]
def find(self, x_coord, align):
"""Returns the index of an existing text edge using
the specified x coordinate and alignment.
"""
for i, te in enumerate(self._textedges[align]):
if np.isclose(te.x, x_coord, atol=0.5):
return i
return None
def add(self, textline, align):
"""Adds a new text edge to the current dict.
"""
x = self.get_x_coord(textline, align)
y0 = textline.y0
y1 = textline.y1
te = TextEdge(x, y0, y1, align=align)
self._textedges[align].append(te)
def update(self, textline):
"""Updates an existing text edge in the current dict.
"""
for align in ["left", "right", "middle"]:
x_coord = self.get_x_coord(textline, align)
idx = self.find(x_coord, align)
if idx is None:
self.add(textline, align)
else:
self._textedges[align][idx].update_coords(
x_coord, textline.y0, edge_tol=self.edge_tol
)
def generate(self, textlines):
"""Generates the text edges dict based on horizontal text
rows.
"""
for tl in textlines:
if len(tl.get_text().strip()) > 1: # TODO: hacky
self.update(tl)
def get_relevant(self):
"""Returns the list of relevant text edges (all share the same
alignment) based on which list intersects horizontal text rows
the most.
"""
intersections_sum = {
"left": sum(
te.intersections for te in self._textedges["left"]
if te.is_valid
),
"right": sum(
te.intersections for te in self._textedges["right"]
if te.is_valid
),
"middle": sum(
te.intersections for te in self._textedges["middle"]
if te.is_valid
),
}
# TODO: naive
# get vertical textedges that intersect maximum number of
# times with horizontal textlines
relevant_align = max(
intersections_sum.items(),
key=itemgetter(1)
)[0]
return list(filter(
lambda te: te.is_valid,
self._textedges[relevant_align])
)
@staticmethod
def _expand_area_for_header(area, textlines, col_anchors,
average_row_height):
"""The core algorithm is based on fairly strict alignment of text.
It works ok for the table body, but might fail on tables' headers
since they tend to be in a different font, alignment (e.g. vertical),
etc.
The section below tries to identify whether what's above the bbox
identified so far has the characteristics of a table header:
Close to the top of the body, with cells that fit within the bounds
identified.
"""
new_area = area
(left, bottom, right, top) = area
zones = []
def column_spread(left, right, col_anchors):
"""Returns the number of columns (splits on the x-axis)
crossed by an element covering left to right.
"""
indexLeft = 0
while indexLeft < len(col_anchors) \
and col_anchors[indexLeft] < left:
indexLeft += 1
indexRight = indexLeft
while indexRight < len(col_anchors) \
and col_anchors[indexRight] < right:
indexRight += 1
return indexRight - indexLeft
keep_searching = True
while keep_searching:
keep_searching = False
# a/ first look for the closest text element above the area.
# It will be the anchor for a possible new row.
closest_above = None
all_above = []
for te in textlines:
# higher than the table, directly within its bounds
if te.y0 > top and te.x0 > left and te.x1 < right:
all_above.append(te)
if closest_above is None or closest_above.y0 > te.y0:
closest_above = te
if closest_above and \
closest_above.y0 < top + average_row_height:
# b/ We have a candidate cell that is within the correct
# vertical band, and directly above the table. Starting from
# this anchor, we list all the textlines within the same row.
tls_in_new_row = []
top = closest_above.y1
pushed_up = True
while pushed_up:
pushed_up = False
# Iterate and extract elements that fit in the row
# from our list
for i in range(len(all_above) - 1, -1, -1):
te = all_above[i]
if te.y0 < top:
# The bottom of this element is within our row
# so we add it.
tls_in_new_row.append(te)
all_above.pop(i)
if te.y1 > top:
# If the top of this element raises our row's
# band, we'll need to keep on searching for
# overlapping items
top = te.y1
pushed_up = True
# Get the x-ranges for all the textlines, and merge the
# x-ranges that overlap
zones = zones + \
list(map(lambda tl: [tl.x0, tl.x1], tls_in_new_row))
zones.sort(key=lambda z: z[0]) # Sort by left coordinate
# Starting from the right, if two zones overlap horizontally,
# merge them
merged_something = True
while merged_something:
merged_something = False
for i in range(len(zones) - 1, 0, -1):
zone_right = zones[i]
zone_left = zones[i-1]
if zone_left[1] >= zone_right[0]:
zone_left[1] = max(zone_right[1], zone_left[1])
zones.pop(i)
merged_something = True
max_spread = max(
list(
map(
lambda zone: column_spread(
zone[0], zone[1], col_anchors),
zones
)
)
)
if max_spread <= MAX_COL_SPREAD_IN_HEADER:
# Combined, the elements we've identified don't cross more
# than the authorized number of columns.
# We're trying to avoid
# 0: <BAD: Added header spans too broad>
# 1: <A1> <B1> <C1> <D1> <E1>
# 2: <A2> <B2> <C2> <D2> <E2>
# if len(zones) > TEXTEDGE_REQUIRED_ELEMENTS:
new_area = (left, bottom, right, top)
# At this stage we've identified a plausible row (or the
# beginning of one).
keep_searching = True
return new_area
def get_table_areas(self, textlines, relevant_textedges):
"""Returns a dict of interesting table areas on the PDF page
calculated using relevant text edges.
"""
# sort relevant textedges in reading order
relevant_textedges.sort(key=lambda te: (-te.y0, te.x))
table_areas = {}
for te in relevant_textedges:
if not table_areas:
table_areas[(te.x, te.y0, te.x, te.y1)] = None
else:
found = None
for area in table_areas:
# check for overlap
if te.y1 >= area[1] and te.y0 <= area[3]:
found = area
break
if found is None:
table_areas[(te.x, te.y0, te.x, te.y1)] = None
else:
table_areas.pop(found)
updated_area = (
found[0],
min(te.y0, found[1]),
max(found[2], te.x),
max(found[3], te.y1),
)
table_areas[updated_area] = None
# extend table areas based on textlines that overlap
# vertically. it's possible that these textlines were
# eliminated during textedges generation since numbers and
# chars/words/sentences are often aligned differently.
# drawback: table areas that have paragraphs on their sides
# will include the paragraphs too.
sum_textline_height = 0
for tl in textlines:
sum_textline_height += tl.y1 - tl.y0
found = None
for area in table_areas:
# check for overlap
if tl.y0 >= area[1] and tl.y1 <= area[3]:
found = area
break
if found is not None:
table_areas.pop(found)
updated_area = (
min(tl.x0, found[0]),
min(tl.y0, found[1]),
max(found[2], tl.x1),
max(found[3], tl.y1),
)
table_areas[updated_area] = None
# Apply a heuristic to salvage headers which formatting might be off
# compared to the rest of the table.
average_textline_height = sum_textline_height / \
float(len(textlines))
col_anchors = list(
map(lambda textedge: textedge.x, relevant_textedges))
col_anchors.sort()
# add some padding to table areas
table_areas_padded = {}
for area in table_areas:
new_area = self._expand_area_for_header(
area, textlines, col_anchors, average_textline_height)
table_areas_padded[new_area] = None
return table_areas_padded
class Cell(object):
"""Defines a cell in a table with coordinates relative to a
left-bottom origin. (PDF coordinate space)
Parameters
----------
x1 : float
x-coordinate of left-bottom point.
y1 : float
y-coordinate of left-bottom point.
x2 : float
x-coordinate of right-top point.
y2 : float
y-coordinate of right-top point.
Attributes
----------
lb : tuple
Tuple representing left-bottom coordinates.
lt : tuple
Tuple representing left-top coordinates.
rb : tuple
Tuple representing right-bottom coordinates.
rt : tuple
Tuple representing right-top coordinates.
left : bool
Whether or not cell is bounded on the left.
right : bool
Whether or not cell is bounded on the right.
top : bool
Whether or not cell is bounded on the top.
bottom : bool
Whether or not cell is bounded on the bottom.
hspan : bool
Whether or not cell spans horizontally.
vspan : bool
Whether or not cell spans vertically.
text : string
Text assigned to cell.
"""
def __init__(self, x1, y1, x2, y2):
self.x1 = x1
self.y1 = y1
self.x2 = x2
self.y2 = y2
self.lb = (x1, y1)
self.lt = (x1, y2)
self.rb = (x2, y1)
self.rt = (x2, y2)
self.left = False
self.right = False
self.top = False
self.bottom = False
self.hspan = False
self.vspan = False
self._text = ""
def __repr__(self):
return "<Cell x1={} y1={} x2={} y2={}>".format(
round(self.x1, 2),
round(self.y1, 2),
round(self.x2, 2),
round(self.y2, 2)
)
@property
def text(self):
return self._text
@text.setter
def text(self, t):
self._text = "".join([self._text, t])
@property
def bound(self):
"""The number of sides on which the cell is bounded.
"""
return self.top + self.bottom + self.left + self.right
class Table(object):
"""Defines a table with coordinates relative to a left-bottom
origin. (PDF coordinate space)
Parameters
----------
cols : list
List of tuples representing column x-coordinates in increasing
order.
rows : list
List of tuples representing row y-coordinates in decreasing
order.
Attributes
----------
df : :class:`pandas.DataFrame`
shape : tuple
Shape of the table.
accuracy : float
Accuracy with which text was assigned to the cell.
whitespace : float
Percentage of whitespace in the table.
order : int
Table number on PDF page.
page : int
PDF page number.
"""
def __init__(self, cols, rows):
self.cols = cols
self.rows = rows
self.cells = [
[Cell(c[0], r[1], c[1], r[0]) for c in cols] for r in rows
]
self.df = None
self.shape = (0, 0)
self.accuracy = 0
self.whitespace = 0
self.order = None
self.page = None
def __repr__(self):
return "<{} shape={}>".format(self.__class__.__name__, self.shape)
def __lt__(self, other):
if self.page == other.page:
if self.order < other.order:
return True
if self.page < other.page:
return True
@property
def data(self):
"""Returns two-dimensional list of strings in table.
"""
d = []
for row in self.cells:
d.append([cell.text.strip() for cell in row])
return d
@property
def parsing_report(self):
"""Returns a parsing report with %accuracy, %whitespace,
table number on page and page number.
"""
# pretty?
report = {
"accuracy": round(self.accuracy, 2),
"whitespace": round(self.whitespace, 2),
"order": self.order,
"page": self.page,
}
return report
def set_all_edges(self):
"""Sets all table edges to True.
"""
for row in self.cells:
for cell in row:
cell.left = cell.right = cell.top = cell.bottom = True
return self
def set_edges(self, vertical, horizontal, joint_tol=2):
"""Sets a cell's edges to True depending on whether the cell's
coordinates overlap with the line's coordinates within a
tolerance.
Parameters
----------
vertical : list
List of detected vertical lines.
horizontal : list
List of detected horizontal lines.
"""
for v in vertical:
# find closest x coord
# iterate over y coords and find closest start and end points
i = [
i
for i, t in enumerate(self.cols)
if np.isclose(v[0], t[0], atol=joint_tol)
]
j = [
j
for j, t in enumerate(self.rows)
if np.isclose(v[3], t[0], atol=joint_tol)
]
k = [
k
for k, t in enumerate(self.rows)
if np.isclose(v[1], t[0], atol=joint_tol)
]
if not j:
continue
J = j[0]
if i == [0]: # only left edge
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[J][L].left = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].left = True
J += 1
elif i == []: # only right edge
L = len(self.cols) - 1
if k:
K = k[0]
while J < K:
self.cells[J][L].right = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].right = True
J += 1
else: # both left and right edges
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[J][L].left = True
self.cells[J][L - 1].right = True
J += 1
else:
K = len(self.rows)
while J < K:
self.cells[J][L].left = True
self.cells[J][L - 1].right = True
J += 1
for h in horizontal:
# find closest y coord
# iterate over x coords and find closest start and end points
i = [
i
for i, t in enumerate(self.rows)
if np.isclose(h[1], t[0], atol=joint_tol)
]
j = [
j
for j, t in enumerate(self.cols)
if np.isclose(h[0], t[0], atol=joint_tol)
]
k = [
k
for k, t in enumerate(self.cols)
if np.isclose(h[2], t[0], atol=joint_tol)
]
if not j:
continue
J = j[0]
if i == [0]: # only top edge
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[L][J].top = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].top = True
J += 1
elif i == []: # only bottom edge
L = len(self.rows) - 1
if k:
K = k[0]
while J < K:
self.cells[L][J].bottom = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].bottom = True
J += 1
else: # both top and bottom edges
L = i[0]
if k:
K = k[0]
while J < K:
self.cells[L][J].top = True
self.cells[L - 1][J].bottom = True
J += 1
else:
K = len(self.cols)
while J < K:
self.cells[L][J].top = True
self.cells[L - 1][J].bottom = True
J += 1
return self
def set_border(self):
"""Sets table border edges to True.
"""
for r in range(len(self.rows)):
self.cells[r][0].left = True
self.cells[r][len(self.cols) - 1].right = True
for c in range(len(self.cols)):
self.cells[0][c].top = True
self.cells[len(self.rows) - 1][c].bottom = True
return self
def set_span(self):
"""Sets a cell's hspan or vspan attribute to True depending
on whether the cell spans horizontally or vertically.
"""
for row in self.cells:
for cell in row:
left = cell.left
right = cell.right
top = cell.top
bottom = cell.bottom
if cell.bound == 4:
continue
elif cell.bound == 3:
if not left and (right and top and bottom):
cell.hspan = True
elif not right and (left and top and bottom):
cell.hspan = True
elif not top and (left and right and bottom):
cell.vspan = True
elif not bottom and (left and right and top):
cell.vspan = True
elif cell.bound == 2:
if left and right and (not top and not bottom):
cell.vspan = True
elif top and bottom and (not left and not right):
cell.hspan = True
elif cell.bound in [0, 1]:
cell.vspan = True
cell.hspan = True
return self
def to_csv(self, path, **kwargs):
"""Writes Table to a comma-separated values (csv) file.
For kwargs, check :meth:`pandas.DataFrame.to_csv`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"encoding": "utf-8", "index": False, "header": False,
"quoting": 1}
kw.update(kwargs)
self.df.to_csv(path, **kw)
def to_json(self, path, **kwargs):
"""Writes Table to a JSON file.
For kwargs, check :meth:`pandas.DataFrame.to_json`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"orient": "records"}
kw.update(kwargs)
json_string = self.df.to_json(**kw)
with open(path, "w") as f:
f.write(json_string)
def to_excel(self, path, **kwargs):
"""Writes Table to an Excel file.
For kwargs, check :meth:`pandas.DataFrame.to_excel`.
Parameters
----------
path : str
Output filepath.
"""
kw = {
"sheet_name": "page-{}-table-{}".format(self.page, self.order),
"encoding": "utf-8",
}
kw.update(kwargs)
writer = pd.ExcelWriter(path)
self.df.to_excel(writer, **kw)
writer.save()
def to_html(self, path, **kwargs):
"""Writes Table to an HTML file.
For kwargs, check :meth:`pandas.DataFrame.to_html`.
Parameters
----------
path : str
Output filepath.
"""
html_string = self.df.to_html(**kwargs)
with open(path, "w") as f:
f.write(html_string)
def to_sqlite(self, path, **kwargs):
"""Writes Table to sqlite database.
For kwargs, check :meth:`pandas.DataFrame.to_sql`.
Parameters
----------
path : str
Output filepath.
"""
kw = {"if_exists": "replace", "index": False}
kw.update(kwargs)
conn = sqlite3.connect(path)
table_name = "page-{}-table-{}".format(self.page, self.order)
self.df.to_sql(table_name, conn, **kw)
conn.commit()
conn.close()
class TableList(object):
"""Defines a list of camelot.core.Table objects. Each table can
be accessed using its index.
Attributes
----------
n : int
Number of tables in the list.
"""
def __init__(self, tables):
self._tables = tables
def __repr__(self):
return "<{} n={}>".format(self.__class__.__name__, self.n)
def __len__(self):
return len(self._tables)
def __getitem__(self, idx):
return self._tables[idx]
@staticmethod
def _format_func(table, f):
return getattr(table, "to_{}".format(f))
@property
def n(self):
return len(self)
def _write_file(self, f=None, **kwargs):
dirname = kwargs.get("dirname")
root = kwargs.get("root")
ext = kwargs.get("ext")
for table in self._tables:
filename = os.path.join(
"{}-page-{}-table-{}{}".format(root, table.page, table.order,
ext)
)
filepath = os.path.join(dirname, filename)
to_format = self._format_func(table, f)
to_format(filepath)
def _compress_dir(self, **kwargs):
path = kwargs.get("path")
dirname = kwargs.get("dirname")
root = kwargs.get("root")
ext = kwargs.get("ext")
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
for table in self._tables:
filename = os.path.join(
"{}-page-{}-table-{}{}".format(root,
table.page,
table.order,
ext)
)
filepath = os.path.join(dirname, filename)
z.write(filepath, os.path.basename(filepath))
def export(self, path, f="csv", compress=False):
"""Exports the list of tables to specified file format.
Parameters
----------
path : str
Output filepath.
f : str
File format. Can be csv, json, excel, html and sqlite.
compress : bool
Whether or not to add files to a ZIP archive.
"""
dirname = os.path.dirname(path)
basename = os.path.basename(path)
root, ext = os.path.splitext(basename)
if compress:
dirname = tempfile.mkdtemp()
kwargs = {"path": path, "dirname": dirname, "root": root, "ext": ext}
if f in ["csv", "json", "html"]:
self._write_file(f=f, **kwargs)
if compress:
self._compress_dir(**kwargs)
elif f == "excel":
filepath = os.path.join(dirname, basename)
writer = pd.ExcelWriter(filepath)
for table in self._tables:
sheet_name = "page-{}-table-{}".format(table.page, table.order)
table.df.to_excel(writer, sheet_name=sheet_name,
encoding="utf-8")
writer.save()
if compress:
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
z.write(filepath, os.path.basename(filepath))
elif f == "sqlite":
filepath = os.path.join(dirname, basename)
for table in self._tables:
table.to_sqlite(filepath)
if compress:
zipname = os.path.join(os.path.dirname(path), root) + ".zip"
with zipfile.ZipFile(zipname, "w", allowZip64=True) as z:
z.write(filepath, os.path.basename(filepath))