# -*- coding: utf-8 -*- import os import sqlite3 import zipfile import tempfile from itertools import chain from operator import itemgetter import numpy as np import pandas as pd from cv2 import cv2 from .utils import ( build_file_path_in_temp_dir, compute_accuracy, compute_whitespace, export_pdf_as_png ) # minimum number of vertical textline intersections for a textedge # to be considered valid TEXTEDGE_REQUIRED_ELEMENTS = 4 # padding added to table area on the left, right and bottom TABLE_AREA_PADDING = 10 class TextEdge(object): """Defines a text edge coordinates relative to a left-bottom origin. (PDF coordinate space) Parameters ---------- x : float x-coordinate of the text edge. y0 : float y-coordinate of bottommost point. y1 : float y-coordinate of topmost point. align : string, optional (default: 'left') {'left', 'right', 'middle'} Attributes ---------- intersections: int Number of intersections with horizontal text rows. is_valid: bool A text edge is valid if it intersects with at least TEXTEDGE_REQUIRED_ELEMENTS horizontal text rows. """ def __init__(self, x, y0, y1, align="left"): self.x = x self.y0 = y0 self.y1 = y1 self.align = align self.intersections = 0 self.is_valid = False def __repr__(self): return "".format( round(self.x, 2), round(self.y0, 2), round(self.y1, 2), self.align, self.is_valid, ) def update_coords(self, x, y0, edge_tol=50): """Updates the text edge's x and bottom y coordinates and sets the is_valid attribute. """ if np.isclose(self.y0, y0, atol=edge_tol): self.x = (self.intersections * self.x + x) / \ float(self.intersections + 1) self.y0 = y0 self.intersections += 1 # a textedge is valid only if it extends uninterrupted # over a required number of textlines if self.intersections > TEXTEDGE_REQUIRED_ELEMENTS: self.is_valid = True class TextEdges(object): """Defines a dict of left, right and middle text edges found on the PDF page. The dict has three keys based on the alignments, and each key's value is a list of camelot.core.TextEdge objects. """ def __init__(self, edge_tol=50): self.edge_tol = edge_tol self._textedges = {"left": [], "right": [], "middle": []} @staticmethod def get_x_coord(textline, align): """Returns the x coordinate of a text row based on the specified alignment. """ x_left = textline.x0 x_right = textline.x1 x_middle = x_left + (x_right - x_left) / 2.0 x_coord = {"left": x_left, "middle": x_middle, "right": x_right} return x_coord[align] def find(self, x_coord, align): """Returns the index of an existing text edge using the specified x coordinate and alignment. """ for i, te in enumerate(self._textedges[align]): if np.isclose(te.x, x_coord, atol=0.5): return i return None def add(self, textline, align): """Adds a new text edge to the current dict. """ x = self.get_x_coord(textline, align) y0 = textline.y0 y1 = textline.y1 te = TextEdge(x, y0, y1, align=align) self._textedges[align].append(te) def update(self, textline): """Updates an existing text edge in the current dict. """ for align in ["left", "right", "middle"]: x_coord = self.get_x_coord(textline, align) idx = self.find(x_coord, align) if idx is None: self.add(textline, align) else: self._textedges[align][idx].update_coords( x_coord, textline.y0, edge_tol=self.edge_tol ) def generate(self, textlines): """Generates the text edges dict based on horizontal text rows. """ for tl in textlines: if len(tl.get_text().strip()) > 1: # TODO: hacky self.update(tl) def get_relevant(self): """Returns the list of relevant text edges (all share the same alignment) based on which list intersects horizontal text rows the most. """ intersections_sum = { "left": sum( te.intersections for te in self._textedges["left"] if te.is_valid ), "right": sum( te.intersections for te in self._textedges["right"] if te.is_valid ), "middle": sum( te.intersections for te in self._textedges["middle"] if te.is_valid ), } # TODO: naive # get vertical textedges that intersect maximum number of # times with horizontal textlines relevant_align = max(intersections_sum.items(), key=itemgetter(1))[0] return list(filter( lambda te: te.is_valid, self._textedges[relevant_align]) ) def get_table_areas(self, textlines, relevant_textedges): """Returns a dict of interesting table areas on the PDF page calculated using relevant text edges. """ def pad(area, average_row_height): x0 = area[0] - TABLE_AREA_PADDING y0 = area[1] - TABLE_AREA_PADDING x1 = area[2] + TABLE_AREA_PADDING # add a constant since table headers can be relatively up y1 = area[3] + average_row_height * 5 return (x0, y0, x1, y1) # sort relevant textedges in reading order relevant_textedges.sort(key=lambda te: (-te.y0, te.x)) table_areas = {} for te in relevant_textedges: if not table_areas: table_areas[(te.x, te.y0, te.x, te.y1)] = None else: found = None for area in table_areas: # check for overlap if te.y1 >= area[1] and te.y0 <= area[3]: found = area break if found is None: table_areas[(te.x, te.y0, te.x, te.y1)] = None else: table_areas.pop(found) updated_area = ( found[0], min(te.y0, found[1]), max(found[2], te.x), max(found[3], te.y1), ) table_areas[updated_area] = None # extend table areas based on textlines that overlap # vertically. it's possible that these textlines were # eliminated during textedges generation since numbers and # chars/words/sentences are often aligned differently. # drawback: table areas that have paragraphs on their sides # will include the paragraphs too. sum_textline_height = 0 for tl in textlines: sum_textline_height += tl.y1 - tl.y0 found = None for area in table_areas: # check for overlap if tl.y0 >= area[1] and tl.y1 <= area[3]: found = area break if found is not None: table_areas.pop(found) updated_area = ( min(tl.x0, found[0]), min(tl.y0, found[1]), max(found[2], tl.x1), max(found[3], tl.y1), ) table_areas[updated_area] = None average_textline_height = sum_textline_height / \ float(len(textlines)) # add some padding to table areas table_areas_padded = {} for area in table_areas: table_areas_padded[pad(area, average_textline_height)] = None return table_areas_padded class Cell(object): """Defines a cell in a table with coordinates relative to a left-bottom origin. (PDF coordinate space) Parameters ---------- x1 : float x-coordinate of left-bottom point. y1 : float y-coordinate of left-bottom point. x2 : float x-coordinate of right-top point. y2 : float y-coordinate of right-top point. Attributes ---------- lb : tuple Tuple representing left-bottom coordinates. lt : tuple Tuple representing left-top coordinates. rb : tuple Tuple representing right-bottom coordinates. rt : tuple Tuple representing right-top coordinates. left : bool Whether or not cell is bounded on the left. right : bool Whether or not cell is bounded on the right. top : bool Whether or not cell is bounded on the top. bottom : bool Whether or not cell is bounded on the bottom. hspan : bool Whether or not cell spans horizontally. vspan : bool Whether or not cell spans vertically. text : string Text assigned to cell. """ def __init__(self, x1, y1, x2, y2): self.x1 = x1 self.y1 = y1 self.x2 = x2 self.y2 = y2 self.lb = (x1, y1) self.lt = (x1, y2) self.rb = (x2, y1) self.rt = (x2, y2) self.left = False self.right = False self.top = False self.bottom = False self.hspan = False self.vspan = False self._text = "" def __repr__(self): return "".format( round(self.x1, 2), round(self.y1, 2), round(self.x2, 2), round(self.y2, 2) ) @property def text(self): return self._text @text.setter def text(self, t): self._text = "".join([self._text, t]) @property def bound(self): """The number of sides on which the cell is bounded. """ return self.top + self.bottom + self.left + self.right class Table(object): """Defines a table with coordinates relative to a left-bottom origin. (PDF coordinate space) Parameters ---------- cols : list List of tuples representing column x-coordinates in increasing order. rows : list List of tuples representing row y-coordinates in decreasing order. Attributes ---------- df : :class:`pandas.DataFrame` shape : tuple Shape of the table. accuracy : float Accuracy with which text was assigned to the cell. whitespace : float Percentage of whitespace in the table. filename : str Path of the original PDF order : int Table number on PDF page. page : int PDF page number. """ def __init__(self, cols, rows): self.cols = cols self.rows = rows self.cells = [ [Cell(c[0], r[1], c[1], r[0]) for c in cols] for r in rows ] self.df = None self.shape = (0, 0) self.accuracy = 0 self.whitespace = 0 self.filename = None self.order = None self.page = None self.flavor = None # Flavor of the parser that generated the table self.pdf_size = None # Dimensions of the original PDF page self.debug_info = None # Field holding debug data self._image = None self._image_path = None # Temporary file to hold an image of the pdf def __repr__(self): return "<{} shape={}>".format(self.__class__.__name__, self.shape) def __lt__(self, other): if self.page == other.page: if self.order < other.order: return True if self.page < other.page: return True @property def data(self): """Returns two-dimensional list of strings in table. """ d = [] for row in self.cells: d.append([cell.text.strip() for cell in row]) return d @property def parsing_report(self): """Returns a parsing report with %accuracy, %whitespace, table number on page and page number. """ # pretty? report = { "accuracy": round(self.accuracy, 2), "whitespace": round(self.whitespace, 2), "order": self.order, "page": self.page, } return report def record_parse_metadata(self, parser): """Record data about the origin of the table """ self.flavor = parser.id self.filename = parser.filename self.debug_info = parser.debug_info data = self.data self.df = pd.DataFrame(data) self.shape = self.df.shape self.whitespace = compute_whitespace(data) self.pdf_size = (parser.pdf_width, parser.pdf_height) def get_pdf_image(self): """Compute pdf image and cache it """ if self._image is None: if self._image_path is None: self._image_path = build_file_path_in_temp_dir( os.path.basename(self.filename), ".png" ) export_pdf_as_png(self.filename, self._image_path) self._image = cv2.imread(self._image_path) return self._image def set_all_edges(self): """Sets all table edges to True. """ for row in self.cells: for cell in row: cell.left = cell.right = cell.top = cell.bottom = True return self def set_edges(self, vertical, horizontal, joint_tol=2): """Sets a cell's edges to True depending on whether the cell's coordinates overlap with the line's coordinates within a tolerance. Parameters ---------- vertical : list List of detected vertical lines. horizontal : list List of detected horizontal lines. """ for v in vertical: # find closest x coord # iterate over y coords and find closest start and end points i = [ i for i, t in enumerate(self.cols) if np.isclose(v[0], t[0], atol=joint_tol) ] j = [ j for j, t in enumerate(self.rows) if np.isclose(v[3], t[0], atol=joint_tol) ] k = [ k for k, t in enumerate(self.rows) if np.isclose(v[1], t[0], atol=joint_tol) ] if not j: continue J = j[0] if i == [0]: # only left edge L = i[0] if k: K = k[0] while J < K: self.cells[J][L].left = True J += 1 else: K = len(self.rows) while J < K: self.cells[J][L].left = True J += 1 elif i == []: # only right edge L = len(self.cols) - 1 if k: K = k[0] while J < K: self.cells[J][L].right = True J += 1 else: K = len(self.rows) while J < K: self.cells[J][L].right = True J += 1 else: # both left and right edges L = i[0] if k: K = k[0] while J < K: self.cells[J][L].left = True self.cells[J][L - 1].right = True J += 1 else: K = len(self.rows) while J < K: self.cells[J][L].left = True self.cells[J][L - 1].right = True J += 1 for h in horizontal: # find closest y coord # iterate over x coords and find closest start and end points i = [ i for i, t in enumerate(self.rows) if np.isclose(h[1], t[0], atol=joint_tol) ] j = [ j for j, t in enumerate(self.cols) if np.isclose(h[0], t[0], atol=joint_tol) ] k = [ k for k, t in enumerate(self.cols) if np.isclose(h[2], t[0], atol=joint_tol) ] if not j: continue J = j[0] if i == [0]: # only top edge L = i[0] if k: K = k[0] while J < K: self.cells[L][J].top = True J += 1 else: K = len(self.cols) while J < K: self.cells[L][J].top = True J += 1 elif i == []: # only bottom edge L = len(self.rows) - 1 if k: K = k[0] while J < K: self.cells[L][J].bottom = True J += 1 else: K = len(self.cols) while J < K: self.cells[L][J].bottom = True J += 1 else: # both top and bottom edges L = i[0] if k: K = k[0] while J < K: self.cells[L][J].top = True self.cells[L - 1][J].bottom = True J += 1 else: K = len(self.cols) while J < K: self.cells[L][J].top = True self.cells[L - 1][J].bottom = True J += 1 return self def set_border(self): """Sets table border edges to True. """ for r in range(len(self.rows)): self.cells[r][0].left = True self.cells[r][len(self.cols) - 1].right = True for c in range(len(self.cols)): self.cells[0][c].top = True self.cells[len(self.rows) - 1][c].bottom = True return self def set_span(self): """Sets a cell's hspan or vspan attribute to True depending on whether the cell spans horizontally or vertically. """ for row in self.cells: for cell in row: left = cell.left right = cell.right top = cell.top bottom = cell.bottom if cell.bound == 4: continue elif cell.bound == 3: if not left and (right and top and bottom): cell.hspan = True elif not right and (left and top and bottom): cell.hspan = True elif not top and (left and right and bottom): cell.vspan = True elif not bottom and (left and right and top): cell.vspan = True elif cell.bound == 2: if left and right and (not top and not bottom): cell.vspan = True elif top and bottom and (not left and not right): cell.hspan = True elif cell.bound in [0, 1]: cell.vspan = True cell.hspan = True return self def to_csv(self, path, **kwargs): """Writes Table to a comma-separated values (csv) file. For kwargs, check :meth:`pandas.DataFrame.to_csv`. Parameters ---------- path : str Output filepath. """ kw = {"encoding": "utf-8", "index": False, "header": False, "quoting": 1} kw.update(kwargs) self.df.to_csv(path, **kw) def to_json(self, path, **kwargs): """Writes Table to a JSON file. For kwargs, check :meth:`pandas.DataFrame.to_json`. Parameters ---------- path : str Output filepath. """ kw = {"orient": "records"} kw.update(kwargs) json_string = self.df.to_json(**kw) with open(path, "w") as f: f.write(json_string) def to_excel(self, path, **kwargs): """Writes Table to an Excel file. For kwargs, check :meth:`pandas.DataFrame.to_excel`. Parameters ---------- path : str Output filepath. """ kw = { "sheet_name": "page-{}-table-{}".format(self.page, self.order), "encoding": "utf-8", } kw.update(kwargs) # pylint: disable=abstract-class-instantiated writer = pd.ExcelWriter(path) self.df.to_excel(writer, **kw) writer.save() def to_html(self, path, **kwargs): """Writes Table to an HTML file. For kwargs, check :meth:`pandas.DataFrame.to_html`. Parameters ---------- path : str Output filepath. """ html_string = self.df.to_html(**kwargs) with open(path, "w") as f: f.write(html_string) def to_sqlite(self, path, **kwargs): """Writes Table to sqlite database. For kwargs, check :meth:`pandas.DataFrame.to_sql`. Parameters ---------- path : str Output filepath. """ kw = {"if_exists": "replace", "index": False} kw.update(kwargs) conn = sqlite3.connect(path) table_name = "page-{}-table-{}".format(self.page, self.order) self.df.to_sql(table_name, conn, **kw) conn.commit() conn.close() class TableList(object): """Defines a list of camelot.core.Table objects. Each table can be accessed using its index. Attributes ---------- n : int Number of tables in the list. """ def __init__(self, tables): self._tables = tables def __repr__(self): return "<{} n={}>".format(self.__class__.__name__, self.n) def __len__(self): return len(self._tables) def __getitem__(self, idx): return self._tables[idx] @staticmethod def _format_func(table, f): return getattr(table, "to_{}".format(f)) @property def n(self): return len(self) def _write_file(self, f=None, **kwargs): dirname = kwargs.get("dirname") root = kwargs.get("root") ext = kwargs.get("ext") for table in self._tables: filename = os.path.join( "{}-page-{}-table-{}{}".format(root, table.page, table.order, ext) ) filepath = os.path.join(dirname, filename) to_format = self._format_func(table, f) to_format(filepath) def _compress_dir(self, **kwargs): path = kwargs.get("path") dirname = kwargs.get("dirname") root = kwargs.get("root") ext = kwargs.get("ext") zipname = os.path.join(os.path.dirname(path), root) + ".zip" with zipfile.ZipFile(zipname, "w", allowZip64=True) as z: for table in self._tables: filename = os.path.join( "{}-page-{}-table-{}{}".format(root, table.page, table.order, ext) ) filepath = os.path.join(dirname, filename) z.write(filepath, os.path.basename(filepath)) def export(self, path, f="csv", compress=False): """Exports the list of tables to specified file format. Parameters ---------- path : str Output filepath. f : str File format. Can be csv, json, excel, html and sqlite. compress : bool Whether or not to add files to a ZIP archive. """ dirname = os.path.dirname(path) basename = os.path.basename(path) root, ext = os.path.splitext(basename) if compress: dirname = tempfile.mkdtemp() kwargs = {"path": path, "dirname": dirname, "root": root, "ext": ext} if f in ["csv", "json", "html"]: self._write_file(f=f, **kwargs) if compress: self._compress_dir(**kwargs) elif f == "excel": filepath = os.path.join(dirname, basename) # pylint: disable=abstract-class-instantiated writer = pd.ExcelWriter(filepath) for table in self._tables: sheet_name = "page-{}-table-{}".format(table.page, table.order) table.df.to_excel(writer, sheet_name=sheet_name, encoding="utf-8") writer.save() if compress: zipname = os.path.join(os.path.dirname(path), root) + ".zip" with zipfile.ZipFile(zipname, "w", allowZip64=True) as z: z.write(filepath, os.path.basename(filepath)) elif f == "sqlite": filepath = os.path.join(dirname, basename) for table in self._tables: table.to_sqlite(filepath) if compress: zipname = os.path.join(os.path.dirname(path), root) + ".zip" with zipfile.ZipFile(zipname, "w", allowZip64=True) as z: z.write(filepath, os.path.basename(filepath))