# -*- coding: utf-8 -*- """Implementation of hybrid table parser.""" from __future__ import division import numpy as np import copy from .base import TextBaseParser from ..core import ( TextAlignments, ALL_ALIGNMENTS, HORIZONTAL_ALIGNMENTS, VERTICAL_ALIGNMENTS ) from ..utils import ( bbox_from_str, text_in_bbox, bbox_from_textlines, distance_tl_to_bbox, find_columns_coordinates ) # maximum number of columns over which a header can spread MAX_COL_SPREAD_IN_HEADER = 3 def search_header_from_body_bbox(body_bbox, textlines, col_anchors, max_v_gap): """Expand a bbox vertically up by looking for plausible headers. The core algorithm is based on fairly strict alignment of text. It works for the table body, but might fail on tables' headers since they tend to be in a different font, alignment (e.g. vertical), etc. This method evalutes the area above the table body's bbox for characteristics of a table header: close to the top of the body, with cells that fit within the horizontal bounds identified. """ new_bbox = body_bbox (left, bottom, right, top) = body_bbox zones = [] def column_spread(left, right, col_anchors): """Get the number of columns crossed by a segment [left, right].""" indexLeft = 0 while indexLeft < len(col_anchors) \ and col_anchors[indexLeft] < left: indexLeft += 1 indexRight = indexLeft while indexRight < len(col_anchors) \ and col_anchors[indexRight] < right: indexRight += 1 return indexRight - indexLeft keep_searching = True while keep_searching: keep_searching = False # a/ first look for the closest text element above the bbox. # It will be the anchor for a possible new row. closest_above = None all_above = [] for te in textlines: # higher than the table, >50% within its bounds te_center = 0.5 * (te.x0 + te.x1) if te.y0 > top and left < te_center < right: all_above.append(te) if closest_above is None or closest_above.y0 > te.y0: closest_above = te if closest_above and \ closest_above.y0 < top + max_v_gap: # b/ We have a candidate cell that is within the correct # vertical band, and directly above the table. Starting from # this anchor, we list all the textlines within the same row. tls_in_new_row = [] top = closest_above.y1 pushed_up = True while pushed_up: pushed_up = False # Iterate and extract elements that fit in the row # from our list for i in range(len(all_above) - 1, -1, -1): te = all_above[i] if te.y0 < top: # The bottom of this element is within our row # so we add it. tls_in_new_row.append(te) all_above.pop(i) if te.y1 > top: # If the top of this element raises our row's # band, we'll need to keep on searching for # overlapping items top = te.y1 pushed_up = True # Get the x-ranges for all the textlines, and merge the # x-ranges that overlap zones = zones + \ list(map(lambda tl: [tl.x0, tl.x1], tls_in_new_row)) zones.sort(key=lambda z: z[0]) # Sort by left coordinate # Starting from the right, if two zones overlap horizontally, # merge them merged_something = True while merged_something: merged_something = False for i in range(len(zones) - 1, 0, -1): zone_right = zones[i] zone_left = zones[i-1] if zone_left[1] >= zone_right[0]: zone_left[1] = max(zone_right[1], zone_left[1]) zones.pop(i) merged_something = True max_spread = max( list( map( lambda zone: column_spread( zone[0], zone[1], col_anchors), zones ) ) ) if max_spread <= MAX_COL_SPREAD_IN_HEADER: # Combined, the elements we've identified don't cross more # than the authorized number of columns. # We're trying to avoid # 0: # 1: # 2: # if len(zones) > TEXTEDGE_REQUIRED_ELEMENTS: new_bbox = (left, bottom, right, top) # At this stage we've identified a plausible row (or the # beginning of one). keep_searching = True return new_bbox class AlignmentCounter(object): """ For a given textline, represent all other textlines aligned with it. A textline can be vertically aligned with others if their bbox match on left, right, or middle coord, and horizontally aligned if they match top, bottom, or center coord. """ def __init__(self): self.alignment_to_occurrences = {} for alignment in ALL_ALIGNMENTS: self.alignment_to_occurrences[alignment] = [] def __getitem__(self, key): return self.alignment_to_occurrences[key] def __setitem__(self, key, value): self.alignment_to_occurrences[key] = value return value def max_alignments(self, alignment_ids=None): """Get the alignment dimension with the max number of textlines. """ alignment_ids = alignment_ids or self.alignment_to_occurrences.keys() alignment_items = map( lambda alignment_id: ( alignment_id, self.alignment_to_occurrences[alignment_id] ), alignment_ids ) return max(alignment_items, key=lambda item: len(item[1])) def max_v(self): """Tuple (alignment_id, textlines) of largest vertical row. """ # Note that the horizontal alignments (left, center, right) are aligned # vertically in a column, so max_v is calculated by looking at # horizontal alignments. return self.max_alignments(HORIZONTAL_ALIGNMENTS) def max_h(self): """Tuple (alignment_id, textlines) of largest horizontal col. """ return self.max_alignments(VERTICAL_ALIGNMENTS) def max_v_count(self): """Returns the maximum number of alignments along one of the vertical axis (left/right/middle). """ return len(self.max_v()[1]) def max_h_count(self): """Returns the maximum number of alignments along one of the horizontal axis (bottom/top/center). """ return len(self.max_h()[1]) def alignment_score(self): """We define the alignment score of a textline as the product of the number of aligned elements - 1. The -1 is to avoid favoring singletons on a long line. """ return (self.max_v_count()-1) * (self.max_h_count()-1) class TextNetworks(TextAlignments): """Text elements connected by vertical AND horizontal alignments. The alignment dict has six keys based on the hor/vert alignments, and each key's value is a list of camelot.core.TextAlignment objects. """ def __init__(self): super().__init__(ALL_ALIGNMENTS) # For each textline, dictionary "alignment type" to # "number of textlines aligned" self._textline_to_alignments = {} def _update_alignment(self, alignment, coord, textline): alignment.register_aligned_textline(textline, coord) def _register_all_text_lines(self, textlines): """Add all textlines to our network repository to identify alignments. """ # Identify all the alignments for tl in textlines: if len(tl.get_text().strip()) > 0: self._register_textline(tl) def _compute_alignment_counts(self): """Build a dictionary textline -> alignment object. """ for align_id, textedges in self._text_alignments.items(): for textedge in textedges: for textline in textedge.textlines: alignments = self._textline_to_alignments.get( textline, None) if alignments is None: alignments = AlignmentCounter() self._textline_to_alignments[textline] = alignments alignments[align_id] = textedge.textlines def _calculate_gaps_thresholds(self, percentile=75): """Identify reasonable gaps between lines and columns based on gaps observed across alignments. This can be used to reject cells as too far away from the core table. """ h_gaps, v_gaps = [], [] for align_id in self._text_alignments: edge_array = self._text_alignments[align_id] gaps = [] vertical = align_id in HORIZONTAL_ALIGNMENTS sort_function = (lambda tl: tl.y0) \ if vertical \ else (lambda tl: tl.x0) for alignments in edge_array: tls = sorted( alignments.textlines, key=sort_function, reverse=True ) for i in range(1, len(tls)): # If the lines are vertically aligned (stacked up), we # record the vertical gap between them if vertical: gap = tls[i-1].y1 - tls[i].y0 else: gap = tls[i-1].x1 - tls[i].x0 gaps.append(gap) if gaps: if vertical: v_gaps.append(np.percentile(gaps, percentile)) else: h_gaps.append(np.percentile(gaps, percentile)) direction_str = 'vertical' if vertical else 'horizontal' rounded_gaps = list(map(lambda x: round(x, 2), gaps)) print( f"{direction_str} gaps found " f"for {align_id}: " f"{rounded_gaps} " f"with {percentile}th percentile " f"{np.percentile(gaps, percentile)}" ) return max(h_gaps, default=None), max(v_gaps, default=None) def _remove_unconnected_edges(self): """Weed out elements which are only connected to others vertically or horizontally. There needs to be connections across both dimensions. """ removed_singletons = True while removed_singletons: removed_singletons = False for alignment_id, textalignments in self._text_alignments.items(): # For each alignment edge, remove items if they are singletons # either horizontally or vertically for ta in textalignments: for i in range(len(ta.textlines) - 1, -1, -1): tl = ta.textlines[i] alignments = self._textline_to_alignments[tl] if alignments.max_h_count() <= 1 or \ alignments.max_v_count() <= 1: del ta.textlines[i] removed_singletons = True self._textline_to_alignments = {} self._compute_alignment_counts() def most_connected_textline(self): """ Retrieve the textline that is most connected across vertical and horizontal axis. """ # Find the textline with the highest alignment score return max( self._textline_to_alignments.keys(), key=lambda textline: self._textline_to_alignments[textline].alignment_score(), default=None ) def _compute_plausible_gaps(self): """ Evaluate plausible gaps between cells horizontally and vertically based on the textlines aligned with the most connected textline. Returns ------- gaps_hv : tuple (horizontal_gap, horizontal_gap) in pdf coordinate space. """ # Determine the textline that has the most combined # alignments across horizontal and vertical axis. # It will serve as a reference axis along which to collect the average # spacing between rows/cols. most_aligned_tl = self.most_connected_textline() if most_aligned_tl is None: return None # Retrieve the list of textlines it's aligned with, across both # axis best_alignment = self._textline_to_alignments[most_aligned_tl] ref_h_alignment_id, ref_h_textlines = best_alignment.max_h() ref_v_alignment_id, ref_v_textlines = best_alignment.max_v() if len(ref_v_textlines) <= 1 or len(ref_h_textlines) <= 1: return None h_textlines = sorted( ref_h_textlines, key=lambda tl: tl.x0, reverse=True ) v_textlines = sorted( ref_v_textlines, key=lambda tl: tl.y0, reverse=True ) h_gaps, v_gaps = [], [] for i in range(1, len(v_textlines)): v_gaps.append(v_textlines[i-1].y0 - v_textlines[i].y0) for i in range(1, len(h_textlines)): h_gaps.append(h_textlines[i-1].x0 - h_textlines[i].x0) if (not h_gaps or not v_gaps): return None percentile = 75 gaps_hv = ( 2.0 * np.percentile(h_gaps, percentile), 2.0 * np.percentile(v_gaps, percentile) ) return gaps_hv def _build_bbox_candidate(self, gaps_hv, parse_details=None): """ Seed the process with the textline with the highest alignment score, then expand the bbox with textlines within threshold. Parameters ---------- gaps_hv : tuple The maximum distance allowed to consider surrounding lines/columns as part of the same table. parse_details : array (optional) Optional parameter array, in which to store extra information to help later visualization of the table creation. """ # First, determine the textline that has the most combined # alignments across horizontal and vertical axis. # It will serve both as a starting point for the table boundary # search, and as a way to estimate the average spacing between # rows/cols. most_aligned_tl = self.most_connected_textline() # Calculate the 75th percentile of the horizontal/vertical # gaps between textlines. Use this as a reference for a threshold # to not exceed while looking for table boundaries. max_h_gap, max_v_gap = gaps_hv[0], gaps_hv[1] if parse_details is not None: # Store debug info parse_details_search = { "max_h_gap": max_h_gap, "max_v_gap": max_v_gap, "iterations": [] } parse_details.append(parse_details_search) else: parse_details_search = None MINIMUM_TEXTLINES_IN_TABLE = 6 bbox = (most_aligned_tl.x0, most_aligned_tl.y0, most_aligned_tl.x1, most_aligned_tl.y1) # For the body of the table, we only consider cells with alignments # on both axis. tls_search_space = list(self._textline_to_alignments.keys()) # tls_search_space = [] tls_search_space.remove(most_aligned_tl) tls_in_bbox = [most_aligned_tl] last_bbox = None while last_bbox != bbox: if parse_details_search is not None: # Store debug info parse_details_search["iterations"].append(bbox) last_bbox = bbox # Go through all remaining textlines, expand our bbox # if a textline is within our proximity tolerance for i in range(len(tls_search_space) - 1, -1, -1): tl = tls_search_space[i] h_distance, v_distance = distance_tl_to_bbox(tl, bbox) # Move textline to our bbox and expand the bbox accordingly # if the textline is close. if h_distance < max_h_gap and v_distance < max_v_gap: tls_in_bbox.append(tl) bbox = ( min(bbox[0], tl.x0), min(bbox[1], tl.y0), max(bbox[2], tl.x1), max(bbox[3], tl.y1) ) del tls_search_space[i] if len(tls_in_bbox) > MINIMUM_TEXTLINES_IN_TABLE: return bbox else: print(f"Only {len(tls_in_bbox)}, that's not enough.") return None def generate(self, textlines): """Generate the text edge dictionaries based on the input textlines. """ self._register_all_text_lines(textlines) self._compute_alignment_counts() class Hybrid(TextBaseParser): """Hybrid method of parsing looks for spaces between text to parse the table. If you want to specify columns when specifying multiple table areas, make sure that the length of both lists are equal. Parameters ---------- table_regions : list, optional (default: None) List of page regions that may contain tables of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. table_areas : list, optional (default: None) List of table area strings of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. columns : list, optional (default: None) List of column x-coordinates strings where the coordinates are comma-separated. split_text : bool, optional (default: False) Split text that spans across multiple cells. flag_size : bool, optional (default: False) Flag text based on font size. Useful to detect super/subscripts. Adds around flagged text. strip_text : str, optional (default: '') Characters that should be stripped from a string before assigning it to a cell. edge_tol : int, optional (default: 50) Tolerance parameter for extending textedges vertically. row_tol : int, optional (default: 2) Tolerance parameter used to combine text vertically, to generate rows. column_tol : int, optional (default: 0) Tolerance parameter used to combine text horizontally, to generate columns. """ def __init__( self, table_regions=None, table_areas=None, columns=None, flag_size=False, split_text=False, strip_text="", edge_tol=None, row_tol=2, column_tol=0, debug=False, **kwargs ): super().__init__( "hybrid", table_regions=table_regions, table_areas=table_areas, columns=columns, flag_size=flag_size, split_text=split_text, strip_text=strip_text, edge_tol=edge_tol, row_tol=row_tol, column_tol=column_tol, debug=debug, ) def _generate_table_bbox(self): if self.table_areas is not None: table_bbox = {} for area_str in self.table_areas: table_bbox[bbox_from_str(area_str)] = None self.table_bbox = table_bbox return # Take all the textlines that are not just spaces all_textlines = [ t for t in self.horizontal_text + self.vertical_text if len(t.get_text().strip()) > 0 ] textlines = self._apply_regions_filter(all_textlines) textlines_processed = {} self.table_bbox = {} if self.parse_details is not None: parse_details_network_searches = [] self.parse_details["network_searches"] = \ parse_details_network_searches parse_details_bbox_searches = [] self.parse_details["bbox_searches"] = parse_details_bbox_searches else: parse_details_network_searches = None parse_details_bbox_searches = None while True: text_network = TextNetworks() text_network.generate(textlines) text_network._remove_unconnected_edges() gaps_hv = text_network._compute_plausible_gaps() if gaps_hv is None: return None # edge_tol instructions override the calculated vertical gap edge_tol_hv = ( gaps_hv[0], gaps_hv[1] if self.edge_tol is None else self.edge_tol ) bbox = text_network._build_bbox_candidate( edge_tol_hv, parse_details=parse_details_bbox_searches ) if bbox is None: break if parse_details_network_searches is not None: # Preserve the current edge calculation for display debugging parse_details_network_searches.append( copy.deepcopy(text_network) ) # Get all the textlines that are at least 50% in the box tls_in_bbox = text_in_bbox(bbox, textlines) # and expand the text box to fully contain them bbox = bbox_from_textlines(tls_in_bbox) # FRH: do we need to repeat this? # tls_in_bbox = text_in_bbox(bbox, textlines) cols_anchors = find_columns_coordinates(tls_in_bbox) # Apply a heuristic to salvage headers which formatting might be # off compared to the rest of the table. expanded_bbox = search_header_from_body_bbox( bbox, textlines, cols_anchors, gaps_hv[1] ) if self.parse_details is not None: if "col_searches" not in self.parse_details: self.parse_details["col_searches"] = [] self.parse_details["col_searches"].append({ "core_bbox": bbox, "cols_anchors": cols_anchors, "expanded_bbox": expanded_bbox }) self.table_bbox[expanded_bbox] = None # Remember what textlines we processed, and repeat for tl in tls_in_bbox: textlines_processed[tl] = None textlines = list(filter( lambda tl: tl not in textlines_processed, textlines ))