Interim check-in, test failing and lots of todos
parent
c1c9358778
commit
f5fe92c22e
|
|
@ -1,15 +1,620 @@
|
|||
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import division
|
||||
import warnings
|
||||
|
||||
import numpy as np
|
||||
|
||||
import warnings
|
||||
|
||||
from .base import BaseParser
|
||||
from ..core import TextEdges
|
||||
from ..utils import (text_in_bbox, text_in_bbox_per_axis)
|
||||
from ..utils import (
|
||||
text_in_bbox,
|
||||
text_in_bbox_per_axis,
|
||||
bbox_from_text,
|
||||
distance_tl_to_bbox,
|
||||
find_columns_coordinates
|
||||
)
|
||||
|
||||
from matplotlib import patches as patches
|
||||
|
||||
# FRHTODO: Move to utils
|
||||
# maximum number of columns over which a header can spread
|
||||
MAX_COL_SPREAD_IN_HEADER = 3
|
||||
|
||||
|
||||
def todo_move_me_expand_area_for_header(area, textlines, col_anchors,
|
||||
average_row_height):
|
||||
"""The core algorithm is based on fairly strict alignment of text.
|
||||
It works ok for the table body, but might fail on tables' headers
|
||||
since they tend to be in a different font, alignment (e.g. vertical),
|
||||
etc.
|
||||
The section below tries to identify whether what's above the bbox
|
||||
identified so far has the characteristics of a table header:
|
||||
Close to the top of the body, with cells that fit within the bounds
|
||||
identified.
|
||||
"""
|
||||
new_area = area
|
||||
(left, bottom, right, top) = area
|
||||
zones = []
|
||||
|
||||
def column_spread(left, right, col_anchors):
|
||||
"""Returns the number of columns (splits on the x-axis)
|
||||
crossed by an element covering left to right.
|
||||
"""
|
||||
indexLeft = 0
|
||||
while indexLeft < len(col_anchors) \
|
||||
and col_anchors[indexLeft] < left:
|
||||
indexLeft += 1
|
||||
indexRight = indexLeft
|
||||
while indexRight < len(col_anchors) \
|
||||
and col_anchors[indexRight] < right:
|
||||
indexRight += 1
|
||||
|
||||
return indexRight - indexLeft
|
||||
|
||||
keep_searching = True
|
||||
while keep_searching:
|
||||
keep_searching = False
|
||||
# a/ first look for the closest text element above the area.
|
||||
# It will be the anchor for a possible new row.
|
||||
closest_above = None
|
||||
all_above = []
|
||||
for te in textlines:
|
||||
# higher than the table, directly within its bounds
|
||||
if te.y0 > top and te.x0 > left and te.x1 < right:
|
||||
all_above.append(te)
|
||||
if closest_above is None or closest_above.y0 > te.y0:
|
||||
closest_above = te
|
||||
|
||||
if closest_above and \
|
||||
closest_above.y0 < top + average_row_height:
|
||||
# b/ We have a candidate cell that is within the correct
|
||||
# vertical band, and directly above the table. Starting from
|
||||
# this anchor, we list all the textlines within the same row.
|
||||
tls_in_new_row = []
|
||||
top = closest_above.y1
|
||||
pushed_up = True
|
||||
while pushed_up:
|
||||
pushed_up = False
|
||||
# Iterate and extract elements that fit in the row
|
||||
# from our list
|
||||
for i in range(len(all_above) - 1, -1, -1):
|
||||
te = all_above[i]
|
||||
if te.y0 < top:
|
||||
# The bottom of this element is within our row
|
||||
# so we add it.
|
||||
tls_in_new_row.append(te)
|
||||
all_above.pop(i)
|
||||
if te.y1 > top:
|
||||
# If the top of this element raises our row's
|
||||
# band, we'll need to keep on searching for
|
||||
# overlapping items
|
||||
top = te.y1
|
||||
pushed_up = True
|
||||
|
||||
# Get the x-ranges for all the textlines, and merge the
|
||||
# x-ranges that overlap
|
||||
zones = zones + \
|
||||
list(map(lambda tl: [tl.x0, tl.x1], tls_in_new_row))
|
||||
zones.sort(key=lambda z: z[0]) # Sort by left coordinate
|
||||
# Starting from the right, if two zones overlap horizontally,
|
||||
# merge them
|
||||
merged_something = True
|
||||
while merged_something:
|
||||
merged_something = False
|
||||
for i in range(len(zones) - 1, 0, -1):
|
||||
zone_right = zones[i]
|
||||
zone_left = zones[i-1]
|
||||
if zone_left[1] >= zone_right[0]:
|
||||
zone_left[1] = max(zone_right[1], zone_left[1])
|
||||
zones.pop(i)
|
||||
merged_something = True
|
||||
|
||||
max_spread = max(
|
||||
list(
|
||||
map(
|
||||
lambda zone: column_spread(
|
||||
zone[0], zone[1], col_anchors),
|
||||
zones
|
||||
)
|
||||
)
|
||||
)
|
||||
if max_spread <= MAX_COL_SPREAD_IN_HEADER:
|
||||
# Combined, the elements we've identified don't cross more
|
||||
# than the authorized number of columns.
|
||||
# We're trying to avoid
|
||||
# 0: <BAD: Added header spans too broad>
|
||||
# 1: <A1> <B1> <C1> <D1> <E1>
|
||||
# 2: <A2> <B2> <C2> <D2> <E2>
|
||||
# if len(zones) > TEXTEDGE_REQUIRED_ELEMENTS:
|
||||
new_area = (left, bottom, right, top)
|
||||
|
||||
# At this stage we've identified a plausible row (or the
|
||||
# beginning of one).
|
||||
keep_searching = True
|
||||
|
||||
return new_area
|
||||
|
||||
|
||||
class TextEdge2(object):
|
||||
"""Defines a text edge coordinates relative to a left-bottom
|
||||
origin. (PDF coordinate space)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
coord : float
|
||||
coordinate of the text edge. Depending on the alignment
|
||||
it could be a vertical or horizontal coordinate.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
textlines: array
|
||||
Array of textlines that demonstrate this alignment.
|
||||
coord: float
|
||||
The coordinate aligned averaged out across textlines.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, coord, textline):
|
||||
self.coord = coord
|
||||
self.textlines = [textline]
|
||||
|
||||
def __repr__(self):
|
||||
text_inside = " | ".join(
|
||||
map(lambda x: x.get_text(), self.textlines[:2])).replace("\n", "")
|
||||
return f"<TextEdge coord={self.coord} tl={len(self.textlines)} " \
|
||||
f"textlines text='{text_inside}...'>"
|
||||
|
||||
def register_aligned_textline(self, textline, coord):
|
||||
"""Updates new textline to this alignment, adapting its average.
|
||||
"""
|
||||
# Increase the intersections for this segment, expand it up,
|
||||
# and adjust the x based on the new value
|
||||
self.coord = (self.coord * len(self.textlines) + coord) / \
|
||||
float(len(self.textlines) + 1)
|
||||
self.textlines.append(textline)
|
||||
|
||||
|
||||
class Alignments(object):
|
||||
"""Represents the number of other textlines aligned with this
|
||||
one across each edge.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Vertical alignments
|
||||
self.left = 0
|
||||
self.right = 0
|
||||
self.middle = 0
|
||||
|
||||
# Horizontal alignments
|
||||
self.bottom = 0
|
||||
self.top = 0
|
||||
self.center = 0
|
||||
|
||||
def __getitem__(self, key):
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
return setattr(self, key, value)
|
||||
|
||||
def max_v(self):
|
||||
"""Returns the maximum number of alignments along
|
||||
one of the vertical axis (left/right/middle).
|
||||
"""
|
||||
return max(self.left, self.right, self.middle)
|
||||
|
||||
def max_h(self):
|
||||
"""Returns the maximum number of alignments along
|
||||
one of the horizontal axis (bottom/top/center).
|
||||
"""
|
||||
return max(self.bottom, self.top, self.center)
|
||||
|
||||
def max_v_edge_name(self):
|
||||
"""Returns the name of the vertical edge that has the
|
||||
maximum number of alignments.
|
||||
"""
|
||||
return max(
|
||||
["left", "right", "middle"],
|
||||
key=lambda edge_name: self[edge_name]
|
||||
)
|
||||
|
||||
def max_h_edge_name(self):
|
||||
"""Returns the name of the horizontal edge that has the
|
||||
maximum number of alignments.
|
||||
"""
|
||||
return max(
|
||||
["bottom", "top", "center"],
|
||||
key=lambda edge_name: self[edge_name]
|
||||
)
|
||||
|
||||
def alignment_score(self):
|
||||
"""We define the alignment score of a textline as the product of the
|
||||
number of aligned elements - 1. The -1 is to avoid favoring
|
||||
singletons on a long line.
|
||||
"""
|
||||
return (self.max_v()-1) * (self.max_h()-1)
|
||||
|
||||
|
||||
class TextEdges2(object):
|
||||
"""Defines a dict of vertical (top, bottom, middle) and
|
||||
horizontal (left, right, and middle) text alignments found on
|
||||
the PDF page. The dict has three keys based on the alignments,
|
||||
and each key's value is a list of camelot.core.TextEdge objects.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# For each possible alignment, list of tuples coordinate/textlines
|
||||
self._textedges = {
|
||||
"left": [],
|
||||
"right": [],
|
||||
"middle": [],
|
||||
"bottom": [],
|
||||
"top": [],
|
||||
"center": []
|
||||
}
|
||||
# For each textline, dictionary "edge type" to
|
||||
# "number of textlines aligned"
|
||||
self._textlines_alignments = {}
|
||||
|
||||
# Maximum number of distinct aligned elements in rows/cols
|
||||
self.max_rows = None
|
||||
self.max_cols = None
|
||||
|
||||
@staticmethod
|
||||
def get_textline_coords(textline):
|
||||
"""Calculate the coordinates of each alignment
|
||||
for a given textline.
|
||||
"""
|
||||
return {
|
||||
"left": textline.x0,
|
||||
"right": textline.x1,
|
||||
"middle": (textline.x0 + textline.x1) / 2.0,
|
||||
"bottom": textline.y0,
|
||||
"top": textline.y1,
|
||||
"center": (textline.y0 + textline.y1) / 2.0,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_index_closest_point(coord, edge_array):
|
||||
"""Returns the index of the closest point
|
||||
"""
|
||||
n = len(edge_array)
|
||||
if n == 0:
|
||||
return None
|
||||
if n == 1:
|
||||
return 0
|
||||
|
||||
left = 0
|
||||
right = n - 1
|
||||
mid = 0
|
||||
|
||||
if coord >= edge_array[n - 1].coord:
|
||||
return n - 1
|
||||
if coord <= edge_array[0].coord:
|
||||
return 0
|
||||
|
||||
while left < right:
|
||||
mid = (left + right) // 2 # find the mid
|
||||
if coord < edge_array[mid].coord:
|
||||
right = mid
|
||||
elif coord > edge_array[mid].coord:
|
||||
left = mid + 1
|
||||
else:
|
||||
return mid
|
||||
|
||||
if edge_array[mid].coord > coord:
|
||||
if mid > 0 and (
|
||||
coord - edge_array[mid-1].coord <
|
||||
edge_array[mid].coord - coord):
|
||||
return mid-1
|
||||
elif edge_array[mid].coord < coord:
|
||||
if mid < n - 1 and (
|
||||
edge_array[mid+1].coord - coord <
|
||||
coord - edge_array[mid].coord):
|
||||
return mid+1
|
||||
return mid
|
||||
|
||||
# def insert(self, index, textline, align):
|
||||
# """Adds a new text edge to the current dict.
|
||||
# """
|
||||
# x = self.get_x_coord(textline, align)
|
||||
# y0 = textline.y0
|
||||
# y1 = textline.y1
|
||||
# te = TextEdge(x, y0, y1, align=align)
|
||||
# self._textedges[align].insert(index, te)
|
||||
|
||||
def _register_textline(self, textline):
|
||||
"""Updates an existing text edge in the current dict.
|
||||
"""
|
||||
coords = TextEdges2.get_textline_coords(textline)
|
||||
for alignment in self._textedges:
|
||||
edge_array = self._textedges[alignment]
|
||||
coord = coords[alignment]
|
||||
|
||||
# Find the index of the closest existing element (or 0 if none)
|
||||
idx_closest = self._get_index_closest_point(coord, edge_array)
|
||||
|
||||
# Check if the edges before/after are close enough
|
||||
# that it can be considered aligned
|
||||
idx_insert = None
|
||||
if idx_closest is None:
|
||||
idx_insert = 0
|
||||
elif np.isclose(edge_array[idx_closest].coord, coord, atol=0.5):
|
||||
closest_edge = edge_array[idx_closest]
|
||||
closest_edge.register_aligned_textline(textline, coord)
|
||||
elif edge_array[idx_closest].coord < coord:
|
||||
idx_insert = idx_closest + 1
|
||||
else:
|
||||
idx_insert = idx_closest
|
||||
if idx_insert is not None:
|
||||
new_edge = TextEdge2(coord, textline)
|
||||
edge_array.insert(idx_insert, new_edge)
|
||||
|
||||
def _register_all_text_lines(self, textlines):
|
||||
"""Add all textlines to our edge repository to
|
||||
identify alignments.
|
||||
"""
|
||||
# Identify all the edge alignments
|
||||
for tl in textlines:
|
||||
if len(tl.get_text().strip()) > 0:
|
||||
self._register_textline(tl)
|
||||
|
||||
def _compute_alignment_counts(self):
|
||||
"""Build a dictionary textline -> alignment object.
|
||||
"""
|
||||
#
|
||||
for edge_name, textedges in self._textedges.items():
|
||||
for textedge in textedges:
|
||||
for textline in textedge.textlines:
|
||||
textline_alignments = self._textlines_alignments.get(
|
||||
textline, None)
|
||||
if textline_alignments is None:
|
||||
alignments = Alignments()
|
||||
alignments[edge_name] = len(textedge.textlines)
|
||||
self._textlines_alignments[textline] = alignments
|
||||
else:
|
||||
textline_alignments[edge_name] = len(
|
||||
textedge.textlines)
|
||||
|
||||
# Finally calculate the overall maximum number of rows/cols
|
||||
self.max_rows = max(
|
||||
map(
|
||||
lambda alignments: alignments.max_h(),
|
||||
self._textlines_alignments.values()
|
||||
),
|
||||
default=0
|
||||
)
|
||||
self.max_cols = max(
|
||||
map(
|
||||
lambda alignments: alignments.max_v(),
|
||||
self._textlines_alignments.values()
|
||||
),
|
||||
default=0
|
||||
)
|
||||
|
||||
def _calculate_gaps_thresholds(self, percentile=75):
|
||||
"""Identify reasonable gaps between lines and columns based
|
||||
on gaps observed across alignments.
|
||||
This can be used to reject cells as too far away from
|
||||
the core table.
|
||||
"""
|
||||
h_gaps, v_gaps = [], []
|
||||
for edge_name in self._textedges:
|
||||
edge_array = self._textedges[edge_name]
|
||||
gaps = []
|
||||
vertical = edge_name in ["left", "right", "middle"]
|
||||
sort_function = (lambda tl: tl.y0) \
|
||||
if vertical \
|
||||
else (lambda tl: tl.x0)
|
||||
for alignments in edge_array:
|
||||
tls = sorted(
|
||||
alignments.textlines,
|
||||
key=sort_function,
|
||||
reverse=True
|
||||
)
|
||||
for i in range(1, len(tls)):
|
||||
# If the lines are vertically aligned (stacked up), we
|
||||
# record the vertical gap between them
|
||||
if vertical:
|
||||
gap = tls[i-1].y1 - tls[i].y0
|
||||
else:
|
||||
gap = tls[i-1].x1 - tls[i].x0
|
||||
gaps.append(gap)
|
||||
if gaps:
|
||||
if vertical:
|
||||
v_gaps.append(np.percentile(gaps, percentile))
|
||||
else:
|
||||
h_gaps.append(np.percentile(gaps, percentile))
|
||||
direction_str = 'vertical' if vertical else 'horizontal'
|
||||
rounded_gaps = list(map(lambda x: round(x, 2), gaps))
|
||||
print(
|
||||
f"{direction_str} gaps found "
|
||||
f"for {edge_name}: "
|
||||
f"{rounded_gaps} "
|
||||
f"with {percentile}th percentile "
|
||||
f"{np.percentile(gaps, percentile)}"
|
||||
)
|
||||
return max(h_gaps, default=None), max(v_gaps, default=None)
|
||||
|
||||
def _remove_unconnected_edges(self):
|
||||
"""Weed out elements which are only connected to others vertically
|
||||
or horizontally. There needs to be connections across both
|
||||
dimensions.
|
||||
"""
|
||||
singleton_textlines = []
|
||||
removed_singletons = True
|
||||
while removed_singletons:
|
||||
removed_singletons = False
|
||||
for edge_type in self._textedges:
|
||||
# For each alignment edge, remove items if they are singletons
|
||||
# either horizontally or vertically
|
||||
for te in self._textedges[edge_type]:
|
||||
for i in range(len(te.textlines) - 1, -1, -1):
|
||||
tl = te.textlines[i]
|
||||
alignments = self._textlines_alignments[tl]
|
||||
if alignments.max_h() <= 1 or alignments.max_v() <= 1:
|
||||
singleton_textlines.append(tl)
|
||||
del te.textlines[i]
|
||||
removed_singletons = True
|
||||
self._textlines_alignments = {}
|
||||
self._compute_alignment_counts()
|
||||
|
||||
def _build_bbox_candidate(self, debug_info=None):
|
||||
""" Seed the process with the textline with the highest alignment
|
||||
score, then expand the bbox with textlines within threshold.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
debug_info : array
|
||||
Optional parameter array, in which to store extra information
|
||||
to help later visualization of the table creation.
|
||||
"""
|
||||
if self.max_rows <= 1 or self.max_cols <= 1:
|
||||
return None
|
||||
tls_search_space = list(self._textlines_alignments.keys())
|
||||
|
||||
def get_best_textline(textlines):
|
||||
# Find the textline with the highest alignment score
|
||||
return max(
|
||||
textlines,
|
||||
key=lambda textline:
|
||||
self._textlines_alignments[textline].alignment_score(),
|
||||
default=None
|
||||
)
|
||||
|
||||
# Calculate the 75th percentile of the horizontal/vertical
|
||||
# gaps between textlines. Use this as a reference for a threshold
|
||||
# to not exceed while looking for table boundaries.
|
||||
gaps_hv = self._calculate_gaps_thresholds(75)
|
||||
if (gaps_hv[0] is None or gaps_hv[1] is None):
|
||||
return None
|
||||
max_h_gap, max_v_gap = gaps_hv[0] * 3, gaps_hv[1] * 3
|
||||
|
||||
if debug_info is not None:
|
||||
# Store debug info
|
||||
debug_info_search = {
|
||||
"max_h_gap": max_h_gap,
|
||||
"max_v_gap": max_v_gap,
|
||||
"iterations": []
|
||||
}
|
||||
debug_info.append(debug_info_search)
|
||||
else:
|
||||
debug_info_search = None
|
||||
|
||||
MINIMUM_TEXTLINES_IN_TABLE = 6
|
||||
tl_most_aligned = get_best_textline(tls_search_space)
|
||||
bbox = (tl_most_aligned.x0, tl_most_aligned.y0,
|
||||
tl_most_aligned.x1, tl_most_aligned.y1)
|
||||
tls_search_space.remove(tl_most_aligned)
|
||||
tls_in_bbox = [tl_most_aligned]
|
||||
last_bbox = None
|
||||
while last_bbox != bbox:
|
||||
if debug_info_search is not None:
|
||||
# Store debug info
|
||||
debug_info_search["iterations"].append(bbox)
|
||||
|
||||
last_bbox = bbox
|
||||
# Go through all remaining textlines, expand our bbox
|
||||
# if a textline is within our proximity tolerance
|
||||
for i in range(len(tls_search_space) - 1, -1, -1):
|
||||
tl = tls_search_space[i]
|
||||
h_distance, v_distance = distance_tl_to_bbox(tl, bbox)
|
||||
|
||||
# Move textline to our bbox and expand the bbox accordingly
|
||||
# if the textline is close.
|
||||
if h_distance < max_h_gap and v_distance < max_v_gap:
|
||||
tls_in_bbox.append(tl)
|
||||
bbox = (
|
||||
min(bbox[0], tl.x0),
|
||||
min(bbox[1], tl.y0),
|
||||
max(bbox[2], tl.x1),
|
||||
max(bbox[3], tl.y1)
|
||||
)
|
||||
del tls_search_space[i]
|
||||
if len(tls_in_bbox) > MINIMUM_TEXTLINES_IN_TABLE:
|
||||
return bbox
|
||||
else:
|
||||
print(f"Only {len(tls_in_bbox)}, that's not enough.")
|
||||
return None
|
||||
|
||||
def generate(self, textlines):
|
||||
"""Generate the text edge dictionaries based on the
|
||||
input textlines.
|
||||
"""
|
||||
self._register_all_text_lines(textlines)
|
||||
self._compute_alignment_counts()
|
||||
|
||||
def plotFRHAlignments(self, table, plt):
|
||||
"""Displays a visualization of the alignments as currently computed.
|
||||
"""
|
||||
fig = plt.figure()
|
||||
ax = fig.add_subplot(111, aspect="equal")
|
||||
img = table.get_pdf_image()
|
||||
ax.imshow(img, extent=(0, table.pdf_size[0], 0, table.pdf_size[1]))
|
||||
|
||||
tls_by_alignment_score = sorted(
|
||||
self._textlines_alignments.keys(),
|
||||
key=lambda textline:
|
||||
self._textlines_alignments[textline].alignment_score(),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
for tl, alignments in self._textlines_alignments.items():
|
||||
color = "red"
|
||||
if tl == tls_by_alignment_score[0]:
|
||||
color = "blue"
|
||||
ax.add_patch(
|
||||
patches.Rectangle(
|
||||
(tl.x0, tl.y0),
|
||||
tl.x1 - tl.x0, tl.y1 - tl.y0,
|
||||
color=color,
|
||||
alpha=0.5
|
||||
)
|
||||
)
|
||||
ax.text(
|
||||
tl.x0 - 5,
|
||||
tl.y0 - 5,
|
||||
f"{alignments.max_h()}x{alignments.max_v()}",
|
||||
fontsize=5,
|
||||
color="black"
|
||||
)
|
||||
|
||||
def plotFRHTableSearch(self, plot, debug_info):
|
||||
if debug_info is None:
|
||||
return
|
||||
for box_id, bbox_search in enumerate(debug_info["bboxes_searches"]):
|
||||
max_h_gap = bbox_search["max_h_gap"]
|
||||
max_v_gap = bbox_search["max_v_gap"]
|
||||
iterations = bbox_search["iterations"]
|
||||
for iteration, bbox in enumerate(iterations):
|
||||
final = iteration == len(iterations) - 1
|
||||
plot.add_patch(
|
||||
patches.Rectangle(
|
||||
(bbox[0], bbox[1]),
|
||||
bbox[2] - bbox[0], bbox[3] - bbox[1],
|
||||
color="red",
|
||||
linewidth=5 if final else 2,
|
||||
fill=False
|
||||
)
|
||||
)
|
||||
plot.text(
|
||||
bbox[0],
|
||||
bbox[1],
|
||||
f"box #{box_id+1} / iter #{iteration}",
|
||||
fontsize=12,
|
||||
color="black",
|
||||
verticalalignment="top",
|
||||
bbox=dict(facecolor="orange", alpha=0.5)
|
||||
)
|
||||
|
||||
plot.add_patch(
|
||||
patches.Rectangle(
|
||||
(bbox[0]-max_h_gap, bbox[1]-max_v_gap),
|
||||
bbox[2] - bbox[0] + 2 * max_h_gap,
|
||||
bbox[3] - bbox[1] + 2 * max_v_gap,
|
||||
color="orange",
|
||||
fill=False
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class Hybrid(BaseParser):
|
||||
|
|
@ -78,6 +683,7 @@ class Hybrid(BaseParser):
|
|||
self.row_tol = row_tol
|
||||
self.column_tol = column_tol
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _text_bbox(t_bbox):
|
||||
"""Returns bounding box for the text present on a page.
|
||||
|
|
@ -101,6 +707,7 @@ class Hybrid(BaseParser):
|
|||
text_bbox = (xmin, ymin, xmax, ymax)
|
||||
return text_bbox
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _group_rows(text, row_tol=2):
|
||||
"""Groups PDFMiner text objects into rows vertically
|
||||
|
|
@ -139,6 +746,7 @@ class Hybrid(BaseParser):
|
|||
rows.append(sorted(temp, key=lambda t: t.x0))
|
||||
return rows
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _merge_columns(l, column_tol=0):
|
||||
"""Merges column boundaries horizontally if they overlap
|
||||
|
|
@ -184,6 +792,7 @@ class Hybrid(BaseParser):
|
|||
merged.append(higher)
|
||||
return merged
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _join_rows(rows_grouped, text_y_max, text_y_min):
|
||||
"""Makes row coordinates continuous. For the row to "touch"
|
||||
|
|
@ -217,6 +826,7 @@ class Hybrid(BaseParser):
|
|||
row_boundaries[-1][1] = text_y_min
|
||||
return row_boundaries
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _add_columns(cols, text, row_tol):
|
||||
"""Adds columns to existing list by taking into account
|
||||
|
|
@ -247,6 +857,7 @@ class Hybrid(BaseParser):
|
|||
cols.extend(Hybrid._merge_columns(sorted(new_cols)))
|
||||
return cols
|
||||
|
||||
# FRHTODO: Check if needed, refactor with Stream
|
||||
@staticmethod
|
||||
def _join_columns(cols, text_x_min, text_x_max):
|
||||
"""Makes column coordinates continuous.
|
||||
|
|
@ -271,66 +882,81 @@ class Hybrid(BaseParser):
|
|||
cols = [(cols[i], cols[i + 1]) for i in range(0, len(cols) - 1)]
|
||||
return cols
|
||||
|
||||
# FRHTODO: Check is needed, refactor with Stream
|
||||
def _validate_columns(self):
|
||||
if self.table_areas is not None and self.columns is not None:
|
||||
if len(self.table_areas) != len(self.columns):
|
||||
raise ValueError("Length of table_areas and columns"
|
||||
" should be equal")
|
||||
|
||||
def _nurminen_table_detection(self, textlines):
|
||||
"""A general implementation of the table detection algorithm
|
||||
described by Anssi Nurminen's master's thesis.
|
||||
Link: https://dspace.cc.tut.fi/dpub/bitstream/handle/123456789/21520/Nurminen.pdf?sequence=3 # noqa
|
||||
|
||||
Assumes that tables are situated relatively far apart
|
||||
vertically.
|
||||
"""
|
||||
# TODO: add support for arabic text #141
|
||||
# sort textlines in reading order
|
||||
textlines.sort(key=lambda x: (-x.y0, x.x0))
|
||||
textedges = TextEdges(edge_tol=self.edge_tol)
|
||||
# generate left, middle and right textedges
|
||||
textedges.generate(textlines)
|
||||
# select relevant edges
|
||||
relevant_textedges = textedges.get_relevant()
|
||||
self.textedges.extend(relevant_textedges)
|
||||
# guess table areas using textlines and relevant edges
|
||||
table_bbox = textedges.get_table_areas(textlines, relevant_textedges)
|
||||
# treat whole page as table area if no table areas found
|
||||
if not table_bbox:
|
||||
table_bbox = {(0, 0, self.pdf_width, self.pdf_height): None}
|
||||
|
||||
return table_bbox
|
||||
|
||||
def _generate_table_bbox(self):
|
||||
self.textedges = []
|
||||
if self.table_areas is None:
|
||||
hor_text = self.horizontal_text
|
||||
if self.table_regions is not None:
|
||||
# filter horizontal text
|
||||
hor_text = []
|
||||
for region in self.table_regions:
|
||||
x1, y1, x2, y2 = region.split(",")
|
||||
x1 = float(x1)
|
||||
y1 = float(y1)
|
||||
x2 = float(x2)
|
||||
y2 = float(y2)
|
||||
region_text = text_in_bbox(
|
||||
(x1, y2, x2, y1), self.horizontal_text)
|
||||
hor_text.extend(region_text)
|
||||
# find tables based on nurminen's detection algorithm
|
||||
table_bbox = self._nurminen_table_detection(hor_text)
|
||||
# FRHTODO: get debug_info to work again
|
||||
def _generate_table_bbox(self, debug_info=None):
|
||||
textlines = self.horizontal_text + self.vertical_text
|
||||
textlines_processed = {}
|
||||
self.table_bbox = {}
|
||||
if debug_info is not None:
|
||||
debug_info_bbox_searches = []
|
||||
debug_info["bboxes_searches"] = debug_info_bbox_searches
|
||||
else:
|
||||
table_bbox = {}
|
||||
for area in self.table_areas:
|
||||
x1, y1, x2, y2 = area.split(",")
|
||||
x1 = float(x1)
|
||||
y1 = float(y1)
|
||||
x2 = float(x2)
|
||||
y2 = float(y2)
|
||||
table_bbox[(x1, y2, x2, y1)] = None
|
||||
self.table_bbox = table_bbox
|
||||
debug_info_bbox_searches = None
|
||||
|
||||
while True:
|
||||
self.textedges = TextEdges2()
|
||||
self.textedges.generate(textlines)
|
||||
self.textedges._remove_unconnected_edges()
|
||||
bbox = self.textedges._build_bbox_candidate(
|
||||
debug_info_bbox_searches
|
||||
)
|
||||
if bbox is None:
|
||||
break
|
||||
|
||||
# Get all the textlines that are at least 50% in the box
|
||||
tls_in_bbox = text_in_bbox(bbox, textlines)
|
||||
|
||||
# and expand the text box to fully contain them
|
||||
bbox = bbox_from_text(tls_in_bbox)
|
||||
|
||||
# FRH: do we need to repeat this?
|
||||
# tls_in_bbox = text_in_bbox(bbox, textlines)
|
||||
cols_anchors = find_columns_coordinates(tls_in_bbox)
|
||||
|
||||
# Apply a heuristic to salvage headers which formatting might be
|
||||
# off compared to the rest of the table.
|
||||
|
||||
# Calculate the average height of each textline
|
||||
# FRHTODO: reuse the gap threshold from earlier?
|
||||
alignments = self.textedges._textlines_alignments.keys()
|
||||
average_tl_height = sum(
|
||||
map(
|
||||
lambda tl: tl.y1 - tl.y0,
|
||||
alignments
|
||||
)) / len(alignments)
|
||||
|
||||
expanded_bbox = todo_move_me_expand_area_for_header(
|
||||
bbox,
|
||||
textlines,
|
||||
cols_anchors,
|
||||
average_tl_height
|
||||
)
|
||||
|
||||
if debug_info is not None:
|
||||
debug_info["col_searches"].append({
|
||||
"core_bbox": bbox,
|
||||
"cols_anchors": cols_anchors,
|
||||
"expanded_bbox": expanded_bbox
|
||||
})
|
||||
|
||||
self.table_bbox[expanded_bbox] = None
|
||||
|
||||
# Remember what textlines we processed, and repeat
|
||||
for tl in tls_in_bbox:
|
||||
textlines_processed[tl] = None
|
||||
textlines = list(filter(
|
||||
lambda tl: tl not in textlines_processed,
|
||||
textlines
|
||||
))
|
||||
|
||||
# FRHTODO: Check is needed, refactor with Stream
|
||||
def _generate_columns_and_rows(self, table_idx, tk):
|
||||
# select elements which lie within table_bbox
|
||||
self.t_bbox = text_in_bbox_per_axis(
|
||||
|
|
@ -408,6 +1034,7 @@ class Hybrid(BaseParser):
|
|||
|
||||
return cols, rows
|
||||
|
||||
# FRHTODO: Check is needed, refactor with Stream
|
||||
def _generate_table(self, table_idx, cols, rows, **kwargs):
|
||||
table = self._initialize_new_table(table_idx, cols, rows)
|
||||
table = table.set_all_edges()
|
||||
|
|
@ -420,7 +1047,7 @@ class Hybrid(BaseParser):
|
|||
|
||||
return table
|
||||
|
||||
def extract_tables(self):
|
||||
def extract_tables(self, debug_info=None):
|
||||
if self._document_has_no_text():
|
||||
return []
|
||||
|
||||
|
|
|
|||
|
|
@ -823,6 +823,10 @@ data_stream_table_rotated = [
|
|||
],
|
||||
]
|
||||
|
||||
# The streaming algorithm incorrectly includes a header in the result.
|
||||
# Trimming the table for the test of hybrid, which doesn't include it.
|
||||
data_hybrid_table_rotated = data_stream_table_rotated[1:]
|
||||
|
||||
data_stream_two_tables_1 = [
|
||||
[
|
||||
"Program. Represents arrests reported (not charged) by 12,910 "
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ def test_hybrid():
|
|||
|
||||
|
||||
def test_hybrid_table_rotated():
|
||||
df = pd.DataFrame(data_stream_table_rotated)
|
||||
df = pd.DataFrame(data_hybrid_table_rotated)
|
||||
|
||||
filename = os.path.join(testdir, "clockwise_table_2.pdf")
|
||||
tables = camelot.read_pdf(filename, flavor="hybrid")
|
||||
|
|
|
|||
Loading…
Reference in New Issue