| import io |
| import json |
| import os |
| import re |
| import time |
| from pathlib import Path |
| from typing import Any, Dict, List, Tuple |
|
|
| import boto3 |
| import pandas as pd |
| import pikepdf |
|
|
| from tools.config import ( |
| AWS_ACCESS_KEY, |
| AWS_REGION, |
| AWS_SECRET_KEY, |
| PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, |
| RUN_AWS_FUNCTIONS, |
| SPLIT_PUNCTUATION_FROM_WORDS, |
| TEXTRACT_TEXT_EXTRACT_OPTION, |
| ) |
| from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult |
| from tools.helper_functions import _generate_unique_ids |
| from tools.secure_path_utils import secure_file_read |
|
|
|
|
| def extract_textract_metadata(response: object): |
| """Extracts metadata from an AWS Textract response.""" |
|
|
| request_id = response["ResponseMetadata"]["RequestId"] |
| pages = response["DocumentMetadata"]["Pages"] |
|
|
| return str({"RequestId": request_id, "Pages": pages}) |
|
|
|
|
| def textract_prioritizes_signature_extraction( |
| text_extraction_method: str, |
| handwrite_signature_checkbox: List[str] | None = None, |
| ) -> bool: |
| """ |
| True when AWS Textract signature analysis is active. |
| |
| CUSTOM_VLM_SIGNATURE VLM passes should be skipped in this case so signatures |
| are not detected twice (Textract signature extraction takes priority). |
| """ |
| return ( |
| text_extraction_method == TEXTRACT_TEXT_EXTRACT_OPTION |
| and "Extract signatures" in (handwrite_signature_checkbox or []) |
| ) |
|
|
|
|
| def analyse_page_with_textract( |
| pdf_page_bytes: object, |
| page_no: int, |
| client: str = "", |
| handwrite_signature_checkbox: List[str] = ["Extract handwriting"], |
| textract_output_found: bool = False, |
| aws_access_question_textbox: str = AWS_ACCESS_KEY, |
| aws_secret_question_textbox: str = AWS_SECRET_KEY, |
| RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS, |
| PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: bool = PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, |
| textract_max_retries: int = 10, |
| ): |
| """ |
| Analyzes a single page of a document using AWS Textract to extract text and other features. |
| |
| Args: |
| pdf_page_bytes (object): The content of the PDF page or image as bytes. |
| page_no (int): The page number being analyzed. |
| client (str, optional): An optional pre-initialized AWS Textract client. If not provided, |
| the function will attempt to create one based on configuration. |
| Defaults to "". |
| handwrite_signature_checkbox (List[str], optional): A list of feature types to extract |
| from the document. Options include |
| "Extract handwriting", "Extract signatures", |
| "Extract forms", "Extract layout", "Extract tables". |
| Defaults to ["Extract handwriting"]. |
| textract_output_found (bool, optional): A flag indicating whether existing Textract output |
| for the document has been found. This can prevent |
| unnecessary API calls. Defaults to False. |
| aws_access_question_textbox (str, optional): AWS access question provided by the user, if not using |
| SSO or environment variables. Defaults to AWS_ACCESS_KEY. |
| aws_secret_question_textbox (str, optional): AWS secret question provided by the user, if not using |
| SSO or environment variables. Defaults to AWS_SECRET_KEY. |
| RUN_AWS_FUNCTIONS (bool, optional): Configuration flag to enable or |
| disable AWS functions. Defaults to RUN_AWS_FUNCTIONS. |
| PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS (bool, optional): Configuration flag (e.g., True or False) |
| to prioritize AWS SSO credentials |
| over environment variables. |
| Defaults to True. |
| textract_max_retries (int, optional): Maximum number of attempts for each Textract API call. |
| Defaults to 2. |
| |
| Returns: |
| Tuple[List[Dict], str]: A tuple containing: |
| - A list of dictionaries, where each dictionary represents a Textract block (e.g., LINE, WORD, FORM, TABLE). |
| - A string containing metadata about the Textract request. |
| """ |
|
|
| |
| if client == "": |
| try: |
| |
| if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: |
| print("Connecting to Textract via existing SSO connection") |
| client = boto3.client("textract", region_name=AWS_REGION) |
| elif aws_access_question_textbox and aws_secret_question_textbox: |
| print( |
| "Connecting to Textract using AWS access question and secret questions from user input." |
| ) |
| client = boto3.client( |
| "textract", |
| aws_access_question_id=aws_access_question_textbox, |
| aws_secret_access_question=aws_secret_question_textbox, |
| region_name=AWS_REGION, |
| ) |
| elif RUN_AWS_FUNCTIONS is True: |
| print("Connecting to Textract via existing SSO connection") |
| client = boto3.client("textract", region_name=AWS_REGION) |
| elif AWS_ACCESS_KEY and AWS_SECRET_KEY: |
| print("Getting Textract credentials from environment variables.") |
| client = boto3.client( |
| "textract", |
| aws_access_question_id=AWS_ACCESS_KEY, |
| aws_secret_access_question=AWS_SECRET_KEY, |
| region_name=AWS_REGION, |
| ) |
| elif textract_output_found is True: |
| print( |
| "Existing Textract data found for file, no need to connect to AWS Textract" |
| ) |
| client = boto3.client("textract", region_name=AWS_REGION) |
| else: |
| client = "" |
| out_message = "Cannot connect to AWS Textract service." |
| print(out_message) |
| raise Exception(out_message) |
| except Exception as e: |
| out_message = "Cannot connect to AWS Textract" |
| print(out_message, "due to:", e) |
| raise Exception(out_message) |
| return [], "" |
|
|
| |
| feature_types = list() |
| if ( |
| "Extract signatures" in handwrite_signature_checkbox |
| or "Extract forms" in handwrite_signature_checkbox |
| or "Extract layout" in handwrite_signature_checkbox |
| or "Extract tables" in handwrite_signature_checkbox |
| ): |
| if "Extract signatures" in handwrite_signature_checkbox: |
| feature_types.append("SIGNATURES") |
| if "Extract forms" in handwrite_signature_checkbox: |
| feature_types.append("FORMS") |
| if "Extract layout" in handwrite_signature_checkbox: |
| feature_types.append("LAYOUT") |
| if "Extract tables" in handwrite_signature_checkbox: |
| feature_types.append("TABLES") |
| for attempt in range(textract_max_retries): |
| try: |
| response = client.analyze_document( |
| Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types |
| ) |
| break |
| except Exception as e: |
| if attempt == textract_max_retries - 1: |
| raise |
| print( |
| "Textract call failed due to:", |
| e, |
| f"trying again in 1 second (attempt {attempt + 1}/{textract_max_retries}).", |
| ) |
| time.sleep(1) |
|
|
| if ( |
| "Extract signatures" not in handwrite_signature_checkbox |
| and "Extract forms" not in handwrite_signature_checkbox |
| and "Extract layout" not in handwrite_signature_checkbox |
| and "Extract tables" not in handwrite_signature_checkbox |
| ): |
| |
| for attempt in range(textract_max_retries): |
| try: |
| response = client.detect_document_text( |
| Document={"Bytes": pdf_page_bytes} |
| ) |
| break |
| except Exception as e: |
| if attempt == textract_max_retries - 1: |
| raise |
| print( |
| "Textract call failed due to:", |
| e, |
| f"trying again in 1 second (attempt {attempt + 1}/{textract_max_retries}).", |
| ) |
| time.sleep(1) |
|
|
| |
| if "Blocks" in response: |
| for block in response["Blocks"]: |
| block["Page"] = page_no |
|
|
| |
| wrapped_response = {"page_no": page_no, "data": response} |
|
|
| request_metadata = extract_textract_metadata( |
| response |
| ) |
|
|
| |
| return ( |
| wrapped_response, |
| request_metadata, |
| ) |
|
|
|
|
| def convert_pike_pdf_page_to_bytes(pdf: object, page_num: int): |
| |
| new_pdf = pikepdf.Pdf.new() |
|
|
| |
| page_num = 0 |
|
|
| |
| new_pdf.pages.append(pdf.pages[page_num]) |
|
|
| |
| buffer = io.BytesIO() |
| new_pdf.save(buffer) |
|
|
| |
| pdf_bytes = buffer.getanswer() |
|
|
| |
| buffer.close() |
|
|
| return pdf_bytes |
|
|
|
|
| def split_word_with_punctuation( |
| word_text: str, |
| bounding_box: Tuple[int, int, int, int], |
| confidence: float, |
| ) -> List[Dict[str, Any]]: |
| """ |
| Split a word that may contain punctuation into separate word entries. |
| Only separates punctuation at the start and end of words. |
| Punctuation in the middle (e.g., in email addresses like user@example.com) |
| is kept as part of the word. |
| |
| Args: |
| word_text: The text of the word (may contain punctuation) |
| bounding_box: Tuple of (left, top, right, bottom) in pixels |
| confidence: Confidence score for the original word |
| |
| Returns: |
| List of word dictionaries, each with text and bounding_box. |
| Leading and trailing punctuation become separate entries, while |
| the middle part (which may contain internal punctuation) remains intact. |
| """ |
| if not word_text: |
| return [] |
|
|
| |
| leading_punct_match = re.match(r"^([^\w\s]+)", word_text) |
| leading_punct = leading_punct_match.group(1) if leading_punct_match else "" |
|
|
| |
| trailing_punct_match = re.search(r"([^\w\s]+)$", word_text) |
| trailing_punct = trailing_punct_match.group(1) if trailing_punct_match else "" |
|
|
| |
| |
| start_idx = len(leading_punct) |
| end_idx = len(word_text) - len(trailing_punct) if trailing_punct else len(word_text) |
| middle_part = word_text[start_idx:end_idx] if start_idx < end_idx else "" |
|
|
| |
| parts = [] |
| if leading_punct: |
| parts.append(leading_punct) |
| if middle_part: |
| parts.append(middle_part) |
| if trailing_punct: |
| parts.append(trailing_punct) |
|
|
| |
| if len(parts) == 0: |
| return [ |
| { |
| "text": word_text, |
| "confidence": confidence, |
| "bounding_box": bounding_box, |
| } |
| ] |
|
|
| |
| if len(parts) == 1: |
| return [ |
| { |
| "text": word_text, |
| "confidence": confidence, |
| "bounding_box": bounding_box, |
| } |
| ] |
|
|
| |
| left, top, right, bottom = bounding_box |
| width = right - left |
| bottom - top |
|
|
| |
| total_chars = len(word_text) |
| if total_chars == 0: |
| return [] |
|
|
| |
| |
| PUNCTUATION_WIDTH_SCALE = ( |
| 0.4 |
| ) |
|
|
| |
| |
| total_effective_chars = 0 |
| part_info = [] |
|
|
| for part in parts: |
| if not part: |
| continue |
| |
| is_punctuation_only = not bool(re.search(r"[\w]", part)) |
| if is_punctuation_only: |
| effective_length = len(part) * PUNCTUATION_WIDTH_SCALE |
| else: |
| effective_length = len(part) |
| part_info.append( |
| { |
| "text": part, |
| "length": len(part), |
| "effective_length": effective_length, |
| "is_punctuation": is_punctuation_only, |
| } |
| ) |
| total_effective_chars += effective_length |
|
|
| if total_effective_chars == 0: |
| return [] |
|
|
| |
| effective_char_width = width / total_effective_chars |
|
|
| |
| word_entries = [] |
| current_pos = 0 |
|
|
| for info in part_info: |
| |
| |
| part_width = info["effective_length"] * effective_char_width |
|
|
| |
| part_left = left + current_pos |
| part_right = part_left + part_width |
|
|
| word_entries.append( |
| { |
| "text": info["text"], |
| "confidence": confidence, |
| "bounding_box": ( |
| int(part_left), |
| int(top), |
| int(part_right), |
| int(bottom), |
| ), |
| } |
| ) |
|
|
| |
| current_pos += part_width |
|
|
| return word_entries |
|
|
|
|
| def json_to_ocrresult( |
| json_data: dict, page_width: float, page_height: float, page_no: int |
| ): |
| """ |
| Convert Textract JSON to structured OCR, handling lines, words, signatures, |
| selection elements (associating them with lines), and question-answer form data. |
| The question-answer data is sorted in a top-to-bottom, left-to-right reading order. |
| |
| Args: |
| json_data (dict): The raw JSON output from AWS Textract for a specific page. |
| page_width (float): The width of the page in pixels or points. |
| page_height (float): The height of the page in pixels or points. |
| page_no (int): The 1-based page number being processed. |
| """ |
| |
| |
| |
| page_json_data = json_data |
|
|
| if "Blocks" in page_json_data: |
| |
| text_blocks = page_json_data["Blocks"] |
| |
| elif "page_no" in page_json_data: |
| text_blocks = page_json_data["data"]["Blocks"] |
| else: |
| text_blocks = [] |
|
|
| block_map = {block["Id"]: block for block in text_blocks} |
|
|
| lines_data = list() |
| selections_data = list() |
| signature_or_handwriting_recogniser_results = list() |
| signature_recogniser_results = list() |
| handwriting_recogniser_results = list() |
|
|
| def _get_text_from_block(block, b_map): |
| text_parts = list() |
| if "Relationships" in block: |
| for rel in block["Relationships"]: |
| if rel["Type"] == "CHILD": |
| for child_id in rel["Ids"]: |
| child = b_map.get(child_id) |
| if child: |
| if child["BlockType"] == "WORD": |
| text_parts.append(child["Text"]) |
| elif child["BlockType"] == "SELECTION_ELEMENT": |
| text_parts.append(f"[{child['SelectionStatus']}]") |
| return " ".join(text_parts) |
|
|
| |
|
|
| for block in text_blocks: |
| block_type = block.get("BlockType") |
|
|
| if block_type == "LINE": |
| bbox = block["Geometry"]["BoundingBox"] |
| line_info = { |
| "id": block["Id"], |
| "text": block.get("Text", ""), |
| "confidence": round(block.get("Confidence", 0.0), 0), |
| "words": [], |
| "geometry": { |
| "left": int(bbox["Left"] * page_width), |
| "top": int(bbox["Top"] * page_height), |
| "width": int(bbox["Width"] * page_width), |
| "height": int(bbox["Height"] * page_height), |
| }, |
| } |
| if "Relationships" in block: |
| for rel in block.get("Relationships", []): |
| if rel["Type"] == "CHILD": |
| for child_id in rel["Ids"]: |
| word_block = block_map.get(child_id) |
| if word_block and word_block["BlockType"] == "WORD": |
| w_bbox = word_block["Geometry"]["BoundingBox"] |
| word_text = word_block.get("Text", "") |
| word_confidence = round( |
| word_block.get("Confidence", 0.0), 0 |
| ) |
| original_bounding_box = ( |
| int(w_bbox["Left"] * page_width), |
| int(w_bbox["Top"] * page_height), |
| int( |
| (w_bbox["Left"] + w_bbox["Width"]) * page_width |
| ), |
| int( |
| (w_bbox["Top"] + w_bbox["Height"]) * page_height |
| ), |
| ) |
|
|
| |
| if SPLIT_PUNCTUATION_FROM_WORDS: |
| split_words = split_word_with_punctuation( |
| word_text, |
| original_bounding_box, |
| word_confidence, |
| ) |
| else: |
| |
| split_words = [ |
| { |
| "text": word_text, |
| "confidence": word_confidence, |
| "bounding_box": original_bounding_box, |
| } |
| ] |
|
|
| |
| for split_word in split_words: |
| line_info["words"].append(split_word) |
|
|
| |
| if word_block.get("TextType") == "HANDWRITING": |
| |
| for split_word in split_words: |
| split_bbox = split_word["bounding_box"] |
| rec_res = CustomImageRecognizerResult( |
| entity_type="HANDWRITING", |
| text=split_word["text"], |
| score=split_word["confidence"], |
| start=0, |
| end=len(split_word["text"]), |
| left=split_bbox[0], |
| top=split_bbox[1], |
| width=split_bbox[2] - split_bbox[0], |
| height=split_bbox[3] - split_bbox[1], |
| ) |
| handwriting_recogniser_results.append(rec_res) |
| signature_or_handwriting_recogniser_results.append( |
| rec_res |
| ) |
| lines_data.append(line_info) |
|
|
| elif block_type == "SELECTION_ELEMENT": |
| bbox = block["Geometry"]["BoundingBox"] |
| selections_data.append( |
| { |
| "id": block["Id"], |
| "status": block.get("SelectionStatus", "UNKNOWN"), |
| "confidence": round(block.get("Confidence", 0.0), 0), |
| "geometry": { |
| "left": int(bbox["Left"] * page_width), |
| "top": int(bbox["Top"] * page_height), |
| "width": int(bbox["Width"] * page_width), |
| "height": int(bbox["Height"] * page_height), |
| }, |
| } |
| ) |
|
|
| elif block_type == "SIGNATURE": |
| bbox = block["Geometry"]["BoundingBox"] |
| rec_res = CustomImageRecognizerResult( |
| entity_type="SIGNATURE", |
| text="SIGNATURE", |
| score=round(block.get("Confidence", 0.0), 0), |
| start=0, |
| end=9, |
| left=int(bbox["Left"] * page_width), |
| top=int(bbox["Top"] * page_height), |
| width=int(bbox["Width"] * page_width), |
| height=int(bbox["Height"] * page_height), |
| ) |
| signature_recogniser_results.append(rec_res) |
| signature_or_handwriting_recogniser_results.append(rec_res) |
|
|
| |
| def _create_question_answer_results_object(text_blocks): |
| question_answer_results = list() |
| key_blocks = [ |
| b |
| for b in text_blocks |
| if b.get("BlockType") == "KEY_VALUE_SET" |
| and "KEY" in b.get("EntityTypes", []) |
| ] |
| for question_block in key_blocks: |
| answer_block = next( |
| ( |
| block_map.get(rel["Ids"][0]) |
| for rel in question_block.get("Relationships", []) |
| if rel["Type"] == "VALUE" |
| ), |
| None, |
| ) |
|
|
| |
| if answer_block: |
| question_bbox = question_block["Geometry"]["BoundingBox"] |
| |
| answer_bbox = answer_block["Geometry"]["BoundingBox"] |
|
|
| question_answer_results.append( |
| { |
| |
| "Page": page_no, |
| "Question": _get_text_from_block(question_block, block_map), |
| "Answer": _get_text_from_block(answer_block, block_map), |
| "Confidence Score % (Question)": round( |
| question_block.get("Confidence", 0.0), 0 |
| ), |
| "Confidence Score % (Answer)": round( |
| answer_block.get("Confidence", 0.0), 0 |
| ), |
| "Question_left": round(question_bbox["Left"], 5), |
| "Question_top": round(question_bbox["Top"], 5), |
| "Question_width": round(question_bbox["Width"], 5), |
| "Question_height": round(question_bbox["Height"], 5), |
| "Answer_left": round(answer_bbox["Left"], 5), |
| "Answer_top": round(answer_bbox["Top"], 5), |
| "Answer_width": round(answer_bbox["Width"], 5), |
| "Answer_height": round(answer_bbox["Height"], 5), |
| } |
| ) |
|
|
| question_answer_results.sort( |
| key=lambda item: (item["Question_top"], item["Question_left"]) |
| ) |
|
|
| return question_answer_results |
|
|
| question_answer_results = _create_question_answer_results_object(text_blocks) |
|
|
| |
| unmatched_selections = list() |
| for selection in selections_data: |
| best_match_line = None |
| min_dist = float("inf") |
| sel_geom = selection["geometry"] |
| sel_y_center = sel_geom["top"] + sel_geom["height"] / 2 |
| for line in lines_data: |
| line_geom = line["geometry"] |
| line_y_center = line_geom["top"] + line_geom["height"] / 2 |
| if abs(sel_y_center - line_y_center) < line_geom["height"]: |
| dist = 0 |
| if sel_geom["left"] > (line_geom["left"] + line_geom["width"]): |
| dist = sel_geom["left"] - (line_geom["left"] + line_geom["width"]) |
| elif line_geom["left"] > (sel_geom["left"] + sel_geom["width"]): |
| dist = line_geom["left"] - (sel_geom["left"] + sel_geom["width"]) |
| if dist < min_dist: |
| min_dist = dist |
| best_match_line = line |
| if best_match_line and min_dist < (best_match_line["geometry"]["height"] * 5): |
| selection_as_word = { |
| "text": f"[{selection['status']}]", |
| "confidence": round(selection["confidence"], 0), |
| "bounding_box": ( |
| sel_geom["left"], |
| sel_geom["top"], |
| sel_geom["left"] + sel_geom["width"], |
| sel_geom["top"] + sel_geom["height"], |
| ), |
| } |
| best_match_line["words"].append(selection_as_word) |
| best_match_line["words"].sort(key=lambda w: w["bounding_box"][0]) |
| else: |
| unmatched_selections.append(selection) |
|
|
| |
| all_ocr_results = list() |
| ocr_results_with_words = dict() |
| selection_element_results = list() |
| for i, line in enumerate(lines_data): |
| line_num = i + 1 |
| line_geom = line["geometry"] |
| reconstructed_text = " ".join(w["text"] for w in line["words"]) |
| all_ocr_results.append( |
| OCRResult( |
| reconstructed_text, |
| line_geom["left"], |
| line_geom["top"], |
| line_geom["width"], |
| line_geom["height"], |
| round(line["confidence"], 0), |
| line_num, |
| model="Textract", |
| ) |
| ) |
| ocr_results_with_words[f"text_line_{line_num}"] = { |
| "line": line_num, |
| "text": reconstructed_text, |
| "confidence": line["confidence"], |
| "bounding_box": ( |
| line_geom["left"], |
| line_geom["top"], |
| line_geom["left"] + line_geom["width"], |
| line_geom["top"] + line_geom["height"], |
| ), |
| "words": line["words"], |
| "page": page_no, |
| "model": "Textract", |
| } |
| for selection in unmatched_selections: |
| sel_geom = selection["geometry"] |
| sel_text = f"[{selection['status']}]" |
| all_ocr_results.append( |
| OCRResult( |
| sel_text, |
| sel_geom["left"], |
| sel_geom["top"], |
| sel_geom["width"], |
| sel_geom["height"], |
| round(selection["confidence"], 0), |
| -1, |
| ) |
| ) |
| for selection in selections_data: |
| sel_geom = selection["geometry"] |
| selection_element_results.append( |
| { |
| "status": selection["status"], |
| "confidence": round(selection["confidence"], 0), |
| "bounding_box": ( |
| sel_geom["left"], |
| sel_geom["top"], |
| sel_geom["left"] + sel_geom["width"], |
| sel_geom["top"] + sel_geom["height"], |
| ), |
| "page": page_no, |
| } |
| ) |
|
|
| all_ocr_results_with_page = {"page": page_no, "results": all_ocr_results} |
| ocr_results_with_words_with_page = { |
| "page": page_no, |
| "results": ocr_results_with_words, |
| } |
|
|
| return ( |
| all_ocr_results_with_page, |
| signature_or_handwriting_recogniser_results, |
| signature_recogniser_results, |
| handwriting_recogniser_results, |
| ocr_results_with_words_with_page, |
| selection_element_results, |
| question_answer_results, |
| ) |
|
|
|
|
| def load_and_convert_textract_json( |
| textract_json_file_path: str, |
| log_files_output_paths: str, |
| page_sizes_df: pd.DataFrame, |
| ): |
| """ |
| Loads Textract JSON from a file, detects if conversion is needed, and converts if necessary. |
| |
| Args: |
| textract_json_file_path (str): The file path to the Textract JSON output. |
| log_files_output_paths (str): A list of paths to log files, used for tracking. |
| page_sizes_df (pd.DataFrame): A DataFrame containing page size information for the document. |
| """ |
|
|
| if not os.path.exists(textract_json_file_path): |
| print("No existing Textract results file found.") |
| return ( |
| {}, |
| True, |
| log_files_output_paths, |
| ) |
|
|
| print("Found existing Textract json results file.") |
|
|
| |
| if textract_json_file_path not in log_files_output_paths: |
| log_files_output_paths.append(textract_json_file_path) |
|
|
| try: |
| |
| textract_json_file_path_obj = Path(textract_json_file_path) |
| base_dir = textract_json_file_path_obj.parent |
| filename = textract_json_file_path_obj.name |
|
|
| json_content = secure_file_read(base_dir, filename, encoding="utf-8") |
| textract_data = json.loads(json_content) |
| except json.JSONDecodeError: |
| print("Error: Failed to parse Textract JSON file. Returning empty data.") |
| return {}, True, log_files_output_paths |
|
|
| |
| if "pages" in textract_data: |
| print("JSON already in the correct format for app. No changes needed.") |
| return textract_data, False, log_files_output_paths |
|
|
| if "Blocks" in textract_data: |
| print("Need to convert Textract JSON to app format.") |
| try: |
|
|
| textract_data = restructure_textract_output(textract_data, page_sizes_df) |
| return ( |
| textract_data, |
| False, |
| log_files_output_paths, |
| ) |
|
|
| except Exception as e: |
| print("Failed to convert JSON data to app format due to:", e) |
| return {}, True, log_files_output_paths |
| else: |
| print("Invalid Textract JSON format: 'Blocks' missing.") |
| |
| return ( |
| {}, |
| True, |
| log_files_output_paths, |
| ) |
|
|
|
|
| def restructure_textract_output(textract_output: dict, page_sizes_df: pd.DataFrame): |
| """ |
| Reorganise Textract output from the bulk Textract analysis option on AWS |
| into a format that works in this redaction app, reducing size. |
| |
| Args: |
| textract_output (dict): The raw JSON output from AWS Textract. |
| page_sizes_df (pd.DataFrame): A Pandas DataFrame containing page size |
| information, including cropbox and mediabox |
| dimensions and offsets for each page. |
| """ |
| pages_dict = dict() |
|
|
| |
| document_metadata = textract_output.get("DocumentMetadata", {}) |
|
|
| |
| if "page" in page_sizes_df.columns: |
| page_sizes_df = page_sizes_df.set_index("page") |
|
|
| for block in textract_output.get("Blocks", []): |
| page_no = block.get("Page", 1) |
|
|
| |
| try: |
| page_info = page_sizes_df.loc[page_no] |
| cb_width = page_info["cropbox_width"] |
| cb_height = page_info["cropbox_height"] |
| mb_width = page_info["mediabox_width"] |
| mb_height = page_info["mediabox_height"] |
| cb_x_offset = page_info["cropbox_x_offset"] |
| cb_y_offset_top = page_info["cropbox_y_offset_from_top"] |
|
|
| |
| needs_conversion = ( |
| (abs(cb_width - mb_width) > 1e-6 or abs(cb_height - mb_height) > 1e-6) |
| and mb_width > 1e-6 |
| and mb_height > 1e-6 |
| ) |
|
|
| if needs_conversion and "Geometry" in block: |
| geometry = block["Geometry"] |
|
|
| |
| if "BoundingBox" in geometry: |
| bbox = geometry["BoundingBox"] |
| old_left = bbox["Left"] |
| old_top = bbox["Top"] |
| old_width = bbox["Width"] |
| old_height = bbox["Height"] |
|
|
| |
| abs_cb_x = old_left * cb_width |
| abs_cb_y = old_top * cb_height |
| abs_cb_width = old_width * cb_width |
| abs_cb_height = old_height * cb_height |
|
|
| |
| abs_mb_x = cb_x_offset + abs_cb_x |
| abs_mb_y = cb_y_offset_top + abs_cb_y |
|
|
| |
| bbox["Left"] = abs_mb_x / mb_width |
| bbox["Top"] = abs_mb_y / mb_height |
| bbox["Width"] = abs_cb_width / mb_width |
| bbox["Height"] = abs_cb_height / mb_height |
| except KeyError: |
| print( |
| f"Warning: Page number {page_no} not found in page_sizes_df. Skipping coordinate conversion for this block." |
| ) |
| |
| except ZeroDivisionError: |
| print( |
| f"Warning: MediaBox width or height is zero for page {page_no}. Skipping coordinate conversion for this block." |
| ) |
|
|
| |
| if page_no not in pages_dict: |
| pages_dict[page_no] = {"page_no": str(page_no), "data": {"Blocks": []}} |
|
|
| |
| filtered_block = { |
| question: block[question] |
| for question in [ |
| "BlockType", |
| "Confidence", |
| "Text", |
| "Geometry", |
| "Page", |
| "Id", |
| "Relationships", |
| ] |
| if question in block |
| } |
|
|
| pages_dict[page_no]["data"]["Blocks"].append(filtered_block) |
|
|
| |
| structured_output = { |
| "DocumentMetadata": document_metadata, |
| "pages": [pages_dict[page] for page in sorted(pages_dict.keys())], |
| } |
|
|
| return structured_output |
|
|
|
|
| def convert_question_answer_to_dataframe( |
| question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame |
| ) -> pd.DataFrame: |
| """ |
| Convert question-answer results to DataFrame format matching convert_annotation_data_to_dataframe. |
| |
| Each Question and Answer will be on separate lines in the resulting dataframe. |
| The 'image' column will be populated with the page number as f'placeholder_image_page{i}.png'. |
| |
| Args: |
| question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object |
| page_sizes_df: DataFrame containing page sizes |
| |
| Returns: |
| pd.DataFrame: DataFrame with columns ["image", "page", "label", "color", "xmin", "xmax", "ymin", "ymax", "text", "id"] |
| """ |
|
|
| if not question_answer_results: |
| |
| return pd.DataFrame( |
| columns=[ |
| "image", |
| "page", |
| "label", |
| "color", |
| "xmin", |
| "xmax", |
| "ymin", |
| "ymax", |
| "text", |
| "id", |
| ] |
| ) |
|
|
| |
| rows = list() |
| existing_ids = set() |
|
|
| for i, qa_result in enumerate(question_answer_results): |
| page_num = int(qa_result.get("Page", 1)) |
| page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
| page_sizes_df.dropna(subset=["page"], inplace=True) |
| if not page_sizes_df.empty: |
| page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
| else: |
| print("Warning: Page sizes DataFrame became empty after processing.") |
|
|
| image_name = page_sizes_df.loc[ |
| page_sizes_df["page"] == page_num, "image_path" |
| ].iloc[0] |
| if pd.isna(image_name): |
| image_name = f"placeholder_image_{page_num}.png" |
|
|
| |
| question_bbox = { |
| "Question_left": qa_result.get("Question_left", 0), |
| "Question_top": qa_result.get("Question_top", 0), |
| "Question_width": qa_result.get("Question_width", 0), |
| "Question_height": qa_result.get("Question_height", 0), |
| } |
|
|
| question_row = { |
| "image": image_name, |
| "page": page_num, |
| "label": f"Question {i+1}", |
| "color": "(0,0,255)", |
| "xmin": question_bbox["Question_left"], |
| "xmax": question_bbox["Question_left"] + question_bbox["Question_width"], |
| "ymin": question_bbox["Question_top"], |
| "ymax": question_bbox["Question_top"] + question_bbox["Question_height"], |
| "text": qa_result.get("Question", ""), |
| "id": None, |
| } |
|
|
| |
| answer_bbox = { |
| "Answer_left": qa_result.get("Answer_left", 0), |
| "Answer_top": qa_result.get("Answer_top", 0), |
| "Answer_width": qa_result.get("Answer_width", 0), |
| "Answer_height": qa_result.get("Answer_height", 0), |
| } |
|
|
| answer_row = { |
| "image": image_name, |
| "page": page_num, |
| "label": f"Answer {i+1}", |
| "color": "(0,255,0)", |
| "xmin": answer_bbox["Answer_left"], |
| "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"], |
| "ymin": answer_bbox["Answer_top"], |
| "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"], |
| "text": qa_result.get("Answer", ""), |
| "id": None, |
| } |
|
|
| rows.extend([question_row, answer_row]) |
|
|
| |
| num_ids_needed = len(rows) |
| unique_ids = _generate_unique_ids(num_ids_needed, existing_ids) |
|
|
| |
| for i, row in enumerate(rows): |
| row["id"] = unique_ids[i] |
|
|
| |
| df = pd.DataFrame(rows) |
|
|
| |
| required_columns = [ |
| "image", |
| "page", |
| "label", |
| "color", |
| "xmin", |
| "xmax", |
| "ymin", |
| "ymax", |
| "text", |
| "id", |
| ] |
| for col in required_columns: |
| if col not in df.columns: |
| df[col] = pd.NA |
|
|
| |
| df = df.reindex(columns=required_columns, fill_value=pd.NA) |
|
|
| return df |
|
|
|
|
| def convert_question_answer_to_annotation_json( |
| question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame |
| ) -> List[Dict]: |
| """ |
| Convert question-answer results directly to Gradio Annotation JSON format. |
| |
| This function combines the functionality of convert_question_answer_to_dataframe |
| and convert_review_df_to_annotation_json to directly convert question-answer |
| results to the annotation JSON format without the intermediate DataFrame step. |
| |
| Args: |
| question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object |
| page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height'] |
| |
| Returns: |
| List of dictionaries suitable for Gradio Annotation output, one dict per image/page. |
| Each dict has structure: {"image": image_path, "boxes": [list of annotation boxes]} |
| """ |
|
|
| if not question_answer_results: |
| |
| json_data = list() |
| for _, row in page_sizes_df.iterrows(): |
| json_data.append( |
| { |
| "image": row.get( |
| "image_path", f"placeholder_image_{row.get('page', 1)}.png" |
| ), |
| "boxes": [], |
| } |
| ) |
| return json_data |
|
|
| |
| required_ps_cols = {"page", "image_path", "image_width", "image_height"} |
| if not required_ps_cols.issubset(page_sizes_df.columns): |
| missing = required_ps_cols - set(page_sizes_df.columns) |
| raise ValueError(f"page_sizes_df is missing required columns: {missing}") |
|
|
| |
| page_sizes_df = page_sizes_df.copy() |
| page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
| page_sizes_df["image_width"] = pd.to_numeric( |
| page_sizes_df["image_width"], errors="coerce" |
| ) |
| page_sizes_df["image_height"] = pd.to_numeric( |
| page_sizes_df["image_height"], errors="coerce" |
| ) |
| page_sizes_df["page"] = page_sizes_df["page"].astype("Int64") |
|
|
| |
| rows = list() |
| existing_ids = set() |
|
|
| for i, qa_result in enumerate(question_answer_results): |
| page_num = int(qa_result.get("Page", 1)) |
|
|
| |
| page_row = page_sizes_df[page_sizes_df["page"] == page_num] |
| if not page_row.empty: |
| page_row["image_path"].iloc[0] |
| else: |
| pass |
|
|
| |
| question_bbox = { |
| "Question_left": qa_result.get("Question_left", 0), |
| "Question_top": qa_result.get("Question_top", 0), |
| "Question_width": qa_result.get("Question_width", 0), |
| "Question_height": qa_result.get("Question_height", 0), |
| } |
|
|
| question_box = { |
| "label": f"Question {i+1}", |
| "color": (0, 0, 255), |
| "xmin": question_bbox["Question_left"], |
| "xmax": question_bbox["Question_left"] + question_bbox["Question_width"], |
| "ymin": question_bbox["Question_top"], |
| "ymax": question_bbox["Question_top"] + question_bbox["Question_height"], |
| "text": qa_result.get("Question", ""), |
| "id": None, |
| } |
|
|
| |
| answer_bbox = { |
| "Answer_left": qa_result.get("Answer_left", 0), |
| "Answer_top": qa_result.get("Answer_top", 0), |
| "Answer_width": qa_result.get("Answer_width", 0), |
| "Answer_height": qa_result.get("Answer_height", 0), |
| } |
|
|
| answer_box = { |
| "label": f"Answer {i+1}", |
| "color": (0, 255, 0), |
| "xmin": answer_bbox["Answer_left"], |
| "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"], |
| "ymin": answer_bbox["Answer_top"], |
| "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"], |
| "text": qa_result.get("Answer", ""), |
| "id": None, |
| } |
|
|
| rows.extend([(page_num, question_box), (page_num, answer_box)]) |
|
|
| |
| num_ids_needed = len(rows) |
| unique_ids = _generate_unique_ids(num_ids_needed, existing_ids) |
|
|
| |
| for i, (page_num, box) in enumerate(rows): |
| box["id"] = unique_ids[i] |
| rows[i] = (page_num, box) |
|
|
| |
| boxes_by_page = {} |
| for page_num, box in rows: |
| if page_num not in boxes_by_page: |
| boxes_by_page[page_num] = list() |
| boxes_by_page[page_num].append(box) |
|
|
| |
| json_data = list() |
| for _, row in page_sizes_df.iterrows(): |
| page_num = row["page"] |
| pdf_image_path = row["image_path"] |
|
|
| |
| annotation_boxes = boxes_by_page.get(page_num, []) |
|
|
| |
| json_data.append({"image": pdf_image_path, "boxes": annotation_boxes}) |
|
|
| return json_data |
|
|
|
|
| def convert_page_question_answer_to_custom_image_recognizer_results( |
| question_answer_results: List[Dict[str, Any]], |
| page_sizes_df: pd.DataFrame, |
| reported_page_number: int, |
| ) -> List["CustomImageRecognizerResult"]: |
| """ |
| Convert question-answer results to a list of CustomImageRecognizerResult objects. |
| |
| Args: |
| question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object |
| page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height'] |
| reported_page_number: The page number reported by the user |
| Returns: |
| List of CustomImageRecognizerResult objects for questions and answers |
| """ |
| from tools.custom_image_analyser_engine import CustomImageRecognizerResult |
|
|
| if not question_answer_results: |
| return list() |
|
|
| results = list() |
|
|
| |
| page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
| page_sizes_df.dropna(subset=["page"], inplace=True) |
| if not page_sizes_df.empty: |
| page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
| else: |
| print("Warning: Page sizes DataFrame became empty after processing.") |
| return list() |
|
|
| page_row = page_sizes_df.loc[page_sizes_df["page"] == int(reported_page_number)] |
|
|
| if page_row.empty: |
| print( |
| f"Warning: Page {reported_page_number} not found in page_sizes_df. Skipping this entry." |
| ) |
| return list() |
|
|
| for i, qa_result in enumerate(question_answer_results): |
| current_page = int(qa_result.get("Page", 1)) |
|
|
| if current_page != int(reported_page_number): |
| continue |
|
|
| |
| |
| |
| |
| try: |
| if "image_width" in page_sizes_df.columns: |
| image_width_val = page_row["image_width"].iloc[0] |
| if pd.notna(image_width_val) and image_width_val > 0: |
| image_width = image_width_val |
| else: |
| image_width = page_row["mediabox_width"].iloc[0] |
| else: |
| image_width = page_row["mediabox_width"].iloc[0] |
| except (KeyError, IndexError): |
| image_width = page_row["mediabox_width"].iloc[0] |
|
|
| try: |
| if "image_height" in page_sizes_df.columns: |
| image_height_val = page_row["image_height"].iloc[0] |
| if pd.notna(image_height_val) and image_height_val > 0: |
| image_height = image_height_val |
| else: |
| image_height = page_row["mediabox_height"].iloc[0] |
| else: |
| image_height = page_row["mediabox_height"].iloc[0] |
| except (KeyError, IndexError): |
| image_height = page_row["mediabox_height"].iloc[0] |
|
|
| |
| question_text = qa_result.get("Question", "") |
| answer_text = qa_result.get("Answer", "") |
|
|
| |
| question_score = float(qa_result.get("'Confidence Score % (Question)'", 0.0)) |
| answer_score = float(qa_result.get("'Confidence Score % (Answer)'", 0.0)) |
|
|
| |
| question_bbox = { |
| "left": qa_result.get("Question_left", 0) * image_width, |
| "top": qa_result.get("Question_top", 0) * image_height, |
| "width": qa_result.get("Question_width", 0) * image_width, |
| "height": qa_result.get("Question_height", 0) * image_height, |
| } |
|
|
| question_result = CustomImageRecognizerResult( |
| entity_type=f"QUESTION {i+1}", |
| start=0, |
| end=len(question_text), |
| score=question_score, |
| left=float(question_bbox.get("left", 0)), |
| top=float(question_bbox.get("top", 0)), |
| width=float(question_bbox.get("width", 0)), |
| height=float(question_bbox.get("height", 0)), |
| text=question_text, |
| color=(0, 0, 255), |
| ) |
| results.append(question_result) |
|
|
| |
| answer_bbox = { |
| "left": qa_result.get("Answer_left", 0) * image_width, |
| "top": qa_result.get("Answer_top", 0) * image_height, |
| "width": qa_result.get("Answer_width", 0) * image_width, |
| "height": qa_result.get("Answer_height", 0) * image_height, |
| } |
|
|
| answer_result = CustomImageRecognizerResult( |
| entity_type=f"ANSWER {i+1}", |
| start=0, |
| end=len(answer_text), |
| score=answer_score, |
| left=float(answer_bbox.get("left", 0)), |
| top=float(answer_bbox.get("top", 0)), |
| width=float(answer_bbox.get("width", 0)), |
| height=float(answer_bbox.get("height", 0)), |
| text=answer_text, |
| color=(0, 255, 0), |
| ) |
| results.append(answer_result) |
|
|
| return results |
|
|