From 63161fe379aa9a2adc0d885bba0155cfe23ace40 Mon Sep 17 00:00:00 2001 From: Wassim Date: Sat, 1 May 2021 16:20:27 +0200 Subject: [PATCH] Add support for parsing PDFs in parallel Parse pages in parallel with the multiprocessing library, using all available CPU cores --- camelot/handlers.py | 72 ++++++++++++--- camelot/io.py | 4 + tests/conftest.py | 8 ++ tests/test_common.py | 206 +++++++++++++++++++++++++++---------------- 4 files changed, 202 insertions(+), 88 deletions(-) create mode 100644 tests/conftest.py diff --git a/camelot/handlers.py b/camelot/handlers.py index 9ec10bb..21a71f0 100644 --- a/camelot/handlers.py +++ b/camelot/handlers.py @@ -2,6 +2,7 @@ import os import sys +import multiprocessing as mp from PyPDF2 import PdfFileReader, PdfFileWriter @@ -140,7 +141,12 @@ class PDFHandler(object): instream.close() def parse( - self, flavor="lattice", suppress_stdout=False, layout_kwargs={}, **kwargs + self, + flavor="lattice", + suppress_stdout=False, + parallel=False, + layout_kwargs={}, + **kwargs ): """Extracts tables by calling parser.get_tables on all single page PDFs. @@ -150,8 +156,10 @@ class PDFHandler(object): flavor : str (default: 'lattice') The parsing method to use ('lattice' or 'stream'). Lattice is used by default. - suppress_stdout : str (default: False) + suppress_stdout : bool (default: False) Suppress logs and warnings. + parallel : bool (default: False) + Process pages in parallel using all available cpu cores. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams `_ kwargs. 
kwargs : dict @@ -164,16 +172,54 @@ class PDFHandler(object): """ tables = [] + parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs) with TemporaryDirectory() as tempdir: - for p in self.pages: - self._save_page(self.filepath, p, tempdir) - pages = [ - os.path.join(tempdir, f"page-{p}.pdf") for p in self.pages - ] - parser = Lattice(**kwargs) if flavor == "lattice" else Stream(**kwargs) - for p in pages: - t = parser.extract_tables( - p, suppress_stdout=suppress_stdout, layout_kwargs=layout_kwargs - ) - tables.extend(t) + cpu_count = mp.cpu_count() + # Use multiprocessing only when cpu_count > 1 to prevent + # a stall when cpu_count is 1 + if parallel and cpu_count > 1: + with mp.get_context("spawn").Pool(processes=cpu_count) as pool: + jobs = [ + pool.apply_async( + self._parse_page, + (p, tempdir, parser, suppress_stdout, layout_kwargs) + ) for p in self.pages + ] + for j in jobs: + t = j.get() + tables.extend(t) + else: + for p in self.pages: + t = self._parse_page(p, tempdir, parser, suppress_stdout, layout_kwargs) + tables.extend(t) return TableList(sorted(tables)) + + def _parse_page( + self, page, tempdir, parser, suppress_stdout, layout_kwargs + ): + """Extracts tables by calling parser.extract_tables on a single + page PDF. + + Parameters + ---------- + page : str + Page number to parse. + parser : Lattice or Stream + The parser to use (Lattice or Stream). + suppress_stdout : bool + Suppress logs and warnings. + layout_kwargs : dict + A dict of `pdfminer.layout.LAParams `_ kwargs. + + Returns + ------- + tables : camelot.core.TableList + List of tables found in PDF. 
+ + """ + self._save_page(self.filepath, page, tempdir) + page_path = os.path.join(tempdir, f"page-{page}.pdf") + tables = parser.extract_tables( + page_path, suppress_stdout=suppress_stdout, layout_kwargs=layout_kwargs + ) + return tables diff --git a/camelot/io.py b/camelot/io.py index a27a7c6..50154db 100644 --- a/camelot/io.py +++ b/camelot/io.py @@ -12,6 +12,7 @@ def read_pdf( password=None, flavor="lattice", suppress_stdout=False, + parallel=False, layout_kwargs={}, **kwargs ): @@ -34,6 +35,8 @@ def read_pdf( Lattice is used by default. suppress_stdout : bool, optional (default: True) Print all logs and warnings. + parallel : bool, optional (default: False) + Process pages in parallel using all available cpu cores. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams `_ kwargs. table_areas : list, optional (default: None) @@ -113,6 +116,7 @@ def read_pdf( tables = p.parse( flavor=flavor, suppress_stdout=suppress_stdout, + parallel=parallel, layout_kwargs=layout_kwargs, **kwargs ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9170343 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +import pytest + +def pytest_generate_tests(metafunc): + if "parallel" in metafunc.fixturenames: + metafunc.parametrize("parallel", [ + pytest.param(True, id="parallel=True"), + pytest.param(False, id="parallel=False") + ]) \ No newline at end of file diff --git a/tests/test_common.py b/tests/test_common.py index cb9a968..f9e4782 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -14,219 +14,275 @@ from .data import * testdir = os.path.dirname(os.path.abspath(__file__)) testdir = os.path.join(testdir, "files") - -def test_parsing_report(): +def test_parsing_report(parallel): parsing_report = {"accuracy": 99.02, "whitespace": 12.24, "order": 1, "page": 1} filename = os.path.join(testdir, "foo.pdf") - tables = camelot.read_pdf(filename) + tables = camelot.read_pdf(filename, parallel=parallel) assert 
tables[0].parsing_report == parsing_report -def test_password(): + +def test_password(parallel): df = pd.DataFrame(data_stream) filename = os.path.join(testdir, "health_protected.pdf") - tables = camelot.read_pdf(filename, password="ownerpass", flavor="stream") + tables = camelot.read_pdf(filename, password="ownerpass", flavor="stream", parallel=parallel) assert_frame_equal(df, tables[0].df) - tables = camelot.read_pdf(filename, password="userpass", flavor="stream") + tables = camelot.read_pdf(filename, password="userpass", flavor="stream", parallel=parallel) assert_frame_equal(df, tables[0].df) -def test_stream(): + +def test_stream(parallel): df = pd.DataFrame(data_stream) filename = os.path.join(testdir, "health.pdf") - tables = camelot.read_pdf(filename, flavor="stream") + tables = camelot.read_pdf(filename, flavor="stream", parallel=parallel) assert_frame_equal(df, tables[0].df) -def test_stream_table_rotated(): + +def test_stream_table_rotated(parallel): df = pd.DataFrame(data_stream_table_rotated) filename = os.path.join(testdir, "clockwise_table_2.pdf") - tables = camelot.read_pdf(filename, flavor="stream") + tables = camelot.read_pdf(filename, flavor="stream", parallel=parallel) assert_frame_equal(df, tables[0].df) filename = os.path.join(testdir, "anticlockwise_table_2.pdf") - tables = camelot.read_pdf(filename, flavor="stream") + tables = camelot.read_pdf(filename, flavor="stream", parallel=parallel) assert_frame_equal(df, tables[0].df) -def test_stream_two_tables(): + +def test_stream_two_tables(parallel): df1 = pd.DataFrame(data_stream_two_tables_1) df2 = pd.DataFrame(data_stream_two_tables_2) filename = os.path.join(testdir, "tabula/12s0324.pdf") - tables = camelot.read_pdf(filename, flavor="stream") + tables = camelot.read_pdf(filename, flavor="stream", parallel=parallel) assert len(tables) == 2 assert df1.equals(tables[0].df) assert df2.equals(tables[1].df) -def test_stream_table_regions(): + +def test_stream_table_regions(parallel): df = 
pd.DataFrame(data_stream_table_areas) filename = os.path.join(testdir, "tabula/us-007.pdf") tables = camelot.read_pdf( - filename, flavor="stream", table_regions=["320,460,573,335"] + filename, flavor="stream", + parallel=parallel, + table_regions=["320,460,573,335"] ) assert_frame_equal(df, tables[0].df) -def test_stream_table_areas(): +def test_stream_table_areas(parallel): df = pd.DataFrame(data_stream_table_areas) filename = os.path.join(testdir, "tabula/us-007.pdf") tables = camelot.read_pdf( - filename, flavor="stream", table_areas=["320,500,573,335"] + filename, flavor="stream", + parallel=parallel, + table_areas=["320,500,573,335"] ) assert_frame_equal(df, tables[0].df) -def test_stream_columns(): +def test_stream_columns(parallel): df = pd.DataFrame(data_stream_columns) filename = os.path.join(testdir, "mexican_towns.pdf") tables = camelot.read_pdf( - filename, flavor="stream", columns=["67,180,230,425,475"], row_tol=10 + filename, + flavor="stream", + parallel=parallel, + columns=["67,180,230,425,475"], + row_tol=10 ) assert_frame_equal(df, tables[0].df) -def test_stream_split_text(): +def test_stream_split_text(parallel): df = pd.DataFrame(data_stream_split_text) filename = os.path.join(testdir, "tabula/m27.pdf") tables = camelot.read_pdf( filename, flavor="stream", + parallel=parallel, columns=["72,95,209,327,442,529,566,606,683"], split_text=True, ) assert_frame_equal(df, tables[0].df) -def test_stream_flag_size(): +def test_stream_flag_size(parallel): df = pd.DataFrame(data_stream_flag_size) filename = os.path.join(testdir, "superscript.pdf") - tables = camelot.read_pdf(filename, flavor="stream", flag_size=True) - assert_frame_equal(df, tables[0].df) - - -def test_stream_strip_text(): - df = pd.DataFrame(data_stream_strip_text) - - filename = os.path.join(testdir, "detect_vertical_false.pdf") - tables = camelot.read_pdf(filename, flavor="stream", strip_text=" ,\n") - assert_frame_equal(df, tables[0].df) - - -def test_stream_edge_tol(): - df = 
pd.DataFrame(data_stream_edge_tol) - - filename = os.path.join(testdir, "edge_tol.pdf") - tables = camelot.read_pdf(filename, flavor="stream", edge_tol=500) - assert_frame_equal(df, tables[0].df) - - -def test_stream_layout_kwargs(): - df = pd.DataFrame(data_stream_layout_kwargs) - - filename = os.path.join(testdir, "detect_vertical_false.pdf") tables = camelot.read_pdf( - filename, flavor="stream", layout_kwargs={"detect_vertical": False} + filename, + flavor="stream", + parallel=parallel, + flag_size=True ) assert_frame_equal(df, tables[0].df) -def test_lattice(): +def test_stream_strip_text(parallel): + df = pd.DataFrame(data_stream_strip_text) + + filename = os.path.join(testdir, "detect_vertical_false.pdf") + tables = camelot.read_pdf( + filename, flavor="stream", + parallel=parallel, + strip_text=" ,\n" + ) + assert_frame_equal(df, tables[0].df) + + +def test_stream_edge_tol(parallel): + df = pd.DataFrame(data_stream_edge_tol) + + filename = os.path.join(testdir, "edge_tol.pdf") + tables = camelot.read_pdf( + filename, flavor="stream", + parallel=parallel, + edge_tol=500 + ) + assert_frame_equal(df, tables[0].df) + + +def test_stream_layout_kwargs(parallel): + df = pd.DataFrame(data_stream_layout_kwargs) + + filename = os.path.join(testdir, "detect_vertical_false.pdf") + tables = camelot.read_pdf( + filename, + flavor="stream", + parallel=parallel, + layout_kwargs={"detect_vertical": False} + ) + assert_frame_equal(df, tables[0].df) + + +def test_lattice(parallel): df = pd.DataFrame(data_lattice) filename = os.path.join( testdir, "tabula/icdar2013-dataset/competition-dataset-us/us-030.pdf" ) - tables = camelot.read_pdf(filename, pages="2") + tables = camelot.read_pdf(filename, pages="2", parallel=parallel) assert_frame_equal(df, tables[0].df) -def test_lattice_table_rotated(): +def test_lattice_table_rotated(parallel): df = pd.DataFrame(data_lattice_table_rotated) filename = os.path.join(testdir, "clockwise_table_1.pdf") - tables = camelot.read_pdf(filename) 
+ tables = camelot.read_pdf(filename, parallel=parallel) assert_frame_equal(df, tables[0].df) filename = os.path.join(testdir, "anticlockwise_table_1.pdf") - tables = camelot.read_pdf(filename) + tables = camelot.read_pdf(filename, parallel=parallel) assert_frame_equal(df, tables[0].df) -def test_lattice_two_tables(): +def test_lattice_two_tables(parallel): df1 = pd.DataFrame(data_lattice_two_tables_1) df2 = pd.DataFrame(data_lattice_two_tables_2) filename = os.path.join(testdir, "twotables_2.pdf") - tables = camelot.read_pdf(filename) + tables = camelot.read_pdf(filename, parallel=parallel) assert len(tables) == 2 assert df1.equals(tables[0].df) assert df2.equals(tables[1].df) -def test_lattice_table_regions(): +def test_lattice_table_regions(parallel): df = pd.DataFrame(data_lattice_table_regions) filename = os.path.join(testdir, "table_region.pdf") - tables = camelot.read_pdf(filename, table_regions=["170,370,560,270"]) + tables = camelot.read_pdf( + filename, + parallel=parallel, + table_regions=["170,370,560,270"] + ) assert_frame_equal(df, tables[0].df) -def test_lattice_table_areas(): +def test_lattice_table_areas(parallel): df = pd.DataFrame(data_lattice_table_areas) filename = os.path.join(testdir, "twotables_2.pdf") - tables = camelot.read_pdf(filename, table_areas=["80,693,535,448"]) + tables = camelot.read_pdf( + filename, + parallel=parallel, + table_areas=["80,693,535,448"] + ) assert_frame_equal(df, tables[0].df) -def test_lattice_process_background(): +def test_lattice_process_background(parallel): df = pd.DataFrame(data_lattice_process_background) filename = os.path.join(testdir, "background_lines_1.pdf") - tables = camelot.read_pdf(filename, process_background=True) + tables = camelot.read_pdf( + filename, + parallel=parallel, + process_background=True, + ) assert_frame_equal(df, tables[1].df) -def test_lattice_copy_text(): +def test_lattice_copy_text(parallel): df = pd.DataFrame(data_lattice_copy_text) filename = os.path.join(testdir, 
"row_span_1.pdf") - tables = camelot.read_pdf(filename, line_scale=60, copy_text="v") + tables = camelot.read_pdf( + filename, + parallel=parallel, + line_scale=60, + copy_text="v" + ) assert_frame_equal(df, tables[0].df) -def test_lattice_shift_text(): +def test_lattice_shift_text(parallel): df_lt = pd.DataFrame(data_lattice_shift_text_left_top) df_disable = pd.DataFrame(data_lattice_shift_text_disable) df_rb = pd.DataFrame(data_lattice_shift_text_right_bottom) filename = os.path.join(testdir, "column_span_2.pdf") - tables = camelot.read_pdf(filename, line_scale=40) + tables = camelot.read_pdf(filename, parallel=parallel, line_scale=40) assert df_lt.equals(tables[0].df) - tables = camelot.read_pdf(filename, line_scale=40, shift_text=[""]) + tables = camelot.read_pdf( + filename, + parallel=parallel, + line_scale=40, + shift_text=[""] + ) assert df_disable.equals(tables[0].df) - tables = camelot.read_pdf(filename, line_scale=40, shift_text=["r", "b"]) + tables = camelot.read_pdf( + filename, + parallel=parallel, + line_scale=40, + shift_text=["r", "b"] + ) assert df_rb.equals(tables[0].df) -def test_repr(): +def test_repr(parallel): filename = os.path.join(testdir, "foo.pdf") - tables = camelot.read_pdf(filename) + tables = camelot.read_pdf(filename, parallel=parallel) assert repr(tables) == "" assert repr(tables[0]) == "" assert ( @@ -234,23 +290,23 @@ def test_repr(): ) -def test_pages(): +def test_pages(parallel): url = "https://camelot-py.readthedocs.io/en/master/_static/pdf/foo.pdf" - tables = camelot.read_pdf(url) + tables = camelot.read_pdf(url, parallel=parallel) assert repr(tables) == "" assert repr(tables[0]) == "
" assert ( repr(tables[0].cells[0][0]) == "" ) - tables = camelot.read_pdf(url, pages="1-end") + tables = camelot.read_pdf(url, pages="1-end", parallel=parallel) assert repr(tables) == "" assert repr(tables[0]) == "
" assert ( repr(tables[0].cells[0][0]) == "" ) - tables = camelot.read_pdf(url, pages="all") + tables = camelot.read_pdf(url, pages="all", parallel=parallel) assert repr(tables) == "" assert repr(tables[0]) == "
" assert ( @@ -258,9 +314,9 @@ def test_pages(): ) -def test_url(): +def test_url(parallel): url = "https://camelot-py.readthedocs.io/en/master/_static/pdf/foo.pdf" - tables = camelot.read_pdf(url) + tables = camelot.read_pdf(url, parallel=parallel) assert repr(tables) == "" assert repr(tables[0]) == "
" assert ( @@ -268,11 +324,11 @@ def test_url(): ) -def test_arabic(): +def test_arabic(parallel): df = pd.DataFrame(data_arabic) filename = os.path.join(testdir, "tabula/arabic.pdf") - tables = camelot.read_pdf(filename) + tables = camelot.read_pdf(filename, parallel=parallel) assert_frame_equal(df, tables[0].df)