Unverified Commit fd02818c authored by Maxime Lefrançois's avatar Maxime Lefrançois
Browse files

run and paragraph field replacement

parent e18c7d0d
Loading
Loading
Loading
Loading
+297 −125
Original line number Diff line number Diff line
import re
import docx.text
import docx.text.paragraph
import docx.enum.style
import docx.enum.text
import docx.styles
import docx.styles.style
import docx.enum
from docx import Document
from docx.shared import Inches , Pt, Cm
# Standard library
import os
import requests
from bs4 import BeautifulSoup
import re
from pathlib import Path
from functools import cached_property

# Third-party libraries
from bs4 import BeautifulSoup
from markdown import markdown
from git import TYPE_CHECKING
from rdflib import OWL, URIRef

# python-docx
from docx import Document
from docx.shared import Pt, Cm
from docx.opc.constants import RELATIONSHIP_TYPE as RT
from docx.text.paragraph import Paragraph
from docx.text.hyperlink import Hyperlink
from docx.text.run import Run
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
import docx.enum.style
import docx.enum.text
import docx.styles
import docx.styles.style
from copy import copy, deepcopy

# Custom / local modules
from saref_pypeline.docgen.utils import OWL_GRAPH, EntityDescription
from saref_pypeline.entities import SAREFProjectVersion
from saref_pypeline.entities import SAREFCore, SAREFPatterns, SAREFProject, SAREFProjectVersion

import logging
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from saref_pypeline.docgen import SiteManager

def get_saref_work_item(saref_doc: str, version: str):
    """Look up an ETSI work item by title search term and version.

    Queries the ETSI portal work-item list with ``saref_doc`` as the
    ``qTITLE`` filter, scans the result table and returns the first row whose
    version equals ``version``.

    Args:
        saref_doc: Title search term (for example ``"saref4envi"``).
        version: Version to match. A leading ``v`` or ``V`` is ignored, so
            ``"V1.1.1"``, ``"v1.1.1"`` and ``"1.1.1"`` are equivalent.

    Returns:
        A dict with the keys ``ref`` (str), ``title`` (str), ``part``
        (int or None) and ``wk_id`` (int or None).

    Raises:
        requests.RequestException: the portal could not be reached in time or
            answered with an HTTP error status.
        ValueError: no row matches ``saref_doc`` and ``version``.
    """
    url = f"https://portal.etsi.org/webapp/WorkProgram/Frame_WorkItemList.asp?qTITLE={saref_doc}"
    # Without a timeout an unresponsive portal would block the caller forever.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    soup = BeautifulSoup(response.content, "html.parser")

    # Versions are written "v1.1.1" elsewhere in the project, so accept both cases.
    wanted_version = version.lstrip("vV")

    # Find all work item rows (they are <tr> containing 'Ref.' and 'Ver.')
    for row in soup.find_all("tr"):
        cells = row.find_all("td")
        if len(cells) < 3:
            continue

        # Try to get the version and ref from the 2nd column
        col_text = cells[1].get_text(separator=" ", strip=True)
        version_match = re.search(r"Ver\.\s*(\d+\.\d+\.\d+)", col_text)
        ref_match = re.search(r"Ref\.\s*([A-Z]+/SmartM2M-\d+-\w+)", col_text)

        if not version_match or not ref_match:
            continue

        if version_match.group(1) != wanted_version:
            continue

        ref = ref_match.group(1)

        # Extract Work Item ID from link href
        wk_id_link = cells[1].find("a", href=True)
        wk_id_match = re.search(r"WKI_ID=(\d+)", wk_id_link["href"]) if wk_id_link else None
        wk_id = int(wk_id_match.group(1)) if wk_id_match else None

        # Extract title and part from 3rd column; lines starting with "D2." or
        # "SAREF4" are left out of the title.
        title_lines = [line.strip() for line in cells[2].stripped_strings]
        title = " ".join(line for line in title_lines if not line.startswith(("D2.", "SAREF4")))
        part_match = re.search(r"Part\s+(\d+)", title)
        part = int(part_match.group(1)) if part_match else None

        return {
            "ref": ref,
            "title": title,
            "part": part,
            "wk_id": wk_id
        }

    raise ValueError(f"No matching work item found for document '{saref_doc}' and version '{version}'")

# Example return value of get_saref_work_item: {'ref': 'DTS/SmartM2M-103410-7', 'title': 'SmartM2M; Extension to SAREF; Part 7: Automotive Domain', 'part': 7, 'wk_id': 51402}

def fetch_work_item_details(wk_id: int) -> dict:
    """Scrape the ETSI work-item report page for document number, date and keywords.

    Args:
        wk_id: Work item identifier, sent as the ``WKI_ID`` query parameter.

    Returns:
        A dict with ``"ETSI Doc. Number"`` and ``"Keywords"`` (keywords joined
        with ``"; "``, or ``""`` when none were found). ``"Date"`` is added
        when a "Cover Date" header with a value cell is found.

    Raises:
        requests.RequestException: the portal could not be reached in time or
            answered with an HTTP error status.
        ValueError: the ETSI document number could not be located in the page.
    """
    url = f"https://portal.etsi.org/webapp/WorkProgram/Report_WorkItem.asp?WKI_ID={wk_id}"
    # Without a timeout an unresponsive portal would block the caller forever.
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.content, "html.parser")

    def value_cell(header_cell, position):
        # The value is read from the node two siblings after the header's row,
        # at a fixed child position. If the page layout differs, report "not
        # found" instead of crashing with AttributeError/IndexError.
        try:
            return list(header_cell.parent.next_sibling.next_sibling.children)[position]
        except (AttributeError, IndexError):
            return None

    details = {}

    # 1. ETSI Doc. Number and cover date, located through their "RowHead" header cells
    for hdr in soup.find_all("td", class_="RowHead"):
        txt = hdr.get_text(strip=True)
        if "ETSI Doc." in txt:
            val_td = value_cell(hdr, 5)
            if val_td:
                details["ETSI Doc. Number"] = val_td.get_text(strip=True)

        if "Cover Date" in txt:
            val_td = value_cell(hdr, 7)
            if val_td:
                details["Date"] = val_td.get_text(strip=True)

    # 2. Keywords: locate the row whose second <td> has header "Keywords"
    table = soup.find("table", class_="Table")
    if table:
        for row in table.find_all("tr"):
            tds = row.find_all("td")
            if len(tds) >= 2 and "Keywords" in tds[1].get_text(strip=True):
                # next row holds keywords in same column position
                next_row = row.find_next_sibling("tr")
                kw_cells = next_row.find_all("td") if next_row else []
                if len(kw_cells) >= 2:
                    # keywords separated by <br>
                    keywords = [kw.strip() for kw in kw_cells[1].stripped_strings]
                    details["Keywords"] = "; ".join(keywords)
                # only the first "Keywords" header row is considered
                break

    if "ETSI Doc. Number" not in details:
        raise ValueError("Unable to extract ETSI Doc. Number")
    if "Keywords" not in details:
        details["Keywords"] = ""

    return details

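# Example return value of fetch_work_item_details (a 'Date' key is also present when a "Cover Date" header is found): {'ETSI Doc. Number': 'TS 103 410-7', 'Keywords': 'IoT; oneM2M; ontology; SAREF; Semantic; TRANSPORT'}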

def add_seq_field(paragraph:Paragraph, seq_name:str="Figure", seq_format:str=None, placeholder:str=""):
    """Append a Word ``SEQ`` field to ``paragraph``.

    A new run is added to the paragraph and filled, in order, with a field
    begin marker, the ``SEQ`` instruction text, a field separator, a text
    element holding ``placeholder`` and a field end marker.

    Args:
        paragraph: Paragraph that receives the new run.
        seq_name: Sequence identifier written into the instruction.
        seq_format: Optional format switch, emitted as ``\\* <seq_format>``.
        placeholder: Text stored between the separator and the end marker.
    """
    def field_char(kind):
        # <w:fldChar w:fldCharType="kind"/>
        marker = OxmlElement('w:fldChar')
        marker.set(qn('w:fldCharType'), kind)
        return marker

    r = paragraph.add_run()._r

    instruction = OxmlElement('w:instrText')
    instruction.set(qn('xml:space'), 'preserve')
    if seq_format:
        instruction.text = f'SEQ {seq_name} \\* {seq_format}'
    else:
        instruction.text = f'SEQ {seq_name}'

    shown_value = OxmlElement('w:t')
    shown_value.text = placeholder

    # begin -> instruction -> separate -> visible text -> end
    for child in (
        field_char('begin'),
        instruction,
        field_char('separate'),
        shown_value,
        field_char('end'),
    ):
        r.append(child)

def add_hyperlink(paragraph: Paragraph, url: str, text: str):
    """Append an external hyperlink showing ``text`` and pointing to ``url``.

    An external hyperlink relationship is registered on the paragraph's part,
    then a ``w:hyperlink`` element holding one blue, single-underlined run is
    appended to the paragraph's XML element.
    """
    # Register the target first so the link can carry the relationship id.
    relationship_id = paragraph.part.relate_to(url, RT.HYPERLINK, is_external=True)

    link = OxmlElement('w:hyperlink')
    link.set(qn('r:id'), relationship_id)

    # Run properties: blue colour, then single underline.
    properties = OxmlElement('w:rPr')
    for tag, value in (('w:color', '0000FF'), ('w:u', 'single')):
        setting = OxmlElement(tag)
        setting.set(qn('w:val'), value)
        properties.append(setting)

    label = OxmlElement('w:t')
    label.text = text

    link_run = OxmlElement('w:r')
    link_run.append(properties)
    link_run.append(label)

    link.append(link_run)
    paragraph._element.append(link)
            
def add_internal_hyperlink(paragraph: Paragraph, anchor_name: str, text: str):
    """Append a hyperlink showing ``text`` that targets the anchor ``anchor_name``.

    The ``w:hyperlink`` element gets ``w:anchor`` set to ``anchor_name`` and
    ``w:history`` set to ``'1'``; it wraps one blue, single-underlined run and
    is appended to the paragraph's XML element.
    """
    link = OxmlElement('w:hyperlink')
    for attribute, value in (('w:anchor', anchor_name), ('w:history', '1')):
        link.set(qn(attribute), value)

    # Run properties: blue colour, then single underline.
    properties = OxmlElement('w:rPr')
    for tag, value in (('w:color', '0000FF'), ('w:u', 'single')):
        setting = OxmlElement(tag)
        setting.set(qn('w:val'), value)
        properties.append(setting)

    label = OxmlElement('w:t')
    label.text = text

    link_run = OxmlElement('w:r')
    link_run.append(properties)
    link_run.append(label)

    link.append(link_run)
    paragraph._element.append(link)

def find_max_bookmark_id(doc: Document) -> int:
    """Return the largest ``w:id`` among the document's ``w:bookmarkStart`` elements.

    The result is never below 0; 0 is returned when there is no bookmark.
    """
    bookmark_ids = [
        int(node.get(qn('w:id')))
        for node in doc.element.xpath('//w:bookmarkStart')
    ]
    # Seed with 0 so an empty document (or only negative ids) yields 0.
    return max([0, *bookmark_ids])

def append_bookmark_start(paragraph, bookmark_name, bookmark_id=0):
    """Append a ``w:bookmarkStart`` element to the paragraph's XML element.

    The element carries ``w:id`` (``bookmark_id`` as a string) and ``w:name``
    (``bookmark_name``).
    """
    start_marker = OxmlElement('w:bookmarkStart')
    for attribute, value in (('w:id', str(bookmark_id)), ('w:name', bookmark_name)):
        start_marker.set(qn(attribute), value)
    paragraph._element.append(start_marker)

def append_bookmark_end(paragraph, bookmark_id=0):
    """Append a ``w:bookmarkEnd`` element whose ``w:id`` is ``bookmark_id`` as a string."""
    end_marker = OxmlElement('w:bookmarkEnd')
    id_attribute = qn('w:id')
    end_marker.set(id_attribute, str(bookmark_id))
    paragraph._element.append(end_marker)

class DOCXDocumentationGenerator:

@@ -140,15 +165,157 @@ class DOCXDocumentationGenerator:
        self._description(OWL.bottomDataProperty, OWL_GRAPH)
        self._description(OWL.topDataProperty, OWL_GRAPH)

    def render_document(self) -> Document:
        """Load the ``stub.docx`` template into ``self.document``.

        The template path is resolved relative to this module's directory
        (``../resources/docgen/stub.docx``). No value is returned here.

        NOTE(review): a second ``render_document`` is defined further down in
        this class; the later definition replaces this one, so this looks like
        a leftover from the previous revision. Confirm and remove.
        """
        self.document = Document(os.path.join(os.path.dirname(__file__), '../resources/docgen/stub.docx'))

    def add_styles(self):
        """Register the character styles used for code text in ``self.document``.

        Two character styles are added and kept on the instance:

        * ``self.code_title``: style "Consolas_title", font Consolas.
        * ``self.code``: style "Consolas", font Consolas at 9 pt.
        """
        self.code_title = self.document.styles.add_style("Consolas_title", docx.enum.style.WD_STYLE_TYPE.CHARACTER) #type: docx.styles.style.CharacterStyle
        self.code_title.font.name = "Consolas"

        # "Consolas" must be registered exactly once: adding a style whose name
        # already exists in the document raises ValueError in python-docx.
        self.code = self.document.styles.add_style("Consolas", docx.enum.style.WD_STYLE_TYPE.CHARACTER) #type: docx.styles.style.CharacterStyle
        self.code.font.name = "Consolas"
        self.code.font.size = Pt(9)

    def get_context(self):
        """Pick the metadata entry of ``self.pipeline.projects_metadata`` for this project.

        Entries are matched on ``doc_nb`` for the hard-coded corner cases, and
        otherwise on the project name appearing in ``short_title``. Scanning
        stops at the first matching entry whose version equals
        ``str(self.version)``; failing that, the last matching entry is
        returned, or ``None`` when nothing matches.
        """
        # manage corner cases: these projects are looked up by document number
        if self.project == SAREFCore:
            forced_doc_nb = "TS 103 264"
        elif self.project == SAREFPatterns:
            forced_doc_nb = "TS 103 548"
        elif self.project == SAREFProject("SAREF4WATR") and str(self.version)=="v1.1.1":
            forced_doc_nb = "TS 103 410-10"
        else:
            forced_doc_nb = None

        selected = None
        for candidate in self.pipeline.projects_metadata:
            if forced_doc_nb:
                # search doc number explicitly
                is_match = forced_doc_nb == candidate["doc_nb"]
            else:
                # search SAREF4ABCD in short_title
                is_match = self.project.name in candidate["short_title"]
            if not is_match:
                continue
            selected = candidate
            if str(self.version) == f"v{candidate['version']}":
                # found the perfect version, stop here
                break
        return selected

    def insert_text(self, text:str, paragraph:Paragraph, base_run:Run=None, index:int=None) -> tuple:
        """Insert ``text`` into ``paragraph`` as a new run formatted like ``base_run``.

        Args:
            text: Text of the new run.
            paragraph: Paragraph that receives the run.
            base_run: Run whose XML (and therefore formatting) is copied for
                the new run. When omitted, an empty ``w:r`` element is used.
            index: Child position in the paragraph's XML element at which the
                run is inserted. When omitted, the run is appended.

        Returns:
            ``(paragraph, next_index)``, where ``next_index`` is the child
            position right after the inserted run.
        """
        # Copy only the run's XML element. Deep-copying the Run proxy would also
        # copy every object the proxy references (its parent objects), which is
        # not needed to reproduce the formatting.
        new_r = deepcopy(base_run._r) if base_run is not None else OxmlElement('w:r')
        if index is None:
            paragraph._element.append(new_r)
            index = len(paragraph._element)
        else:
            paragraph._element.insert(index, new_r)
            index += 1
        # Replace the text inherited from the copied run.
        Run(new_r, paragraph).text = text
        return paragraph, index

    def insert_field(self, field:str, paragraph:Paragraph, base_run:Run=None, index:int=None) -> tuple:
        """Insert the value of ``field`` at ``index`` in ``paragraph``, in replacement of a run.

        Resolution order:

        1. ``self.context[field]`` when it exists and is a string: inserted as text.
        2. A callable attribute of ``self`` named ``field``: called with
           ``(paragraph, base_run, index)``. A string result is inserted as
           text; any other result is returned as is (such handlers are expected
           to return ``(paragraph, index)`` themselves).
        3. Otherwise an error is logged and nothing is inserted.

        Returns:
            ``(paragraph, index)`` describing where insertion continues.
        """
        # self.context may be None when no metadata entry matched the project;
        # treat that as "no context values" rather than failing on `in`.
        context = self.context if self.context is not None else {}
        if field in context and isinstance(context[field], str):
            return self.insert_text(context[field], paragraph, base_run, index)

        # Look the attribute up once.
        handler = getattr(self, field, None)
        if callable(handler):
            result = handler(paragraph, base_run, index)
            if isinstance(result, str):
                return self.insert_text(result, paragraph, base_run, index)
            return result

        logger.error(f"Field {field} not implemented")
        return paragraph, index

    def pub_date_ym(self, paragraph:Paragraph, base_run:Run=None, index:int=None):
        """Return ``self.context["pub_date"]`` without its last three characters.

        The ``paragraph``, ``base_run`` and ``index`` arguments are accepted but not used.
        """
        publication_date = self.context["pub_date"]
        return publication_date[:-3]

    @cached_property
    def references(self):
        """Parsed references document of the project version (cached after first use).

        Looks in the ontology's documentation folder for ``references.html``
        and, when that is absent, for ``references.md`` (converted to HTML
        with the "extra" and "codehilite" markdown extensions).

        Returns:
            A ``BeautifulSoup`` tree of the HTML content.

        Raises:
            FileNotFoundError: neither ``references.html`` nor
                ``references.md`` exists in the documentation folder.
        """
        doc_dir = Path(self.project.directory, self.project_version.ontology.doc_folder)

        html_path = doc_dir / "references.html"
        if html_path.exists():
            return BeautifulSoup(html_path.read_text(), "html.parser")

        md_path = doc_dir / "references.md"
        if md_path.exists():
            html = markdown(md_path.read_text(), extensions=["extra", "codehilite"])
            return BeautifulSoup(html, "html.parser")

        # Fail with a clear message instead of an unbound local variable.
        raise FileNotFoundError(f"No references.html or references.md found in {doc_dir}")


    def normative_references(self, paragraph:Paragraph, base_run:Run=None, index:int=None):
        """Replace ``paragraph`` with one paragraph per entry of the first ``<ul>`` in ``self.references``.

        Each ``<li>`` of that list (except those whose first ``<a>`` has the
        text ``"[0]"``) becomes a new paragraph inserted after the previous
        one and given the same style. The original paragraph is removed at the
        end.

        ``base_run`` and ``index`` are accepted but not used.

        Returns:
            ``(paragraph, n)``: the last paragraph created (or the original
            one when no entry was written) and the number of children of its
            XML element.
        """
        original_paragraph = paragraph
        soup = self.references
        # Only the first <ul> of the references document is used.
        ul = soup.find_all("ul")[0]
        for li in ul.find_all("li"):
            # NOTE(review): an <li> without any <a> makes `a` None and the next line raise AttributeError; confirm inputs always have one.
            a = li.find("a")
            if a.text == "[0]":
                continue

            # Insert a new paragraph right after the current one so entries keep their order.
            new_p = OxmlElement('w:p')
            paragraph._element.addnext(new_p)
            new_paragraph = Paragraph(new_p, paragraph._parent)
            if paragraph.style:
                new_paragraph.style = paragraph.style

            for child in list(li.children):
                if child.name == 'a' and child.get("id"):
                    # Anchor carrying an id: written as "[<SEQ REF field>]" followed by a tab.
                    # The field placeholder is the id without its first and last character.
                    placeholder = child.get("id")[1:-1]
                    new_paragraph.add_run("[")
                    self.current_bookmark_id += 1
                    # The bookmark wraps the SEQ field and is named after the anchor text without its first and last character.
                    append_bookmark_start(new_paragraph, f"REF_{child.text[1:-1]}", self.current_bookmark_id)
                    add_seq_field(new_paragraph, "REF", placeholder=placeholder)
                    append_bookmark_end(new_paragraph, self.current_bookmark_id)
                    new_paragraph.add_run("]")
                    new_paragraph.add_run("\t")
                elif child.name == 'a' and child.get('href'):
                    # Anchor with an href (and no id): written as an external hyperlink.
                    add_hyperlink(new_paragraph, child.get('href'), child.text)
                elif child.string and child.string.strip():
                    # Any other node with non-blank text: written as a plain run.
                    new_paragraph.add_run(child.string)
            # The next entry is inserted after the paragraph just written.
            paragraph = new_paragraph
        # Remove the original paragraph
        original_paragraph._element.getparent().remove(original_paragraph._element)
        return paragraph, len(paragraph._p)

    def replace_fields(self, paragraph:Paragraph):
        """Replace ``{{field}}`` placeholders found in the runs of ``paragraph``.

        The runs of the paragraph are scanned in order. A placeholder may span
        several runs: the text between ``{{`` and ``}}`` is collected (each
        piece stripped, then concatenated) and handed to ``insert_field`` when
        the closing ``}}`` is met. Text outside placeholders is re-inserted
        with ``insert_text``, using the run it came from as the base run.

        Raises:
            NotImplementedError: ``{{`` is met while a placeholder is already open.
            Exception: ``}}`` is met while no placeholder is open.
        """
        in_field = False   # True between "{{" and the matching "}}"
        field_parts = []   # pieces of the field name collected so far
        original_paragraph = paragraph
        # paragraph.runs is evaluated once, on the paragraph passed in; `paragraph` itself
        # may be rebound below to whatever insert_field / insert_text return.
        for run in paragraph.runs:
            if paragraph == original_paragraph:
                # Position of this run among the children of its parent element.
                index = run._r.getparent().index(run._r)
            if not in_field and not "{{" in run.text and not "}}" in run.text:
                # Run without any placeholder delimiter, outside a placeholder.
                if paragraph != original_paragraph:
                    # Output has moved to another paragraph: carry the run over to it.
                    paragraph._p.append(run._r)
                    index += 1
                continue
            # The run contains a delimiter or lies inside a placeholder: take it out;
            # its text outside placeholders is re-inserted piece by piece below.
            run._r.getparent().remove(run._r)
            # Split on the delimiters while keeping them as separate tokens.
            for text in re.split(r"(\{\{|\}\})", run.text):
                if in_field and text == "{{":
                    raise NotImplementedError(f"cannot embed field in field: {run.text}")
                if not in_field and text == "}}":
                    raise Exception(f"no field to close: {run.text}")
                if text == "":
                    continue
                elif text == "{{":
                    in_field = True
                elif text == "}}":
                    # Placeholder complete: insert its value where the run was.
                    field = "".join(field_parts)
                    paragraph, index = self.insert_field(field, paragraph, run, index)
                    in_field = False
                    field_parts = []
                else:
                    if in_field:
                        field_parts.append(text.strip())
                    else:
                        paragraph, index = self.insert_text(text, paragraph, run, index)
        # NOTE(review): if the runs end while in_field is still True, the collected parts are dropped without any error — confirm this is intended.
                

    def render_document(self) -> Document:
        self.document = Document(os.path.join(os.path.dirname(__file__), '../resources/docgen/stub.docx'))
        self.add_styles()
        self.context = self.get_context()
        self.current_bookmark_id = find_max_bookmark_id(self.document)

        for p in self.document.paragraphs:
            self.replace_fields(p)

        # current heading number
        self.n = [4]
@@ -156,17 +323,22 @@ class DOCXDocumentationGenerator:
        self.add_heading("Hello docx")

        p = self.document.add_paragraph()
        p.add_run("hi code", style=code).font.bold = True
        p.add_run("hi code", style=self.code).font.bold = True
        p.add_run(" hi text ").font.bold = False

        # todo:
        # find official work item on the etsi portal
        # example https://portal.etsi.org/webapp/WorkProgram/SimpleSearch/QueryForm.asp
        # https://portal.etsi.org/webapp/WorkProgram/Frame_WorkItemList.asp?qTITLE=saref4envi
        # https://portal.etsi.org/webapp/WorkProgram/Report_WorkItem.asp?WKI_ID=63058
        # 
        # parse page and extract 

        p = self.document.add_paragraph()
        add_seq_field(p, seq_name = "toto", placeholder="1")
        p.add_run(" ")
        add_seq_field(p, seq_name = "toto", placeholder="1")
        p.add_run(" ")
        add_seq_field(p, seq_name = "toto", placeholder="1")
        p.add_run(" ")
        add_seq_field(p, seq_name = "toto", placeholder="1")
        p.add_run(" ")
        add_seq_field(p, seq_name = "toto", placeholder="1")
        p = self.document.add_paragraph()
        add_hyperlink(p, "https://google.com", "hi google")
        p = self.document.add_paragraph()
        
        return self.document

+5 −2
Original line number Diff line number Diff line
@@ -120,8 +120,11 @@ class SiteManager:
        target_dir = os.path.join(self.site_dir, project.path, str(version))
        docxgen = DOCXDocumentationGenerator(self, project_version)
        document = docxgen.render_document()
        document.save(os.path.join(target_dir, project_version.ontology.name+".docx"))
        os.system(f'cmd.exe /C start "{target_dir}/{project_version.ontology.name}.docx""')
        from datetime import datetime
        now = datetime.now()
        time_formatted = now.strftime("%H_%M_%S")
        document.save(os.path.join(target_dir, f"{project_version.ontology.name}_{time_formatted}.docx"))
        os.system(f'cmd.exe /C start "{target_dir}/{project_version.ontology.name}_{time_formatted}.docx"')

    def generate_htaccess(self):
      htaccess_path = os.path.join(self.site_dir, ".htaccess")
+6 −0
Original line number Diff line number Diff line
@@ -436,6 +436,12 @@ class SAREFProjectVersion:
        self._branch_name = f"{branch_type}-{version}"
        self._version = version if isinstance(version, SAREFVersionName | None) else SAREFVersionName(version)

        self._doc_nb:str=None
        self._work_item_id:int=None
        self._work_item_reference:str=None
        self._publication_date:str=None
        self._title:str=None
        self._keywords:List[str]=[]
        self._ontology:Optional[SAREFGraphDocument] = None
        self._examples:Dict[str,SAREFGraphDocument] = dict()
        self._vocabularies:Dict[str,SAREFGraphDocument] = dict()
+96 −0
Original line number Diff line number Diff line
import requests
from bs4 import BeautifulSoup
import re
import logging
from saref_pypeline._logging import TRACE_LEVEL

logger = logging.getLogger(__name__)

def _fetch_keywords(ref, wk_id) -> "str | None":
    """Fetch the keywords of a work item from its ETSI portal report page.

    This is best effort: any failure (network error, HTTP error status,
    unexpected page layout) is logged as a warning and results in ``None``.

    Args:
        ref: Work item reference, used in log messages only.
        wk_id: Work item identifier, sent as the ``WKI_ID`` query parameter.

    Returns:
        The keywords joined with ``", "``, or ``None`` when they could not be
        found or fetched.
    """
    logger.log(TRACE_LEVEL, f"fetch for {ref}")
    url = f"https://portal.etsi.org/webapp/WorkProgram/Report_WorkItem.asp?WKI_ID={wk_id}"
    try:
        # The request is inside the try so connection errors are handled like
        # HTTP errors; the timeout keeps an unresponsive portal from blocking.
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.content, "html.parser")
        table = soup.find("table", class_="Table")
        if table:
            for row in table.find_all("tr"):
                for i, td in enumerate(row.find_all("td")):
                    if "Keywords" in td.get_text(strip=True):
                        # next row holds keywords in same column position
                        kw_row = row.find_next_sibling("tr")
                        kw_td = kw_row.find_all("td")[i]
                        keywords = [kw.strip() for kw in kw_td.stripped_strings]
                        return ", ".join(keywords)
    except Exception as e:
        logger.warning(f"Exception while fetching keywords for {ref} {wk_id}: {e}")
    return None
    
def fetch_metadata() -> list:
    """Collect work-item metadata from the ETSI portal for the searched document numbers.

    The work-item list is queried for "103264", "103548" and "103410". Every
    table row with exactly four cells, a status containing "Drafting" or
    "Published", and a parsable document number, reference and work item id
    yields one dict.

    Returns:
        A list of dicts with the keys ``doc_nb``, ``ref``, ``version``,
        ``pub_date``, ``wk_id``, ``short_title``, ``title1``, ``title2``,
        ``title3``, ``part`` and ``keywords``. ``version``, ``pub_date``,
        ``title1``-``title3``, ``part`` and ``keywords`` are ``None`` when
        not available.

    Raises:
        requests.RequestException: a work-item list page could not be fetched
            in time or answered with an HTTP error status.
    """
    result = []
    for search in ["103264", "103548", "103410"]:
        url = f"https://portal.etsi.org/webapp/WorkProgram/Frame_WorkItemList.asp?qETSI_NUMBER={search}&optDisplay=1000"
        # Without a timeout an unresponsive portal would block the caller forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, "html.parser")

        # Find all work item rows (they are <tr> containing 'Ref.' and 'Ver.')
        for row in soup.find_all("tr"):
            cells = row.find_all("td")
            if len(cells) != 4:
                continue

            # Check status is Drafting or Published
            status = cells[3].get_text(separator=" ", strip=True)
            if "Drafting" not in status and "Published" not in status:
                continue

            pub_date_match = re.search(r"Publication\s+\(([0-9]{4}-[0-9]{2}-[0-9]{2})\)", status)
            pub_date = pub_date_match.group(1) if pub_date_match else None

            # Extract version, ref, Work Item ID; rows lacking any mandatory
            # piece are skipped. Only the errors a failed match or a missing
            # link can produce are caught, so KeyboardInterrupt and real bugs
            # are not swallowed.
            try:
                col_text = cells[1].get_text(separator=" ", strip=True)
                doc_nb = re.search(r"Doc\.\ Nb\.\s*(TS\ [0-9]{3}\ [0-9]{3}(-[0-9]+)?)", col_text).group(1)
                version_match = re.search(r"Ver\.\s*(\d+\.\d+\.\d+)", col_text)
                version = version_match.group(1) if version_match else None
                ref = re.search(r"Ref\.\s*([A-Z]+/[\w-]+)", col_text).group(1)
                wk_id = int(re.search(r"WKI_ID=(\d+)", cells[1].find("a", href=True)["href"]).group(1))
            except (AttributeError, TypeError, KeyError, ValueError):
                continue

            # Extract titles from the 3rd column: the last line is the short
            # title, the lines before it (up to three) are title1..title3.
            title_lines = [line.strip() for line in cells[2].stripped_strings]
            # An empty title cell yields "" rather than an IndexError.
            short_title = title_lines[-1] if title_lines else ""
            title1 = title_lines[0] if len(title_lines) >= 2 else None
            title2 = title_lines[1] if len(title_lines) >= 3 else None
            title3 = title_lines[2] if len(title_lines) >= 4 else None

            # The part number, when present, is announced in the third title line.
            part_match = re.search(r"Part\s+(\d+)", title3) if title3 else None
            part = int(part_match.group(1)) if part_match else None

            # fetch keywords
            keywords = _fetch_keywords(ref, wk_id)

            result.append({
                "doc_nb": doc_nb,
                "ref": ref,
                "version": version,
                "pub_date": pub_date,
                "wk_id": wk_id,
                "short_title": short_title,
                "title1": title1,
                "title2": title2,
                "title3": title3,
                "part": part,
                "keywords": keywords,
            })
    return result

if __name__ == "__main__":
    # Manual check: fetch the metadata and pretty-print it.
    from pprint import pprint
    details = fetch_metadata()
    pprint(details)
    
 No newline at end of file
+16 −9

File changed.

Preview size limit exceeded, changes collapsed.

Loading