# -*- coding: utf-8 -*- from __future__ import division import numpy as np import copy import warnings from .base import BaseParser from ..utils import ( bbox_from_str, text_in_bbox, text_in_bbox_per_axis, bbox_from_text, distance_tl_to_bbox, find_columns_coordinates ) from matplotlib import patches as patches # FRHTODO: Move to utils # maximum number of columns over which a header can spread MAX_COL_SPREAD_IN_HEADER = 3 def plot_annotated_bbox(plot, bbox, text, rect_color): plot.add_patch( patches.Rectangle( (bbox[0], bbox[1]), bbox[2] - bbox[0], bbox[3] - bbox[1], color="purple", linewidth=3, fill=False ) ) plot.text( bbox[0], bbox[1], text, fontsize=12, color="black", verticalalignment="top", bbox=dict(facecolor="purple", alpha=0.5) ) def todo_move_me_expand_area_for_header(area, textlines, col_anchors, average_row_height): """The core algorithm is based on fairly strict alignment of text. It works ok for the table body, but might fail on tables' headers since they tend to be in a different font, alignment (e.g. vertical), etc. The section below tries to identify whether what's above the bbox identified so far has the characteristics of a table header: Close to the top of the body, with cells that fit within the bounds identified. """ new_area = area (left, bottom, right, top) = area zones = [] def column_spread(left, right, col_anchors): """Returns the number of columns (splits on the x-axis) crossed by an element covering left to right. """ indexLeft = 0 while indexLeft < len(col_anchors) \ and col_anchors[indexLeft] < left: indexLeft += 1 indexRight = indexLeft while indexRight < len(col_anchors) \ and col_anchors[indexRight] < right: indexRight += 1 return indexRight - indexLeft keep_searching = True while keep_searching: keep_searching = False # a/ first look for the closest text element above the area. # It will be the anchor for a possible new row. closest_above = None all_above = [] for te in textlines: # higher than the table, directly within its bounds if te.y0 > top and te.x0 > left and te.x1 < right: all_above.append(te) if closest_above is None or closest_above.y0 > te.y0: closest_above = te if closest_above and \ closest_above.y0 < top + average_row_height: # b/ We have a candidate cell that is within the correct # vertical band, and directly above the table. Starting from # this anchor, we list all the textlines within the same row. tls_in_new_row = [] top = closest_above.y1 pushed_up = True while pushed_up: pushed_up = False # Iterate and extract elements that fit in the row # from our list for i in range(len(all_above) - 1, -1, -1): te = all_above[i] if te.y0 < top: # The bottom of this element is within our row # so we add it. tls_in_new_row.append(te) all_above.pop(i) if te.y1 > top: # If the top of this element raises our row's # band, we'll need to keep on searching for # overlapping items top = te.y1 pushed_up = True # Get the x-ranges for all the textlines, and merge the # x-ranges that overlap zones = zones + \ list(map(lambda tl: [tl.x0, tl.x1], tls_in_new_row)) zones.sort(key=lambda z: z[0]) # Sort by left coordinate # Starting from the right, if two zones overlap horizontally, # merge them merged_something = True while merged_something: merged_something = False for i in range(len(zones) - 1, 0, -1): zone_right = zones[i] zone_left = zones[i-1] if zone_left[1] >= zone_right[0]: zone_left[1] = max(zone_right[1], zone_left[1]) zones.pop(i) merged_something = True max_spread = max( list( map( lambda zone: column_spread( zone[0], zone[1], col_anchors), zones ) ) ) if max_spread <= MAX_COL_SPREAD_IN_HEADER: # Combined, the elements we've identified don't cross more # than the authorized number of columns. # We're trying to avoid # 0: # 1: # 2: # if len(zones) > TEXTEDGE_REQUIRED_ELEMENTS: new_area = (left, bottom, right, top) # At this stage we've identified a plausible row (or the # beginning of one). keep_searching = True return new_area class TextEdge2(object): """Defines a text edge coordinates relative to a left-bottom origin. (PDF coordinate space) Parameters ---------- coord : float coordinate of the text edge. Depending on the alignment it could be a vertical or horizontal coordinate. Attributes ---------- textlines: array Array of textlines that demonstrate this alignment. coord: float The coordinate aligned averaged out across textlines. """ def __init__(self, coord, textline): self.coord = coord self.textlines = [textline] def __repr__(self): text_inside = " | ".join( map(lambda x: x.get_text(), self.textlines[:2])).replace("\n", "") return f"" def register_aligned_textline(self, textline, coord): """Updates new textline to this alignment, adapting its average. """ # Increase the intersections for this segment, expand it up, # and adjust the x based on the new value self.coord = (self.coord * len(self.textlines) + coord) / \ float(len(self.textlines) + 1) self.textlines.append(textline) class Alignments(object): """Represents the number of other textlines aligned with this one across each edge. """ def __init__(self): # Vertical alignments self.left = 0 self.right = 0 self.middle = 0 # Horizontal alignments self.bottom = 0 self.top = 0 self.center = 0 def __getitem__(self, key): return getattr(self, key) def __setitem__(self, key, value): return setattr(self, key, value) def max_v(self): """Returns the maximum number of alignments along one of the vertical axis (left/right/middle). """ return max(self.left, self.right, self.middle) def max_h(self): """Returns the maximum number of alignments along one of the horizontal axis (bottom/top/center). """ return max(self.bottom, self.top, self.center) def max_v_edge_name(self): """Returns the name of the vertical edge that has the maximum number of alignments. """ return max( ["left", "right", "middle"], key=lambda edge_name: self[edge_name] ) def max_h_edge_name(self): """Returns the name of the horizontal edge that has the maximum number of alignments. """ return max( ["bottom", "top", "center"], key=lambda edge_name: self[edge_name] ) def alignment_score(self): """We define the alignment score of a textline as the product of the number of aligned elements - 1. The -1 is to avoid favoring singletons on a long line. """ return (self.max_v()-1) * (self.max_h()-1) class TextEdges2(object): """Defines a dict of vertical (top, bottom, middle) and horizontal (left, right, and middle) text alignments found on the PDF page. The dict has three keys based on the alignments, and each key's value is a list of camelot.core.TextEdge objects. """ def __init__(self): # For each possible alignment, list of tuples coordinate/textlines self._textedges = { "left": [], "right": [], "middle": [], "bottom": [], "top": [], "center": [] } # For each textline, dictionary "edge type" to # "number of textlines aligned" self._textlines_alignments = {} # Maximum number of distinct aligned elements in rows/cols self.max_rows = None self.max_cols = None @staticmethod def get_textline_coords(textline): """Calculate the coordinates of each alignment for a given textline. """ return { "left": textline.x0, "right": textline.x1, "middle": (textline.x0 + textline.x1) / 2.0, "bottom": textline.y0, "top": textline.y1, "center": (textline.y0 + textline.y1) / 2.0, } # FRHTODO: Move to utils and use generic name @staticmethod def _get_index_closest_point(coord, edge_array): """Returns the index of the closest point """ n = len(edge_array) if n == 0: return None if n == 1: return 0 left = 0 right = n - 1 mid = 0 if coord >= edge_array[n - 1].coord: return n - 1 if coord <= edge_array[0].coord: return 0 while left < right: mid = (left + right) // 2 # find the mid if coord < edge_array[mid].coord: right = mid elif coord > edge_array[mid].coord: left = mid + 1 else: return mid if edge_array[mid].coord > coord: if mid > 0 and ( coord - edge_array[mid-1].coord < edge_array[mid].coord - coord): return mid-1 elif edge_array[mid].coord < coord: if mid < n - 1 and ( edge_array[mid+1].coord - coord < coord - edge_array[mid].coord): return mid+1 return mid # def insert(self, index, textline, align): # """Adds a new text edge to the current dict. # """ # x = self.get_x_coord(textline, align) # y0 = textline.y0 # y1 = textline.y1 # te = TextEdge(x, y0, y1, align=align) # self._textedges[align].insert(index, te) def _register_textline(self, textline): """Updates an existing text edge in the current dict. """ coords = TextEdges2.get_textline_coords(textline) for alignment in self._textedges: edge_array = self._textedges[alignment] coord = coords[alignment] # Find the index of the closest existing element (or 0 if none) idx_closest = self._get_index_closest_point(coord, edge_array) # Check if the edges before/after are close enough # that it can be considered aligned idx_insert = None if idx_closest is None: idx_insert = 0 elif np.isclose(edge_array[idx_closest].coord, coord, atol=0.5): closest_edge = edge_array[idx_closest] closest_edge.register_aligned_textline(textline, coord) elif edge_array[idx_closest].coord < coord: idx_insert = idx_closest + 1 else: idx_insert = idx_closest if idx_insert is not None: new_edge = TextEdge2(coord, textline) edge_array.insert(idx_insert, new_edge) def _register_all_text_lines(self, textlines): """Add all textlines to our edge repository to identify alignments. """ # Identify all the edge alignments for tl in textlines: if len(tl.get_text().strip()) > 0: self._register_textline(tl) def _compute_alignment_counts(self): """Build a dictionary textline -> alignment object. """ # for edge_name, textedges in self._textedges.items(): for textedge in textedges: for textline in textedge.textlines: textline_alignments = self._textlines_alignments.get( textline, None) if textline_alignments is None: alignments = Alignments() alignments[edge_name] = len(textedge.textlines) self._textlines_alignments[textline] = alignments else: textline_alignments[edge_name] = len( textedge.textlines) # Finally calculate the overall maximum number of rows/cols self.max_rows = max( map( lambda alignments: alignments.max_h(), self._textlines_alignments.values() ), default=0 ) self.max_cols = max( map( lambda alignments: alignments.max_v(), self._textlines_alignments.values() ), default=0 ) def _calculate_gaps_thresholds(self, percentile=75): """Identify reasonable gaps between lines and columns based on gaps observed across alignments. This can be used to reject cells as too far away from the core table. """ h_gaps, v_gaps = [], [] for edge_name in self._textedges: edge_array = self._textedges[edge_name] gaps = [] vertical = edge_name in ["left", "right", "middle"] sort_function = (lambda tl: tl.y0) \ if vertical \ else (lambda tl: tl.x0) for alignments in edge_array: tls = sorted( alignments.textlines, key=sort_function, reverse=True ) for i in range(1, len(tls)): # If the lines are vertically aligned (stacked up), we # record the vertical gap between them if vertical: gap = tls[i-1].y1 - tls[i].y0 else: gap = tls[i-1].x1 - tls[i].x0 gaps.append(gap) if gaps: if vertical: v_gaps.append(np.percentile(gaps, percentile)) else: h_gaps.append(np.percentile(gaps, percentile)) direction_str = 'vertical' if vertical else 'horizontal' rounded_gaps = list(map(lambda x: round(x, 2), gaps)) print( f"{direction_str} gaps found " f"for {edge_name}: " f"{rounded_gaps} " f"with {percentile}th percentile " f"{np.percentile(gaps, percentile)}" ) return max(h_gaps, default=None), max(v_gaps, default=None) def _remove_unconnected_edges(self): """Weed out elements which are only connected to others vertically or horizontally. There needs to be connections across both dimensions. """ removed_singletons = True while removed_singletons: removed_singletons = False for edge_type in self._textedges: # For each alignment edge, remove items if they are singletons # either horizontally or vertically for te in self._textedges[edge_type]: for i in range(len(te.textlines) - 1, -1, -1): tl = te.textlines[i] alignments = self._textlines_alignments[tl] if alignments.max_h() <= 1 or alignments.max_v() <= 1: del te.textlines[i] removed_singletons = True self._textlines_alignments = {} self._compute_alignment_counts() def _build_bbox_candidate(self, debug_info=None): """ Seed the process with the textline with the highest alignment score, then expand the bbox with textlines within threshold. Parameters ---------- debug_info : array Optional parameter array, in which to store extra information to help later visualization of the table creation. """ if self.max_rows <= 1 or self.max_cols <= 1: return None tls_search_space = list(self._textlines_alignments.keys()) def get_best_textline(textlines): # Find the textline with the highest alignment score return max( textlines, key=lambda textline: self._textlines_alignments[textline].alignment_score(), default=None ) # First, determine the textline that has the most combined alignments # across horizontal and vertical axis. # It will serve both as a starting point for the table boundary search, # and as a way to estimate the average spacing between rows/cols. most_aligned_tl = get_best_textline(tls_search_space) most_aligned_coords = TextEdges2.get_textline_coords(most_aligned_tl) # Retrieve the list of textlines it's aligned with, across both axis best_alignment = self._textlines_alignments[most_aligned_tl] ref_h_edge_name = best_alignment.max_h_edge_name() ref_v_edge_name = best_alignment.max_v_edge_name() best_h_textedges = self._textedges[ref_h_edge_name] best_v_textedges = self._textedges[ref_v_edge_name] h_coord = most_aligned_coords[ref_h_edge_name] v_coord = most_aligned_coords[ref_v_edge_name] h_textlines = sorted( best_h_textedges[ TextEdges2._get_index_closest_point( h_coord, best_h_textedges ) ].textlines, key=lambda tl: tl.x0, reverse=True ) v_textlines = sorted( best_v_textedges[ TextEdges2._get_index_closest_point( v_coord, best_v_textedges ) ].textlines, key=lambda tl: tl.y0, reverse=True ) h_gaps, v_gaps = [], [] for i in range(1, len(v_textlines)): v_gaps.append(v_textlines[i-1].y0 - v_textlines[i].y0) for i in range(1, len(h_textlines)): h_gaps.append(h_textlines[i-1].x0 - h_textlines[i].x0) if (not h_gaps or not v_gaps): return None percentile = 75 gaps_hv = ( np.percentile(h_gaps, percentile), np.percentile(v_gaps, percentile) ) # Calculate the 75th percentile of the horizontal/vertical # gaps between textlines. Use this as a reference for a threshold # to not exceed while looking for table boundaries. # FRHTODO: Clean this up # gaps_hv = self._calculate_gaps_thresholds(75) # if (gaps_hv[0] is None or gaps_hv[1] is None): # return None max_h_gap, max_v_gap = gaps_hv[0] * 3, gaps_hv[1] * 3 if debug_info is not None: # Store debug info debug_info_search = { "max_h_gap": max_h_gap, "max_v_gap": max_v_gap, "iterations": [] } debug_info.append(debug_info_search) else: debug_info_search = None MINIMUM_TEXTLINES_IN_TABLE = 6 bbox = (most_aligned_tl.x0, most_aligned_tl.y0, most_aligned_tl.x1, most_aligned_tl.y1) tls_search_space.remove(most_aligned_tl) tls_in_bbox = [most_aligned_tl] last_bbox = None while last_bbox != bbox: if debug_info_search is not None: # Store debug info debug_info_search["iterations"].append(bbox) last_bbox = bbox # Go through all remaining textlines, expand our bbox # if a textline is within our proximity tolerance for i in range(len(tls_search_space) - 1, -1, -1): tl = tls_search_space[i] h_distance, v_distance = distance_tl_to_bbox(tl, bbox) # Move textline to our bbox and expand the bbox accordingly # if the textline is close. if h_distance < max_h_gap and v_distance < max_v_gap: tls_in_bbox.append(tl) bbox = ( min(bbox[0], tl.x0), min(bbox[1], tl.y0), max(bbox[2], tl.x1), max(bbox[3], tl.y1) ) del tls_search_space[i] if len(tls_in_bbox) > MINIMUM_TEXTLINES_IN_TABLE: return bbox else: print(f"Only {len(tls_in_bbox)}, that's not enough.") return None def generate(self, textlines): """Generate the text edge dictionaries based on the input textlines. """ self._register_all_text_lines(textlines) self._compute_alignment_counts() def plot_alignments(self, ax): """Displays a visualization of the alignments as currently computed. """ # FRHTODO: This is too busy and doesn't plot lines most_aligned_tl = sorted( self._textlines_alignments.keys(), key=lambda textline: self._textlines_alignments[textline].alignment_score(), reverse=True )[0] ax.add_patch( patches.Rectangle( (most_aligned_tl.x0, most_aligned_tl.y0), most_aligned_tl.x1 - most_aligned_tl.x0, most_aligned_tl.y1 - most_aligned_tl.y0, color="red", alpha=0.5 ) ) for tl, alignments in self._textlines_alignments.items(): ax.text( tl.x0 - 5, tl.y0 - 5, f"{alignments.max_h()}x{alignments.max_v()}", fontsize=5, color="black" ) def plotFRHTableSearch(self, plot, debug_info): if debug_info is None: return # Display a bbox per region for region_str in debug_info["table_regions"] or []: plot_annotated_bbox( plot, bbox_from_str(region_str), "region: ({region_str})".format(region_str=region_str), "purple" ) # Display a bbox per area for area_str in debug_info["table_areas"] or []: plot_annotated_bbox( plot, bbox_from_str(area_str), "area: ({area_str})".format(area_str=area_str), "pink" ) for box_id, bbox_search in enumerate(debug_info["bboxes_searches"]): max_h_gap = bbox_search["max_h_gap"] max_v_gap = bbox_search["max_v_gap"] iterations = bbox_search["iterations"] for iteration, bbox in enumerate(iterations): final = iteration == len(iterations) - 1 plot.add_patch( patches.Rectangle( (bbox[0], bbox[1]), bbox[2] - bbox[0], bbox[3] - bbox[1], color="red", linewidth=5 if final else 2, fill=False ) ) plot.text( bbox[0], bbox[1], f"box #{box_id+1} / iter #{iteration}", fontsize=12, color="black", verticalalignment="top", bbox=dict(facecolor="orange", alpha=0.5) ) plot.add_patch( patches.Rectangle( (bbox[0]-max_h_gap, bbox[1]-max_v_gap), bbox[2] - bbox[0] + 2 * max_h_gap, bbox[3] - bbox[1] + 2 * max_v_gap, color="orange", fill=False ) ) class Hybrid(BaseParser): """Hybrid method of parsing looks for spaces between text to parse the table. If you want to specify columns when specifying multiple table areas, make sure that the length of both lists are equal. Parameters ---------- table_regions : list, optional (default: None) List of page regions that may contain tables of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. table_areas : list, optional (default: None) List of table area strings of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. columns : list, optional (default: None) List of column x-coordinates strings where the coordinates are comma-separated. split_text : bool, optional (default: False) Split text that spans across multiple cells. flag_size : bool, optional (default: False) Flag text based on font size. Useful to detect super/subscripts. Adds around flagged text. strip_text : str, optional (default: '') Characters that should be stripped from a string before assigning it to a cell. edge_tol : int, optional (default: 50) Tolerance parameter for extending textedges vertically. row_tol : int, optional (default: 2) Tolerance parameter used to combine text vertically, to generate rows. column_tol : int, optional (default: 0) Tolerance parameter used to combine text horizontally, to generate columns. """ def __init__( self, table_regions=None, table_areas=None, columns=None, flag_size=False, split_text=False, strip_text="", edge_tol=50, row_tol=2, column_tol=0, debug=False, **kwargs ): super().__init__( "hybrid", table_regions=table_regions, table_areas=table_areas, split_text=split_text, strip_text=strip_text, flag_size=flag_size, debug=debug ) self.columns = columns self._validate_columns() self.edge_tol = edge_tol self.row_tol = row_tol self.column_tol = column_tol # FRHTODO: Check if needed, refactor with Stream @staticmethod def _text_bbox(t_bbox): """Returns bounding box for the text present on a page. Parameters ---------- t_bbox : dict Dict with two keys 'horizontal' and 'vertical' with lists of LTTextLineHorizontals and LTTextLineVerticals respectively. Returns ------- text_bbox : tuple Tuple (x0, y0, x1, y1) in pdf coordinate space. """ xmin = min(t.x0 for direction in t_bbox for t in t_bbox[direction]) ymin = min(t.y0 for direction in t_bbox for t in t_bbox[direction]) xmax = max(t.x1 for direction in t_bbox for t in t_bbox[direction]) ymax = max(t.y1 for direction in t_bbox for t in t_bbox[direction]) text_bbox = (xmin, ymin, xmax, ymax) return text_bbox # FRHTODO: Check if needed, refactor with Stream @staticmethod def _group_rows(text, row_tol=2): """Groups PDFMiner text objects into rows vertically within a tolerance. Parameters ---------- text : list List of PDFMiner text objects. row_tol : int, optional (default: 2) Returns ------- rows : list Two-dimensional list of text objects grouped into rows. """ row_y = None rows = [] temp = [] non_empty_text = [t for t in text if t.get_text().strip()] for t in non_empty_text: # is checking for upright necessary? # if t.get_text().strip() and all([obj.upright \ # for obj in t._objs # if type(obj) is LTChar]): if row_y is None: row_y = t.y0 elif not np.isclose(row_y, t.y0, atol=row_tol): rows.append(sorted(temp, key=lambda t: t.x0)) temp = [] # We update the row's bottom as we go, to be forgiving if there # is a gradual change across multiple columns. row_y = t.y0 temp.append(t) rows.append(sorted(temp, key=lambda t: t.x0)) return rows # FRHTODO: Check if needed, refactor with Stream @staticmethod def _merge_columns(l, column_tol=0): """Merges column boundaries horizontally if they overlap or lie within a tolerance. Parameters ---------- l : list List of column x-coordinate tuples. column_tol : int, optional (default: 0) Returns ------- merged : list List of merged column x-coordinate tuples. """ merged = [] for higher in l: if not merged: merged.append(higher) else: lower = merged[-1] if column_tol >= 0: if higher[0] <= lower[1] or np.isclose( higher[0], lower[1], atol=column_tol ): upper_bound = max(lower[1], higher[1]) lower_bound = min(lower[0], higher[0]) merged[-1] = (lower_bound, upper_bound) else: merged.append(higher) elif column_tol < 0: if higher[0] <= lower[1]: if np.isclose(higher[0], lower[1], atol=abs(column_tol)): merged.append(higher) else: upper_bound = max(lower[1], higher[1]) lower_bound = min(lower[0], higher[0]) merged[-1] = (lower_bound, upper_bound) else: merged.append(higher) return merged # FRHTODO: Check if needed, refactor with Stream @staticmethod def _join_rows(rows_grouped, text_y_max, text_y_min): """Makes row coordinates continuous. For the row to "touch" we split the existing gap between them in half. Parameters ---------- rows_grouped : list Two-dimensional list of text objects grouped into rows. text_y_max : int text_y_min : int Returns ------- rows : list List of continuous row y-coordinate tuples. """ row_boundaries = [ [ max(t.y1 for t in r), min(t.y0 for t in r) ] for r in rows_grouped ] for i in range(0, len(row_boundaries)-1): top_row = row_boundaries[i] bottom_row = row_boundaries[i+1] top_row[1] = bottom_row[0] = (top_row[1] + bottom_row[0]) / 2 row_boundaries[0][0] = text_y_max row_boundaries[-1][1] = text_y_min return row_boundaries # FRHTODO: Check if needed, refactor with Stream @staticmethod def _add_columns(cols, text, row_tol): """Adds columns to existing list by taking into account the text that lies outside the current column x-coordinates. Parameters ---------- cols : list List of column x-coordinate tuples. text : list List of PDFMiner text objects. ytol : int Returns ------- cols : list Updated list of column x-coordinate tuples. """ if text: text = Hybrid._group_rows(text, row_tol=row_tol) elements = [len(r) for r in text] new_cols = [ (t.x0, t.x1) for r in text if len(r) == max(elements) for t in r ] cols.extend(Hybrid._merge_columns(sorted(new_cols))) return cols # FRHTODO: Check if needed, refactor with Stream @staticmethod def _join_columns(cols, text_x_min, text_x_max): """Makes column coordinates continuous. Parameters ---------- cols : list List of column x-coordinate tuples. text_x_min : int text_y_max : int Returns ------- cols : list Updated list of column x-coordinate tuples. """ cols = sorted(cols) cols = [(cols[i][0] + cols[i - 1][1]) / 2 for i in range(1, len(cols))] cols.insert(0, text_x_min) cols.append(text_x_max) cols = [(cols[i], cols[i + 1]) for i in range(0, len(cols) - 1)] return cols # FRHTODO: Check is needed, refactor with Stream def _validate_columns(self): if self.table_areas is not None and self.columns is not None: if len(self.table_areas) != len(self.columns): raise ValueError("Length of table_areas and columns" " should be equal") def _generate_table_bbox(self): if self.table_areas is not None: table_bbox = {} for area_str in self.table_areas: table_bbox[bbox_from_str(area_str)] = None self.table_bbox = table_bbox return all_textlines = self.horizontal_text + self.vertical_text textlines = self._apply_regions_filter(all_textlines) textlines_processed = {} self.table_bbox = {} if self.debug_info is not None: debug_info_edges_searches = [] self.debug_info["edges_searches"] = debug_info_edges_searches debug_info_bboxes_searches = [] self.debug_info["bboxes_searches"] = debug_info_bboxes_searches else: debug_info_edges_searches = None debug_info_bboxes_searches = None while True: self.textedges = TextEdges2() self.textedges.generate(textlines) self.textedges._remove_unconnected_edges() if debug_info_edges_searches is not None: # Preserve the current edge calculation for display debugging debug_info_edges_searches.append( copy.deepcopy(self.textedges) ) bbox = self.textedges._build_bbox_candidate( debug_info_bboxes_searches ) if bbox is None: break # Get all the textlines that are at least 50% in the box tls_in_bbox = text_in_bbox(bbox, textlines) # and expand the text box to fully contain them bbox = bbox_from_text(tls_in_bbox) # FRH: do we need to repeat this? # tls_in_bbox = text_in_bbox(bbox, textlines) cols_anchors = find_columns_coordinates(tls_in_bbox) # Apply a heuristic to salvage headers which formatting might be # off compared to the rest of the table. # Calculate the average height of each textline # FRHTODO: reuse the gap threshold from earlier? alignments = self.textedges._textlines_alignments.keys() average_tl_height = sum( map( lambda tl: tl.y1 - tl.y0, alignments )) / len(alignments) expanded_bbox = todo_move_me_expand_area_for_header( bbox, textlines, cols_anchors, average_tl_height ) if self.debug_info is not None: if "col_searches" not in self.debug_info: self.debug_info["col_searches"] = [] self.debug_info["col_searches"].append({ "core_bbox": bbox, "cols_anchors": cols_anchors, "expanded_bbox": expanded_bbox }) self.table_bbox[expanded_bbox] = None # Remember what textlines we processed, and repeat for tl in tls_in_bbox: textlines_processed[tl] = None textlines = list(filter( lambda tl: tl not in textlines_processed, textlines )) # FRHTODO: Check is needed, refactor with Stream def _generate_columns_and_rows(self, table_idx, tk): # select elements which lie within table_bbox self.t_bbox = text_in_bbox_per_axis( tk, self.horizontal_text, self.vertical_text ) text_x_min, text_y_min, text_x_max, text_y_max = \ self._text_bbox(self.t_bbox) rows_grouped = self._group_rows( self.t_bbox["horizontal"], row_tol=self.row_tol) rows = self._join_rows(rows_grouped, text_y_max, text_y_min) elements = [len(r) for r in rows_grouped] if self.columns is not None and self.columns[table_idx] != "": # user has to input boundary columns too # take (0, pdf_width) by default # similar to else condition # len can't be 1 cols = self.columns[table_idx].split(",") cols = [float(c) for c in cols] cols.insert(0, text_x_min) cols.append(text_x_max) cols = [(cols[i], cols[i + 1]) for i in range(0, len(cols) - 1)] else: # calculate mode of the list of number of elements in # each row to guess the number of columns ncols = max(set(elements), key=elements.count) if ncols == 1: # if mode is 1, the page usually contains not tables # but there can be cases where the list can be skewed, # try to remove all 1s from list in this case and # see if the list contains elements, if yes, then use # the mode after removing 1s elements = list(filter(lambda x: x != 1, elements)) if elements: ncols = max(set(elements), key=elements.count) else: warnings.warn( "No tables found in table area {}" .format(table_idx + 1) ) cols = [ (t.x0, t.x1) for r in rows_grouped if len(r) == ncols for t in r ] cols = self._merge_columns( sorted(cols), column_tol=self.column_tol ) inner_text = [] for i in range(1, len(cols)): left = cols[i - 1][1] right = cols[i][0] inner_text.extend( [ t for direction in self.t_bbox for t in self.t_bbox[direction] if t.x0 > left and t.x1 < right ] ) outer_text = [ t for direction in self.t_bbox for t in self.t_bbox[direction] if t.x0 > cols[-1][1] or t.x1 < cols[0][0] ] inner_text.extend(outer_text) cols = self._add_columns(cols, inner_text, self.row_tol) cols = self._join_columns(cols, text_x_min, text_x_max) return cols, rows # FRHTODO: Check is needed, refactor with Stream def _generate_table(self, table_idx, cols, rows, **kwargs): table = self._initialize_new_table(table_idx, cols, rows) table = table.set_all_edges() table.record_parse_metadata(self) # for plotting table._bbox = self.table_bbox table._segments = None table._textedges = self.textedges return table def extract_tables(self): if self._document_has_no_text(): return [] # Identify plausible areas within the doc where tables lie, # populate table_bbox keys with these areas. self._generate_table_bbox() _tables = [] # sort tables based on y-coord for table_idx, bbox in enumerate( sorted(self.table_bbox.keys(), key=lambda x: x[1], reverse=True) ): cols, rows = self._generate_columns_and_rows(table_idx, bbox) table = self._generate_table(table_idx, cols, rows) table._bbox = bbox _tables.append(table) return _tables