# NEKReport/document_processor(1).py
#
# 155 lines
# 5.8 KiB
# Python

from pypdf import PdfReader
from docx import Document
import openpyxl
from typing import Union
import io
import logging
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Extract plain text from document attachments.

    Dispatch is done on the file extension: PDF (pypdf), DOCX (python-docx),
    Excel (openpyxl first, then xlrd) and UTF-8 text files. Image files are
    recognised and yield an empty string. Excel sheets are rendered twice:
    row by row and column by column.
    """

    # Image attachments are accepted but produce no text.
    _IMAGE_EXTENSIONS = frozenset(
        {"png", "jpg", "jpeg", "gif", "bmp", "tiff", "webp"}
    )

    @staticmethod
    def _cell_to_text(v) -> str:
        """Return the stripped string form of a cell value ("" for None)."""
        if v is None:
            return ""
        return str(v).strip()

    @staticmethod
    def _non_empty_texts(values) -> list[str]:
        """Convert cell values to text and drop the ones that end up empty."""
        texts = (DocumentProcessor._cell_to_text(v) for v in values)
        return [t for t in texts if t]

    @staticmethod
    def _column_lines(label, vals: list[str]) -> list[str]:
        """Render one column as a TOP_DOWN line and a BOTTOM_UP line."""
        return [
            f"COL {label} TOP_DOWN: " + " || ".join(vals),
            f"COL {label} BOTTOM_UP: " + " || ".join(reversed(vals)),
        ]

    @staticmethod
    def _sheet_block(title, rows_text: list[str], cols_text: list[str]) -> list[str]:
        """Assemble the output lines for one sheet.

        The sheet header is always emitted; the "[ROWS]" and "[COLUMNS]"
        sections are emitted only when they have content.
        """
        block = [f"=== SHEET: {title} ==="]
        if rows_text:
            block.append("[ROWS]")
            block.extend(rows_text)
        if cols_text:
            block.append("[COLUMNS]")
            block.extend(cols_text)
        return block

    @staticmethod
    def _sheet_rows_text_openpyxl(sheet) -> list[str]:
        """Return one " | "-joined line per non-empty row of an openpyxl sheet."""
        out: list[str] = []
        for row in sheet.iter_rows(values_only=True):
            vals = DocumentProcessor._non_empty_texts(row)
            if vals:
                out.append(" | ".join(vals))
        return out

    @staticmethod
    def _sheet_cols_text_openpyxl(sheet) -> list[str]:
        """Return TOP_DOWN/BOTTOM_UP lines per non-empty column of an openpyxl sheet.

        Columns are labelled with their Excel letter (A, B, ...).
        """
        out: list[str] = []
        max_col = sheet.max_column or 0
        max_row = sheet.max_row or 0
        for c in range(1, max_col + 1):
            vals = DocumentProcessor._non_empty_texts(
                sheet.cell(r, c).value for r in range(1, max_row + 1)
            )
            if vals:
                col_letter = openpyxl.utils.get_column_letter(c)
                out.extend(DocumentProcessor._column_lines(col_letter, vals))
        return out

    @staticmethod
    def _sheet_rows_text_xlrd(sheet) -> list[str]:
        """Return one " | "-joined line per non-empty row of an xlrd sheet."""
        out: list[str] = []
        for row_idx in range(sheet.nrows):
            vals = DocumentProcessor._non_empty_texts(sheet.row_values(row_idx))
            if vals:
                out.append(" | ".join(vals))
        return out

    @staticmethod
    def _sheet_cols_text_xlrd(sheet) -> list[str]:
        """Return TOP_DOWN/BOTTOM_UP lines per non-empty column of an xlrd sheet.

        Columns are labelled with their 1-based number.
        """
        out: list[str] = []
        for c in range(sheet.ncols):
            vals = DocumentProcessor._non_empty_texts(
                sheet.cell_value(r, c) for r in range(sheet.nrows)
            )
            if vals:
                out.extend(DocumentProcessor._column_lines(c + 1, vals))
        return out

    @staticmethod
    def normalize_email_html(html_content: str) -> str:
        """Clean up e-mail HTML and give its tables a uniform inline style.

        Declared as a static method so it can be called both on the class and
        on an instance (it takes no ``self``).

        Args:
            html_content: Raw HTML markup.

        Returns:
            The re-serialised HTML with ``<script>``/``<style>`` tags removed
            and the ``style`` attribute of every table, ``td`` and ``th``
            overwritten.
        """
        soup = BeautifulSoup(html_content, "html.parser")
        # Remove scripts and styles.
        for tag in soup(["script", "style"]):
            tag.decompose()
        # Normalise tables: any existing inline style is replaced.
        for table in soup.find_all("table"):
            table["style"] = "border-collapse: collapse; width: 100%;"
            for cell in table.find_all(["td", "th"]):
                cell["style"] = "border:1px solid #ccc;padding:6px;"
        return str(soup)

    def extract_text(self, content: bytes, filename: str) -> str:
        """Extract text from ``content`` based on the extension of ``filename``.

        Args:
            content: Raw file bytes.
            filename: File name; the part after the last dot selects the parser
                (case-insensitive).

        Returns:
            The extracted text, or "" for image files.

        Raises:
            ValueError: If the extension is not supported.
            UnicodeDecodeError: If a ``.txt`` file is not valid UTF-8.
        """
        ext = filename.lower().split('.')[-1]
        # Images are not processed.
        if ext in self._IMAGE_EXTENSIONS:
            return ""
        if ext == 'pdf':
            return self._extract_pdf(content)
        if ext in ('docx', 'doc'):
            # NOTE(review): 'doc' is routed to the DOCX parser; legacy binary
            # .doc files are presumably not readable by it - confirm.
            return self._extract_docx(content)
        if ext in ('xlsx', 'xls'):
            return self._extract_excel(content)
        if ext == 'txt':
            return content.decode('utf-8')
        raise ValueError(f"Unsupported format: {ext}")

    def _extract_pdf(self, content: bytes) -> str:
        """Return the text of all PDF pages joined by newlines."""
        pdf = PdfReader(io.BytesIO(content))
        # Guard against a page yielding no text object so join() cannot fail.
        return "\n".join((page.extract_text() or "") for page in pdf.pages)

    def _extract_docx(self, content: bytes) -> str:
        """Return the text of all document paragraphs joined by newlines."""
        doc = Document(io.BytesIO(content))
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    def _extract_excel(self, content: bytes) -> str:
        """Render every sheet of an Excel workbook as text.

        openpyxl is tried first (for .xlsx); if it fails for any reason, xlrd
        is tried (for .xls). For each sheet the output contains a header line,
        a row-wise section and a column-wise section.

        Returns:
            The rendered text, or "" if xlrd is not installed or both parsers
            fail (the failure is logged).
        """
        try:
            # Try openpyxl first (for .xlsx).
            wb = openpyxl.load_workbook(io.BytesIO(content))
            text: list[str] = []
            for sheet in wb.worksheets:
                text.extend(
                    self._sheet_block(
                        sheet.title,
                        # Row-wise reading is kept for compatibility.
                        self._sheet_rows_text_openpyxl(sheet),
                        # Column-wise reading: shared data is often at the
                        # bottom of a column.
                        self._sheet_cols_text_openpyxl(sheet),
                    )
                )
            return "\n".join(text)
        except Exception as exc:
            # Keep the reason instead of dropping it; the name bound by
            # ``except`` is cleared when the handler exits.
            openpyxl_error = exc
            logger.debug("openpyxl could not parse workbook: %s", exc)

        # If that did not work it may be an .xls file, so try xlrd.
        try:
            import xlrd
        except ImportError:
            logger.error("xlrd not installed, cannot parse .xls files")
            return ""

        try:
            workbook = xlrd.open_workbook(file_contents=content)
            text = []
            for sheet in workbook.sheets():
                text.extend(
                    self._sheet_block(
                        sheet.name,
                        self._sheet_rows_text_xlrd(sheet),
                        self._sheet_cols_text_xlrd(sheet),
                    )
                )
            return "\n".join(text)
        except Exception as e2:
            logger.error(
                "Failed to parse Excel with xlrd: %s (openpyxl error: %s)",
                e2,
                openpyxl_error,
            )
            return ""