From 759a32cfc552389b63ddd50abe61f5703c4f3cb7 Mon Sep 17 00:00:00 2001 From: pengkun0129 Date: Thu, 4 Jun 2026 10:13:12 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=E3=80=8Cmxpic=5Frouter=E3=80=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mxpic_router/builder.py | 1644 +++++++++++++++++++++++++-------------- 1 file changed, 1043 insertions(+), 601 deletions(-) diff --git a/mxpic_router/builder.py b/mxpic_router/builder.py index edf19d9..f82adbe 100644 --- a/mxpic_router/builder.py +++ b/mxpic_router/builder.py @@ -1,72 +1,77 @@ -import math -import os -import sys -from typing import Dict, Optional - -import yaml - -from .eda_loader import CellSpec, InstanceSpec, LinkSpec, load_cell_spec -from .technology import apply_technology_manifest, load_technology_manifest - - -def build_project_gds( - project_dir: str, - output_path: str, - pdk_root: str, - technology_manifest_path: Optional[str] = None, - prefer_full_gds: bool = False, - target_cell_name: Optional[str] = None, -) -> dict: - import nazca as nd - - Route = _import_route_backend(nd) - manifest = load_technology_manifest(technology_manifest_path) - apply_technology_manifest(manifest, nd) - - specs = _load_project_specs(project_dir) - if not specs: - raise ValueError("No saved cell YAML files found for this project") - - built_cells = {} - warnings = [] - for spec in _ordered_specs(specs): - built_cells[spec.name] = _build_cell(spec, built_cells, pdk_root, prefer_full_gds, Route, nd, warnings) - - top = _select_top_spec(specs, target_cell_name) - os.makedirs(os.path.dirname(output_path), exist_ok=True) - nd.export_gds(topcells=[built_cells[top.name]], filename=output_path) - return { - "output_path": output_path, - "engine": "mxpic_router", - "cells_built": [spec.name for spec in _ordered_specs(specs)], - "warnings": warnings, - } - - -def _load_project_specs(project_dir: str) -> Dict[str, CellSpec]: - specs = {} - for filename in sorted(os.listdir(project_dir)): - if not filename.lower().endswith((".yml", ".yaml")): - continue - spec = load_cell_spec(os.path.join(project_dir, filename)) - specs[spec.name] = spec - return specs - - -def _ordered_specs(specs: Dict[str, CellSpec]): - composites = [spec for spec in specs.values() if spec.type != "project"] - projects = [spec for spec in specs.values() if spec.type == "project"] - return composites + projects - - -def _select_top_spec(specs: Dict[str, CellSpec], target_cell_name: Optional[str]): - if target_cell_name: - if target_cell_name not in specs: - raise ValueError(f"Target cell not found for routed build: {target_cell_name}") - return specs[target_cell_name] - return _ordered_specs(specs)[-1] - - +import math +import os +import sys +from typing import Dict, Optional + +import yaml + +from .eda_loader import CellSpec, InstanceSpec, LinkSpec, load_cell_spec +from .technology import apply_technology_manifest, load_technology_manifest + + +ROUTE_MIN_SPACING = 10.0 +ROUTE_ENDPOINT_SPACING_IGNORE = ROUTE_MIN_SPACING * 2.0 +ROUTE_SPACING_GEOMETRY_ADJUSTMENT_ENABLED = False + + +def build_project_gds( + project_dir: str, + output_path: str, + pdk_root: str, + technology_manifest_path: Optional[str] = None, + prefer_full_gds: bool = False, + target_cell_name: Optional[str] = None, +) -> dict: + import nazca as nd + + Route = _import_route_backend(nd) + manifest = load_technology_manifest(technology_manifest_path) + apply_technology_manifest(manifest, nd) + + specs = _load_project_specs(project_dir) + if not specs: + raise ValueError("No saved cell YAML files found for this project") + + built_cells = {} + warnings = [] + for spec in _ordered_specs(specs): + built_cells[spec.name] = _build_cell(spec, built_cells, pdk_root, prefer_full_gds, Route, nd, warnings) + + top = _select_top_spec(specs, target_cell_name) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + nd.export_gds(topcells=[built_cells[top.name]], filename=output_path) + return { + "output_path": output_path, + "engine": "mxpic_router", + "cells_built": [spec.name for spec in _ordered_specs(specs)], + "warnings": warnings, + } + + +def _load_project_specs(project_dir: str) -> Dict[str, CellSpec]: + specs = {} + for filename in sorted(os.listdir(project_dir)): + if not filename.lower().endswith((".yml", ".yaml")): + continue + spec = load_cell_spec(os.path.join(project_dir, filename)) + specs[spec.name] = spec + return specs + + +def _ordered_specs(specs: Dict[str, CellSpec]): + composites = [spec for spec in specs.values() if spec.type != "project"] + projects = [spec for spec in specs.values() if spec.type == "project"] + return composites + projects + + +def _select_top_spec(specs: Dict[str, CellSpec], target_cell_name: Optional[str]): + if target_cell_name: + if target_cell_name not in specs: + raise ValueError(f"Target cell not found for routed build: {target_cell_name}") + return specs[target_cell_name] + return _ordered_specs(specs)[-1] + + def _build_cell(spec: CellSpec, built_cells: dict, pdk_root: str, prefer_full_gds: bool, Route, nd, warnings: list): pin_map = {} with nd.Cell(name=spec.name) as top: @@ -86,147 +91,146 @@ def _build_cell(spec: CellSpec, built_cells: dict, pdk_root: str, prefer_full_gd if pin_name != "org": pin_map[(instance_name, pin_name)] = placed.pin[pin_name] continue - - asset = _resolve_pdk_asset(pdk_root, instance.component, prefer_full_gds) + + asset = _resolve_pdk_asset(pdk_root, instance.component, prefer_full_gds) if not asset.get("gds_path"): warnings.append(f"Missing GDS for {instance_name}: {instance.component}") continue loaded = nd.load_gds(asset["gds_path"]) loaded.put(instance.x, instance.y, layout_rotation, flip=instance.flip, flop=instance.flop) _register_metadata_pins(pin_map, instance_name, instance, asset.get("metadata") or {}, nd, pdk_root) - - for pin_name, pin in spec.pins.items(): - pin_out = nd.Pin(pin_name, width=pin.width).put(pin.x, pin.y, pin.angle) - pin_in = pin_out.move(0, 0, 180) - pin_map[("this", pin_name)] = pin_in - pin_map[(pin_name, pin_name)] = pin_in - - for element_name, element in spec.elements.items(): - _register_element_pins(pin_map, element_name, element, nd) - - for bundle in spec.bundles.values(): - for link in bundle.links: - _route_link(link, pin_map, Route, warnings) - return top - - -def _is_basic_component(component: str) -> bool: - return component in {"waveguide", "90 bend", "180 bend", "cricle", "circle", "taper"} - - -def _build_basic_component(instance: InstanceSpec, nd): - settings = instance.settings or {} - component = "cricle" if instance.component == "circle" else instance.component - width = _safe_float(settings.get("width"), _safe_float(settings.get("width1"), 0.5)) or 0.5 - xs = settings.get("xsection") or settings.get("xs") or "strip" - radius = _safe_float(settings.get("radius"), 10) or 10 - length = _safe_float(settings.get("length"), 100) or 100 - name = f"basic_{component.replace(' ', '_')}_{_safe_cell_name(instance.name)}" - with nd.Cell(name=name, instantiate=False) as cell: - if component == "waveguide": - line = nd.strt(length=length, width=width, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(line.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(line.pin["b0"]) - elif component == "90 bend": - bend = nd.bend(radius=radius, width=width, angle=90, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "180 bend": - bend = nd.bend(radius=radius, width=width, angle=180, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "cricle": - bend = nd.bend(radius=radius, width=width, angle=360, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) - nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) - elif component == "taper": - width1 = _safe_float(settings.get("width1"), width) or width - width2 = _safe_float(settings.get("width2"), width) or width - taper = nd.taper(length=length, width1=width1, width2=width2, xs=xs).put(0, 0, 0) - nd.Pin("a1", width=width1, xs=xs).put(taper.pin["a0"]) - nd.Pin("b1", width=width2, xs=xs).put(taper.pin["b0"]) - return cell - - + + for pin_name, pin in spec.pins.items(): + pin_out = nd.Pin(pin_name, width=pin.width).put(pin.x, pin.y, pin.angle) + pin_in = pin_out.move(0, 0, 180) + pin_map[("this", pin_name)] = pin_in + pin_map[(pin_name, pin_name)] = pin_in + + for element_name, element in spec.elements.items(): + _register_element_pins(pin_map, element_name, element, nd) + + for bundle in spec.bundles.values(): + _route_bundle_links(bundle.links, pin_map, Route, warnings) + return top + + +def _is_basic_component(component: str) -> bool: + return component in {"waveguide", "90 bend", "180 bend", "cricle", "circle", "taper"} + + +def _build_basic_component(instance: InstanceSpec, nd): + settings = instance.settings or {} + component = "cricle" if instance.component == "circle" else instance.component + width = _safe_float(settings.get("width"), _safe_float(settings.get("width1"), 0.5)) or 0.5 + xs = settings.get("xsection") or settings.get("xs") or "strip" + radius = _safe_float(settings.get("radius"), 10) or 10 + length = _safe_float(settings.get("length"), 100) or 100 + name = f"basic_{component.replace(' ', '_')}_{_safe_cell_name(instance.name)}" + with nd.Cell(name=name, instantiate=False) as cell: + if component == "waveguide": + line = nd.strt(length=length, width=width, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(line.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(line.pin["b0"]) + elif component == "90 bend": + bend = nd.bend(radius=radius, width=width, angle=90, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "180 bend": + bend = nd.bend(radius=radius, width=width, angle=180, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "cricle": + bend = nd.bend(radius=radius, width=width, angle=360, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width, xs=xs).put(bend.pin["a0"]) + nd.Pin("b1", width=width, xs=xs).put(bend.pin["b0"]) + elif component == "taper": + width1 = _safe_float(settings.get("width1"), width) or width + width2 = _safe_float(settings.get("width2"), width) or width + taper = nd.taper(length=length, width1=width1, width2=width2, xs=xs).put(0, 0, 0) + nd.Pin("a1", width=width1, xs=xs).put(taper.pin["a0"]) + nd.Pin("b1", width=width2, xs=xs).put(taper.pin["b0"]) + return cell + + def _register_element_pins(pin_map: dict, element_name: str, element, nd) -> None: element_cell = _build_element_cell(element_name, element, nd) placed = element_cell.put(element.x, element.y, element.angle) - if element.type == "port": - for pin in _port_element_pin_entries(element_name, element): - pin_map[(element_name, pin["name"])] = placed.pin[f'{pin["name"]}_in'] - return - - for pin in _anchor_element_pin_entries(element_name, element): - pin_map[(element_name, pin["name"])] = placed.pin[pin["name"]] - - -def _build_element_cell(element_name: str, element, nd): - width = element.width or 0.5 - port_number = max(1, int(getattr(element, "port_number", 1) or 1)) - pitch = _safe_float(getattr(element, "pitch", 10.0), 10.0) or 10.0 - with nd.Cell(name=f"element_{_safe_cell_name(element_name)}", instantiate=False) as cell: - if element.type == "port": - for index, pin in enumerate(_port_element_pin_entries(element_name, element)): - local_y = 0.0 if port_number == 1 else _element_port_offset(index, port_number, pitch) - pin_out = nd.Pin(pin["name"], width=width).put(0.0, local_y, 180.0) - pin_in = pin_out.move(0, 0, 180) - nd.Pin(f'{pin["name"]}_in', width=width).put(pin_in.x, pin_in.y, pin_in.a) - return cell - - anchor_pins = _anchor_element_pin_entries(element_name, element) - for index in range(port_number): - local_y = -15.0 + _element_port_offset(index, port_number, pitch) - left_pin = anchor_pins[index * 2] - right_pin = anchor_pins[index * 2 + 1] - nd.Pin(left_pin["name"], width=width).put(0.0, local_y, 180.0) - nd.Pin(right_pin["name"], width=width).put(0.0, local_y, 0.0) - return cell - - -def _element_port_offset(index: int, count: int, pitch: float) -> float: - return ((count - 1) / 2 - index) * pitch - - -def _port_element_pin_entries(element_name: str, element) -> list: - count = max(1, int(getattr(element, "port_number", 1) or 1)) - defaults = [{"role": f"io{index + 1}", "name": f"{_safe_cell_name(element_name)}_io{index + 1}"} for index in range(count)] - return _named_pin_entries(defaults, getattr(element, "pins", None)) - - -def _anchor_element_pin_entries(element_name: str, element) -> list: - count = max(1, int(getattr(element, "port_number", 1) or 1)) - defaults = [] - for index in range(count): - number = index + 1 - defaults.append({"role": f"a{number}", "name": f"{_safe_cell_name(element_name)}_a{number}"}) - defaults.append({"role": f"b{number}", "name": f"{_safe_cell_name(element_name)}_b{number}"}) - return _named_pin_entries(defaults, getattr(element, "pins", None)) - - -def _named_pin_entries(defaults: list, overrides) -> list: - by_role = {str(pin.get("role") or ""): str(pin.get("name") or "") for pin in overrides or []} - by_index = [str(pin.get("name") or "") for pin in overrides or []] - entries = [] - for index, default in enumerate(defaults): - role = default["role"] - name = by_role.get(role) or (by_index[index] if index < len(by_index) else "") or default["name"] - entries.append({"role": role, "name": _safe_cell_name(name)}) - return entries - - -def _transform_element_port(local_x: float, local_y: float, angle: float, element) -> tuple: - rotation = math.radians(_safe_float(getattr(element, "angle", 0.0), 0.0) or 0.0) - cos_v = math.cos(rotation) - sin_v = math.sin(rotation) - x = element.x + local_x * cos_v - local_y * sin_v - y = element.y + local_x * sin_v + local_y * cos_v - return x, y, angle - - -def _safe_cell_name(value: str) -> str: - return "".join(ch if ch.isalnum() or ch == "_" else "_" for ch in str(value or "inst")) - - + if element.type == "port": + for pin in _port_element_pin_entries(element_name, element): + pin_map[(element_name, pin["name"])] = placed.pin[f'{pin["name"]}_in'] + return + + for pin in _anchor_element_pin_entries(element_name, element): + pin_map[(element_name, pin["name"])] = placed.pin[pin["name"]] + + +def _build_element_cell(element_name: str, element, nd): + width = element.width or 0.5 + port_number = max(1, int(getattr(element, "port_number", 1) or 1)) + pitch = _safe_float(getattr(element, "pitch", 10.0), 10.0) or 10.0 + with nd.Cell(name=f"element_{_safe_cell_name(element_name)}", instantiate=False) as cell: + if element.type == "port": + for index, pin in enumerate(_port_element_pin_entries(element_name, element)): + local_y = 0.0 if port_number == 1 else _element_port_offset(index, port_number, pitch) + pin_out = nd.Pin(pin["name"], width=width).put(0.0, local_y, 180.0) + pin_in = pin_out.move(0, 0, 180) + nd.Pin(f'{pin["name"]}_in', width=width).put(pin_in.x, pin_in.y, pin_in.a) + return cell + + anchor_pins = _anchor_element_pin_entries(element_name, element) + for index in range(port_number): + local_y = -15.0 + _element_port_offset(index, port_number, pitch) + left_pin = anchor_pins[index * 2] + right_pin = anchor_pins[index * 2 + 1] + nd.Pin(left_pin["name"], width=width).put(0.0, local_y, 180.0) + nd.Pin(right_pin["name"], width=width).put(0.0, local_y, 0.0) + return cell + + +def _element_port_offset(index: int, count: int, pitch: float) -> float: + return ((count - 1) / 2 - index) * pitch + + +def _port_element_pin_entries(element_name: str, element) -> list: + count = max(1, int(getattr(element, "port_number", 1) or 1)) + defaults = [{"role": f"io{index + 1}", "name": f"{_safe_cell_name(element_name)}_io{index + 1}"} for index in range(count)] + return _named_pin_entries(defaults, getattr(element, "pins", None)) + + +def _anchor_element_pin_entries(element_name: str, element) -> list: + count = max(1, int(getattr(element, "port_number", 1) or 1)) + defaults = [] + for index in range(count): + number = index + 1 + defaults.append({"role": f"a{number}", "name": f"{_safe_cell_name(element_name)}_a{number}"}) + defaults.append({"role": f"b{number}", "name": f"{_safe_cell_name(element_name)}_b{number}"}) + return _named_pin_entries(defaults, getattr(element, "pins", None)) + + +def _named_pin_entries(defaults: list, overrides) -> list: + by_role = {str(pin.get("role") or ""): str(pin.get("name") or "") for pin in overrides or []} + by_index = [str(pin.get("name") or "") for pin in overrides or []] + entries = [] + for index, default in enumerate(defaults): + role = default["role"] + name = by_role.get(role) or (by_index[index] if index < len(by_index) else "") or default["name"] + entries.append({"role": role, "name": _safe_cell_name(name)}) + return entries + + +def _transform_element_port(local_x: float, local_y: float, angle: float, element) -> tuple: + rotation = math.radians(_safe_float(getattr(element, "angle", 0.0), 0.0) or 0.0) + cos_v = math.cos(rotation) + sin_v = math.sin(rotation) + x = element.x + local_x * cos_v - local_y * sin_v + y = element.y + local_x * sin_v + local_y * cos_v + return x, y, angle + + +def _safe_cell_name(value: str) -> str: + return "".join(ch if ch.isalnum() or ch == "_" else "_" for ch in str(value or "inst")) + + def _register_metadata_pins(pin_map, instance_name, instance, metadata: dict, nd, pdk_root: str) -> None: layout_rotation = _layout_rotation(instance.rotation) for pin_name, pin in _metadata_pins(metadata, pdk_root).items(): @@ -240,234 +244,671 @@ def _register_metadata_pins(pin_map, instance_name, instance, metadata: dict, nd instance.flip, instance.flop, ) - width = _safe_float(pin.get("width"), 0.5) or 0.5 - pin_map[(instance_name, str(pin_name))] = nd.Pin( - f"{instance_name}_{pin_name}", - width=width, - xs="strip", - ).put(x, y, angle) - - -def _metadata_pins(metadata: dict, pdk_root: str) -> dict: - if not metadata: - return {} - if isinstance(metadata.get("pins"), dict): - return metadata.get("pins") or {} - if isinstance(metadata.get("ports"), dict) and _allows_pdk_ports_as_pins(pdk_root): - return metadata.get("ports") or {} - return {} - - -def _allows_pdk_ports_as_pins(pdk_root: str) -> bool: - normalized = os.path.abspath(str(pdk_root or "")).replace("\\", "/").lower() - return "/opt_pdk_public/" in f"{normalized}/" or "/opt_pdk_atlas/" in f"{normalized}/" - - -def _metal_route_pcb_enabled(xsection: str) -> bool: - normalized = str(xsection or "").strip().lower().replace("-", "_") - return normalized in {"metal_1", "metal1", "metal_2", "metal2"} - - -def _route_link(link: LinkSpec, pin_map: dict, Route, warnings: list) -> None: - route = Route(radius=link.radius or 10, width=link.width, xs=link.xsection, PCB=_metal_route_pcb_enabled(link.xsection)) - p1 = pin_map.get((link.src_inst, link.src_pin)) - p2 = pin_map.get((link.dst_inst, link.dst_pin)) - has_pin_endpoints = bool(link.src_inst or link.src_pin or link.dst_inst or link.dst_pin) - if len(link.points or []) >= 2: - if has_pin_endpoints: - if p1 is None or p2 is None: - warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") - return - points = _route_points_with_pin_endpoints(link.points, p1, p2) - else: - points = link.points - if _route_guided_link(link, route, warnings, points): - return - if p1 is None or p2 is None: - warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") - return - method_name = _route_method_name_for_pins(p1, p2) - route_method = getattr(route, method_name, None) - if route_method is None: - warnings.append(f"Route method {method_name} unavailable; falling back to sbend_p2p") - route_method = route.sbend_p2p - route_method( - pin1=p1, - pin2=p2, - width=link.width, - radius=link.radius or 10, - xs=link.xsection, - arrow=False, - ).put() - -def _route_guided_link(link: LinkSpec, route, warnings: list, points_override=None) -> bool: - route_method = getattr(route, "strt_p2p", None) - if route_method is None: - warnings.append("Manual route points require Route.strt_p2p; falling back to automatic p2p route") - return False - bend_method = getattr(route, "bend_p2p", None) - source_points = points_override if points_override is not None else (link.points or []) - points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in source_points - ] - points = [point for point in points if point["x"] is not None and point["y"] is not None] - if len(points) < 2: - return False - if bend_method is None: - warnings.append("Manual route bends require Route.bend_p2p; corners were exported as straight joins") - plan = _guided_route_plan(points, (link.radius or 10) if bend_method is not None else 0) - for straight in plan["straights"]: - route_method( - pin1=(straight["start"]["x"], straight["start"]["y"], straight["angle"]), - pin2=(straight["end"]["x"], straight["end"]["y"], straight["angle"]), - width=link.width, - xs=link.xsection, - arrow=False, - ).put() - if bend_method is None: - return True - for bend in plan["bends"]: - bend_method( - pin1=(bend["start"]["x"], bend["start"]["y"], bend["angle_in"]), - pin2=(bend["end"]["x"], bend["end"]["y"], bend["angle_out"]), - radius=link.radius or 10, - width=link.width, - xs=link.xsection, - arrow=False, - ).put() - return True - - -def _route_points_with_pin_endpoints(points: list, source_pin=None, target_pin=None) -> list: - clean_points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in points or [] - if isinstance(point, dict) - ] - clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] - if len(clean_points) < 2: - return clean_points - source_point = _pin_point(source_pin) - target_point = _pin_point(target_pin) - if source_point is not None: - clean_points[0] = source_point - if target_point is not None: - clean_points[-1] = target_point - return clean_points - - -def _pin_point(pin): - for source in (pin, getattr(pin, "pointer", None)): - if source is None: - continue - x = _safe_float(getattr(source, "x", None), None) - y = _safe_float(getattr(source, "y", None), None) - if x is not None and y is not None: - return {"x": float(x), "y": float(y)} - xya = getattr(source, "xya", None) - if callable(xya): - try: - values = xya() - except TypeError: - values = None - if values and len(values) >= 2: - x = _safe_float(values[0], None) - y = _safe_float(values[1], None) - if x is not None and y is not None: - return {"x": float(x), "y": float(y)} - return None - - -def _guided_route_plan(points: list, radius: float) -> dict: - clean_points = [ - {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} - for point in points or [] - if isinstance(point, dict) - ] - clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] - if len(clean_points) < 2: - return {"straights": [], "bends": []} - - trim_radius = max(_safe_float(radius, 0.0) or 0.0, 0.0) - corner_trims = {} - bends = [] - for index in range(1, len(clean_points) - 1): - previous_point = clean_points[index - 1] - corner = clean_points[index] - next_point = clean_points[index + 1] - angle_in = _segment_angle(previous_point, corner) - angle_out = _segment_angle(corner, next_point) - if angle_in is None or angle_out is None or angle_in == angle_out: - continue - turn_delta = abs(((angle_out - angle_in + 180) % 360) - 180) - if turn_delta >= 179: - continue - previous_length = _distance(previous_point, corner) - next_length = _distance(corner, next_point) - trim = min(trim_radius, previous_length / 2.0, next_length / 2.0) - if trim <= 1e-9: - continue - before = _point_toward(corner, previous_point, trim) - after = _point_toward(corner, next_point, trim) - corner_trims[index] = {"before": before, "after": after} - bends.append({ - "start": before, - "corner": {"x": float(corner["x"]), "y": float(corner["y"])}, - "end": after, - "angle_in": angle_in, - "angle_out": angle_out, - }) - - straights = [] - for index in range(len(clean_points) - 1): - start = corner_trims.get(index, {}).get("after", clean_points[index]) - end = corner_trims.get(index + 1, {}).get("before", clean_points[index + 1]) - angle = _segment_angle(start, end) - if angle is None or _distance(start, end) <= 1e-9: - continue - straights.append({ - "start": {"x": float(start["x"]), "y": float(start["y"])}, - "end": {"x": float(end["x"]), "y": float(end["y"])}, - "angle": angle, - }) - - return {"straights": straights, "bends": bends} - - -def _distance(point1: dict, point2: dict) -> float: - return math.hypot(point2["x"] - point1["x"], point2["y"] - point1["y"]) - - -def _point_toward(origin: dict, target: dict, distance: float) -> dict: - total = _distance(origin, target) - if total <= 1e-9: - return {"x": float(origin["x"]), "y": float(origin["y"])} - ratio = distance / total - return { - "x": float(origin["x"] + (target["x"] - origin["x"]) * ratio), - "y": float(origin["y"] + (target["y"] - origin["y"]) * ratio), - } - - -def _segment_angle(point1: dict, point2: dict): - dx = point2["x"] - point1["x"] - dy = point2["y"] - point1["y"] - if abs(dx) < 1e-9 and abs(dy) < 1e-9: - return None - if abs(dx) >= abs(dy): - return 0 if dx >= 0 else 180 - return 90 if dy >= 0 else 270 - - -def _pins_have_same_rotation(pin1, pin2, tolerance: float = 1e-6) -> bool: - angle1 = _pin_angle(pin1) - angle2 = _pin_angle(pin2) - if angle1 is None or angle2 is None: - return False - return abs(((angle1 - angle2 + 180) % 360) - 180) <= tolerance - - + width = _safe_float(pin.get("width"), 0.5) or 0.5 + pin_map[(instance_name, str(pin_name))] = nd.Pin( + f"{instance_name}_{pin_name}", + width=width, + xs="strip", + ).put(x, y, angle) + + +def _metadata_pins(metadata: dict, pdk_root: str) -> dict: + if not metadata: + return {} + if isinstance(metadata.get("pins"), dict): + return metadata.get("pins") or {} + if isinstance(metadata.get("ports"), dict) and _allows_pdk_ports_as_pins(pdk_root): + return metadata.get("ports") or {} + return {} + + +def _allows_pdk_ports_as_pins(pdk_root: str) -> bool: + normalized = os.path.abspath(str(pdk_root or "")).replace("\\", "/").lower() + return "/opt_pdk_public/" in f"{normalized}/" or "/opt_pdk_atlas/" in f"{normalized}/" + + +def _metal_route_pcb_enabled(xsection: str) -> bool: + normalized = str(xsection or "").strip().lower().replace("-", "_") + return normalized in {"metal_1", "metal1", "metal_2", "metal2"} + + +def _route_bundle_links(links: list, pin_map: dict, Route, warnings: list) -> None: + plans = [] + by_order = {} + for order, link in enumerate(links): + p1 = pin_map.get((link.src_inst, link.src_pin)) + p2 = pin_map.get((link.dst_inst, link.dst_pin)) + if p1 is None or p2 is None: + by_order[order] = {"link": link, "adjusted_points": None} + continue + points = _route_spacing_reference_points(link, p1, p2) + if len(points) < 2: + by_order[order] = {"link": link, "adjusted_points": None, "adjusted_offset": None} + continue + plan = { + "order": order, + "link": link, + "pin1": p1, + "pin2": p2, + "points": points, + "automatic": len(link.points or []) < 2, + "adjusted_points": None, + "adjusted_offset": None, + "check_points": None, + "route_options": {}, + } + plans.append(plan) + by_order[order] = plan + + _assign_sbend_lstart_spacing(plans) + + accepted = [] + for plan in plans: + check_points = _route_spacing_check_points(plan["points"]) + if _route_points_have_spacing(check_points, accepted): + plan["check_points"] = check_points + accepted.append(plan) + continue + adjustment = _first_spacing_adjusted_route(plan, accepted) + if adjustment is not None: + warnings.append( + f"Detected route spacing below {ROUTE_MIN_SPACING:g}um for " + f"{plan['link'].src_inst}:{plan['link'].src_pin} -> " + f"{plan['link'].dst_inst}:{plan['link'].dst_pin}" + ) + if ROUTE_SPACING_GEOMETRY_ADJUSTMENT_ENABLED: + plan["adjusted_points"] = adjustment["points"] + plan["check_points"] = _route_spacing_check_points(adjustment["points"]) + else: + plan["check_points"] = check_points + else: + plan["check_points"] = check_points + accepted.append(plan) + + for order in range(len(links)): + plan = by_order.get(order) + if not plan: + continue + if plan.get("adjusted_points"): + route = Route( + radius=plan["link"].radius or 10, + width=plan["link"].width, + xs=plan["link"].xsection, + PCB=_metal_route_pcb_enabled(plan["link"].xsection), + ) + if plan.get("automatic"): + if _route_guided_straight_link(plan["link"], route, warnings, plan["adjusted_points"]): + warnings.append( + f"Applied {ROUTE_MIN_SPACING:g}um spacing lane to " + f"{plan['link'].src_inst}:{plan['link'].src_pin} -> " + f"{plan['link'].dst_inst}:{plan['link'].dst_pin}" + ) + continue + if _route_guided_link(plan["link"], route, warnings, plan["adjusted_points"]): + continue + if plan.get("route_options", {}).get("Lstart"): + warnings.append( + f"Applied sbend Lstart {plan['route_options']['Lstart']:g}um to " + f"{plan['link'].src_inst}:{plan['link'].src_pin} -> " + f"{plan['link'].dst_inst}:{plan['link'].dst_pin}" + ) + _route_link(plan["link"], pin_map, Route, warnings, plan.get("route_options")) + + +def _assign_sbend_lstart_spacing(plans: list) -> None: + groups = {} + for plan in plans: + if not plan.get("automatic"): + continue + if _route_method_name_for_pins(plan["pin1"], plan["pin2"]) != "sbend_p2p": + continue + key = _sbend_spacing_group_key(plan) + if key is None: + continue + groups.setdefault(key, []).append(plan) + + for group in groups.values(): + if len(group) < 2: + continue + axis = _dominant_spacing_axis(group[0]["points"][0], group[0]["points"][-1]) + coord_key = "y" if axis == "horizontal" else "x" + step = _sbend_lstart_step(group) + for plan, rank in _sbend_lstart_ranks(group, coord_key): + plan["route_options"]["Lstart"] = rank * step + + +def _sbend_lstart_step(group: list) -> float: + width = max((_safe_float(plan["link"].width, 0.5) or 0.5) for plan in group) + return ROUTE_MIN_SPACING + width + + +def _sbend_lstart_ranks(group: list, coord_key: str) -> list: + deltas = [_route_coord_delta(plan, coord_key) for plan in group] + nonzero_deltas = [delta for delta in deltas if abs(delta) > 1e-9] + if nonzero_deltas and all(delta > 0 for delta in nonzero_deltas): + ordered = sorted(group, key=lambda plan: _route_mid_coord(plan, coord_key)) + return [(plan, len(ordered) - index) for index, plan in enumerate(ordered)] + if nonzero_deltas and all(delta < 0 for delta in nonzero_deltas): + ordered = sorted(group, key=lambda plan: _route_mid_coord(plan, coord_key), reverse=True) + return [(plan, len(ordered) - index) for index, plan in enumerate(ordered)] + + center = sum(_route_mid_coord(plan, coord_key) for plan in group) / len(group) + distances = sorted({round(abs(_route_mid_coord(plan, coord_key) - center), 6) for plan in group}) + return [ + (plan, distances.index(round(abs(_route_mid_coord(plan, coord_key) - center), 6)) + 1) + for plan in group + ] + + +def _route_coord_delta(plan: dict, coord_key: str) -> float: + start = plan["points"][0] + end = plan["points"][-1] + return float(end[coord_key]) - float(start[coord_key]) + + +def _sbend_spacing_group_key(plan: dict): + link = plan["link"] + axis = _dominant_spacing_axis(plan["points"][0], plan["points"][-1]) + return ( + axis, + str(link.xsection or ""), + _safe_float(link.width, 0.0), + _safe_float(link.radius, 0.0), + ) + + +def _route_group_needs_spacing(group: list) -> bool: + check_points = [_route_spacing_check_points(plan["points"]) for plan in group] + threshold = _sbend_lstart_step(group) + for index in range(len(check_points)): + for other_index in range(index + 1, len(check_points)): + if _polyline_spacing(check_points[index], check_points[other_index]) < threshold: + return True + return False + + +def _route_mid_coord(plan: dict, coord_key: str) -> float: + start = plan["points"][0] + end = plan["points"][-1] + return (float(start[coord_key]) + float(end[coord_key])) / 2.0 + + +def _route_spacing_reference_points(link: LinkSpec, pin1, pin2) -> list: + if len(link.points or []) >= 2: + return _route_points_with_pin_endpoints(link.points, pin1, pin2) + p1 = _pin_point(pin1) + p2 = _pin_point(pin2) + if p1 is None or p2 is None: + return [] + return _automatic_route_reference_points(pin1, pin2, p1, p2) + + +def _automatic_route_reference_points(pin1, pin2, point1: dict, point2: dict) -> list: + angle1 = _pin_angle(pin1) + angle2 = _pin_angle(pin2) + if angle1 is None or angle2 is None: + return [point1, point2] + delta = (angle2 - angle1) % 360 + if abs(delta - 90) <= 1e-6 or abs(delta - 270) <= 1e-6: + elbow = _orthogonal_elbow(point1, angle1, point2, angle2) + if elbow is not None and _distance(point1, elbow) > 1e-9 and _distance(elbow, point2) > 1e-9: + return [point1, elbow, point2] + return [point1, point2] + + +def _orthogonal_elbow(point1: dict, angle1: float, point2: dict, angle2: float): + dx1, dy1 = _direction_vector(angle1) + dx2, dy2 = _direction_vector(angle2) + determinant = dx2 * dy1 - dx1 * dy2 + if abs(determinant) <= 1e-9: + return None + delta_x = point2["x"] - point1["x"] + delta_y = point2["y"] - point1["y"] + t1 = (dx2 * delta_y - dy2 * delta_x) / determinant + t2 = (dx1 * delta_y - dy1 * delta_x) / determinant + if t1 < -1e-9 or t2 < -1e-9: + return None + return {"x": point1["x"] + dx1 * t1, "y": point1["y"] + dy1 * t1} + + +def _route_spacing_check_points(points_or_plan) -> list: + source_points = points_or_plan.get("points", []) if isinstance(points_or_plan, dict) else points_or_plan + points = [ + {"x": float(point["x"]), "y": float(point["y"])} + for point in source_points + if isinstance(point, dict) and "x" in point and "y" in point + ] + if len(points) < 2: + return points + return _trim_polyline_ends(points, ROUTE_ENDPOINT_SPACING_IGNORE) + + +def _trim_polyline_ends(points: list, distance: float) -> list: + trimmed = _trim_polyline_start(points, distance) + trimmed = list(reversed(_trim_polyline_start(list(reversed(trimmed)), distance))) + return trimmed if len(trimmed) >= 2 else points + + +def _trim_polyline_start(points: list, distance: float) -> list: + if len(points) < 2 or distance <= 0: + return points + remaining = float(distance) + for index in range(len(points) - 1): + segment_length = _distance(points[index], points[index + 1]) + if segment_length <= 1e-9: + continue + if remaining < segment_length: + start = _point_toward(points[index], points[index + 1], remaining) + return [start] + points[index + 1:] + remaining -= segment_length + return points[-2:] + + +def _route_points_have_spacing(points: list, accepted_plans: list) -> bool: + return all(_polyline_spacing(points, plan.get("check_points") or []) >= ROUTE_MIN_SPACING for plan in accepted_plans) + + +def _first_spacing_adjusted_route(plan: dict, accepted_plans: list): + for offset in _spacing_offsets(plan, accepted_plans): + points = _spacing_adjusted_route_points(plan["link"], plan["pin1"], plan["pin2"], offset) + if _route_points_have_spacing(_route_spacing_check_points(points), accepted_plans): + return {"offset": offset, "points": points} + return None + + +def _spacing_offsets(plan: dict = None, accepted_plans: list = None, limit: int = 16) -> list: + preferred_sign = _preferred_spacing_sign(plan, accepted_plans) + offsets = [] + for step in range(1, limit + 1): + distance = step * (ROUTE_MIN_SPACING / 2.0) + offsets.extend([preferred_sign * distance, -preferred_sign * distance]) + return offsets + + +def _preferred_spacing_sign(plan: dict = None, accepted_plans: list = None) -> float: + if not plan or not accepted_plans: + return 1.0 + axis = _dominant_spacing_axis(plan["points"][0], plan["points"][-1]) + coord_key = "y" if axis == "horizontal" else "x" + current_coord = _average_polyline_coord(_route_spacing_check_points(plan["points"]), coord_key) + nearest_coord = None + nearest_spacing = float("inf") + for accepted in accepted_plans: + spacing = _polyline_spacing(_route_spacing_check_points(plan["points"]), accepted.get("check_points") or []) + if spacing >= nearest_spacing: + continue + nearest_spacing = spacing + nearest_coord = _average_polyline_coord(accepted.get("check_points") or [], coord_key) + if nearest_coord is None: + return 1.0 + if abs(current_coord - nearest_coord) <= 1e-9: + return 1.0 + return 1.0 if current_coord > nearest_coord else -1.0 + + +def _average_polyline_coord(points: list, coord_key: str) -> float: + values = [float(point[coord_key]) for point in points or [] if isinstance(point, dict) and coord_key in point] + if not values: + return 0.0 + return sum(values) / len(values) + + +def _spacing_adjusted_route_points(link: LinkSpec, pin1, pin2, offset: float) -> list: + base_points = _route_spacing_reference_points(link, pin1, pin2) + if len(base_points) < 2: + return base_points + p1 = base_points[0] + p2 = base_points[-1] + axis = _dominant_spacing_axis(p1, p2) + escape = max((_safe_float(link.radius, 10.0) or 10.0), ROUTE_MIN_SPACING) + angle1 = _pin_angle(pin1) or _segment_angle(p1, p2) or 0.0 + angle2 = _pin_angle(pin2) or ((angle1 + 180) % 360) + source_dx, source_dy = _direction_vector(angle1) + target_dx, target_dy = _direction_vector(angle2) + if axis == "horizontal": + source_escape = {"x": p1["x"] + source_dx * escape, "y": p1["y"]} + source_lane = {"x": source_escape["x"], "y": source_escape["y"] + offset} + target_escape = {"x": p2["x"] - target_dx * escape, "y": p2["y"]} + target_lane = {"x": target_escape["x"], "y": target_escape["y"] + offset} + else: + source_escape = {"x": p1["x"], "y": p1["y"] + source_dy * escape} + source_lane = {"x": source_escape["x"] + offset, "y": source_escape["y"]} + target_escape = {"x": p2["x"], "y": p2["y"] - target_dy * escape} + target_lane = {"x": target_escape["x"] + offset, "y": target_escape["y"]} + return [p1, source_escape, source_lane, target_lane, target_escape, p2] + + +def _dominant_spacing_axis(point1: dict, point2: dict) -> str: + return "horizontal" if abs(point2["x"] - point1["x"]) >= abs(point2["y"] - point1["y"]) else "vertical" + + +def _direction_vector(angle: float) -> tuple: + radians = math.radians(angle) + dx = math.cos(radians) + dy = math.sin(radians) + if abs(dx) < 1e-9: + dx = 0.0 + if abs(dy) < 1e-9: + dy = 0.0 + return dx, dy + + +def _polyline_spacing(points1: list, points2: list) -> float: + segments1 = _polyline_segments(points1) + segments2 = _polyline_segments(points2) + if not segments1 or not segments2: + return float("inf") + return min(_segment_spacing(seg1, seg2) for seg1 in segments1 for seg2 in segments2) + + +def _polyline_segments(points: list) -> list: + clean_points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points or [] + if isinstance(point, dict) + ] + clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] + return list(zip(clean_points, clean_points[1:])) + + +def _segment_spacing(segment1: tuple, segment2: tuple) -> float: + a, b = segment1 + c, d = segment2 + if _segments_intersect(a, b, c, d): + return 0.0 + return min( + _point_segment_distance(a, c, d), + _point_segment_distance(b, c, d), + _point_segment_distance(c, a, b), + _point_segment_distance(d, a, b), + ) + + +def _segments_intersect(a: dict, b: dict, c: dict, d: dict) -> bool: + def orientation(p, q, r): + value = (q["y"] - p["y"]) * (r["x"] - q["x"]) - (q["x"] - p["x"]) * (r["y"] - q["y"]) + if abs(value) <= 1e-9: + return 0 + return 1 if value > 0 else 2 + + def on_segment(p, q, r): + return ( + min(p["x"], r["x"]) - 1e-9 <= q["x"] <= max(p["x"], r["x"]) + 1e-9 + and min(p["y"], r["y"]) - 1e-9 <= q["y"] <= max(p["y"], r["y"]) + 1e-9 + ) + + o1 = orientation(a, b, c) + o2 = orientation(a, b, d) + o3 = orientation(c, d, a) + o4 = orientation(c, d, b) + if o1 != o2 and o3 != o4: + return True + return ( + (o1 == 0 and on_segment(a, c, b)) + or (o2 == 0 and on_segment(a, d, b)) + or (o3 == 0 and on_segment(c, a, d)) + or (o4 == 0 and on_segment(c, b, d)) + ) + + +def _point_segment_distance(point: dict, start: dict, end: dict) -> float: + dx = end["x"] - start["x"] + dy = end["y"] - start["y"] + length_squared = dx * dx + dy * dy + if length_squared <= 1e-18: + return _distance(point, start) + t = ((point["x"] - start["x"]) * dx + (point["y"] - start["y"]) * dy) / length_squared + t = max(0.0, min(1.0, t)) + projection = {"x": start["x"] + t * dx, "y": start["y"] + t * dy} + return _distance(point, projection) + + +def _route_link(link: LinkSpec, pin_map: dict, Route, warnings: list, route_options: dict = None) -> None: + route = Route(radius=link.radius or 10, width=link.width, xs=link.xsection, PCB=_metal_route_pcb_enabled(link.xsection)) + p1 = pin_map.get((link.src_inst, link.src_pin)) + p2 = pin_map.get((link.dst_inst, link.dst_pin)) + has_pin_endpoints = bool(link.src_inst or link.src_pin or link.dst_inst or link.dst_pin) + if len(link.points or []) >= 2: + if has_pin_endpoints: + if p1 is None or p2 is None: + warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") + return + points = _route_points_with_pin_endpoints(link.points, p1, p2) + else: + points = link.points + if _route_guided_link(link, route, warnings, points): + return + if p1 is None or p2 is None: + warnings.append(f"Missing route pin for {link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}") + return + method_name = _route_method_name_for_pins(p1, p2) + route_method = getattr(route, method_name, None) + if route_method is None: + warnings.append(f"Route method {method_name} unavailable; falling back to sbend_p2p") + route_method = route.sbend_p2p + kwargs = { + "pin1": p1, + "pin2": p2, + "width": link.width, + "radius": link.radius or 10, + "xs": link.xsection, + "arrow": False, + } + if route_options: + kwargs.update(route_options) + try: + route_method(**kwargs).put() + except TypeError: + if not route_options: + raise + warnings.append( + f"Route method {method_name} rejected spacing options; retrying without options for " + f"{link.src_inst}:{link.src_pin} -> {link.dst_inst}:{link.dst_pin}" + ) + for key in route_options: + kwargs.pop(key, None) + route_method(**kwargs).put() + + +def _route_guided_straight_link(link: LinkSpec, route, warnings: list, points_override=None) -> bool: + route_method = getattr(route, "strt_p2p", None) + if route_method is None: + warnings.append("Spacing route requires Route.strt_p2p; falling back to automatic p2p route") + return False + points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points_override or [] + if isinstance(point, dict) + ] + points = [point for point in points if point["x"] is not None and point["y"] is not None] + if len(points) < 2: + return False + for index in range(len(points) - 1): + start = points[index] + end = points[index + 1] + angle = _segment_angle(start, end) + if angle is None or _distance(start, end) <= 1e-9: + continue + route_method( + pin1=(start["x"], start["y"], angle), + pin2=(end["x"], end["y"], angle), + width=link.width, + xs=link.xsection, + arrow=False, + ).put() + return True + + +def _route_guided_link(link: LinkSpec, route, warnings: list, points_override=None) -> bool: + route_method = getattr(route, "strt_p2p", None) + if route_method is None: + warnings.append("Manual route points require Route.strt_p2p; falling back to automatic p2p route") + return False + bend_method = getattr(route, "strt_bend_strt_p2p", None) + if bend_method is None: + bend_method = getattr(route, "bend_p2p", None) + source_points = points_override if points_override is not None else (link.points or []) + points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in source_points + ] + points = [point for point in points if point["x"] is not None and point["y"] is not None] + if len(points) < 2: + return False + if bend_method is None: + warnings.append("Manual route bends require Route.bend_p2p; corners were exported as straight joins") + plan = _guided_route_plan(points, (link.radius or 10) if bend_method is not None else 0) + for straight in plan["straights"]: + route_method( + pin1=(straight["start"]["x"], straight["start"]["y"], straight["angle"]), + pin2=(straight["end"]["x"], straight["end"]["y"], straight["angle"]), + width=link.width, + xs=link.xsection, + arrow=False, + ).put() + if bend_method is None: + return True + for bend in plan["bends"]: + bend_method( + pin1=(bend["start"]["x"], bend["start"]["y"], bend["angle_in"]), + pin2=(bend["end"]["x"], bend["end"]["y"], bend["angle_out"]), + radius=link.radius or 10, + width=link.width, + xs=link.xsection, + arrow=False, + ).put() + return True + + +def _route_points_with_pin_endpoints(points: list, source_pin=None, target_pin=None) -> list: + clean_points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points or [] + if isinstance(point, dict) + ] + clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] + if len(clean_points) < 2: + return clean_points + source_point = _pin_point(source_pin) + target_point = _pin_point(target_pin) + if source_point is not None: + clean_points[0] = source_point + if target_point is not None: + clean_points[-1] = target_point + return clean_points + + +def _pin_point(pin): + for source in (pin, getattr(pin, "pointer", None)): + if source is None: + continue + x = _safe_float(getattr(source, "x", None), None) + y = _safe_float(getattr(source, "y", None), None) + if x is not None and y is not None: + return {"x": float(x), "y": float(y)} + xya = getattr(source, "xya", None) + if callable(xya): + try: + values = xya() + except TypeError: + values = None + if values and len(values) >= 2: + x = _safe_float(values[0], None) + y = _safe_float(values[1], None) + if x is not None and y is not None: + return {"x": float(x), "y": float(y)} + return None + + +def _guided_route_plan(points: list, radius: float) -> dict: + clean_points = [ + {"x": _safe_float(point.get("x"), None), "y": _safe_float(point.get("y"), None)} + for point in points or [] + if isinstance(point, dict) + ] + clean_points = [point for point in clean_points if point["x"] is not None and point["y"] is not None] + if len(clean_points) < 2: + return {"straights": [], "bends": []} + + trim_radius = max(_safe_float(radius, 0.0) or 0.0, 0.0) + corner_trims = {} + bends = [] + for index in range(1, len(clean_points) - 1): + previous_point = clean_points[index - 1] + corner = clean_points[index] + next_point = clean_points[index + 1] + angle_in = _segment_angle(previous_point, corner) + angle_out = _segment_angle(corner, next_point) + if angle_in is None or angle_out is None or angle_in == angle_out: + continue + turn_delta = abs(((angle_out - angle_in + 180) % 360) - 180) + if turn_delta >= 179: + continue + previous_length = _distance(previous_point, corner) + next_length = _distance(corner, next_point) + trim = min(trim_radius, previous_length / 2.0, next_length / 2.0) + if trim <= 1e-9: + continue + before = _point_toward(corner, previous_point, trim) + after = _point_toward(corner, next_point, trim) + corner_trims[index] = {"before": before, "after": after} + bends.append({ + "start": before, + "corner": {"x": float(corner["x"]), "y": float(corner["y"])}, + "end": after, + "angle_in": angle_in, + "angle_out": angle_out, + }) + + straights = [] + for index in range(len(clean_points) - 1): + start = corner_trims.get(index, {}).get("after", clean_points[index]) + end = corner_trims.get(index + 1, {}).get("before", clean_points[index + 1]) + angle = _segment_angle(start, end) + if angle is None or _distance(start, end) <= 1e-9: + continue + straights.append({ + "start": {"x": float(start["x"]), "y": float(start["y"])}, + "end": {"x": float(end["x"]), "y": float(end["y"])}, + "angle": angle, + }) + + return {"straights": straights, "bends": bends} + + +def _distance(point1: dict, point2: dict) -> float: + return math.hypot(point2["x"] - point1["x"], point2["y"] - point1["y"]) + + +def _point_toward(origin: dict, target: dict, distance: float) -> dict: + total = _distance(origin, target) + if total <= 1e-9: + return {"x": float(origin["x"]), "y": float(origin["y"])} + ratio = distance / total + return { + "x": float(origin["x"] + (target["x"] - origin["x"]) * ratio), + "y": float(origin["y"] + (target["y"] - origin["y"]) * ratio), + } + + +def _segment_angle(point1: dict, point2: dict): + dx = point2["x"] - point1["x"] + dy = point2["y"] - point1["y"] + if abs(dx) < 1e-9 and abs(dy) < 1e-9: + return None + if abs(dx) >= abs(dy): + return 0 if dx >= 0 else 180 + return 90 if dy >= 0 else 270 + + +def _pins_have_same_rotation(pin1, pin2, tolerance: float = 1e-6) -> bool: + angle1 = _pin_angle(pin1) + angle2 = _pin_angle(pin2) + if angle1 is None or angle2 is None: + return False + return abs(((angle1 - angle2 + 180) % 360) - 180) <= tolerance + + def _route_method_name_for_pins(pin1, pin2) -> str: angle1 = _pin_angle(pin1) angle2 = _pin_angle(pin2) @@ -481,88 +922,88 @@ def _route_method_name_for_pins(pin1, pin2) -> str: if 120 < delta < 240: return "sbend_p2p" return "bend_p2p" - - -def _pin_angle(pin): - for source in (pin, getattr(pin, "pointer", None)): - if source is None: - continue - for attr in ("a", "angle", "rot", "rotation"): - value = getattr(source, attr, None) - if callable(value): - try: - value = value() - except TypeError: - continue - number = _safe_float(value, None) - if number is not None: - return number % 360 - xya = getattr(source, "xya", None) - if callable(xya): - try: - values = xya() - except TypeError: - values = None - if values and len(values) >= 3: - number = _safe_float(values[2], None) - if number is not None: - return number % 360 - return None - - -def _resolve_pdk_asset(pdk_root: str, component: str, prefer_full_gds: bool) -> dict: - key = (component or "").replace("\\", "/").strip("/") - name = key.split("/")[-1] - direct = os.path.join(pdk_root, *key.split("/")) - search_dirs = [direct] if os.path.isdir(direct) else [] - if not search_dirs: - for root, dirs, _ in os.walk(pdk_root): - if os.path.basename(root) == name: - search_dirs.append(root) - dirs.clear() - break - if not search_dirs: - return {} - - search_dir = search_dirs[0] - yaml_path = _first_existing(search_dir, [f"{name}.yml", f"{name}.yaml"]) - gds_path = _find_gds(search_dir, name, prefer_full_gds) - metadata = {} - if yaml_path: - with open(yaml_path, "r", encoding="utf-8") as file: - metadata = yaml.safe_load(file) or {} - return {"yaml_path": yaml_path, "gds_path": gds_path, "metadata": metadata} - - -def _find_gds(search_dir: str, name: str, prefer_full_gds: bool): - preferred = [f"{name}.gds", f"{name}_BB.gds"] if prefer_full_gds else [f"{name}_BB.gds", f"{name}.gds"] - found = _first_existing(search_dir, preferred) - if found: - return found - files = sorted(filename for filename in os.listdir(search_dir) if filename.lower().endswith(".gds")) - full = [filename for filename in files if not filename.lower().endswith("_bb.gds")] - bb = [filename for filename in files if filename.lower().endswith("_bb.gds")] - ordered = full + bb if prefer_full_gds else bb + full - return os.path.join(search_dir, ordered[0]) if ordered else None - - -def _first_existing(search_dir: str, filenames: list): - for filename in filenames: - path = os.path.join(search_dir, filename) - if os.path.exists(path): - return path - return None - - + + +def _pin_angle(pin): + for source in (pin, getattr(pin, "pointer", None)): + if source is None: + continue + for attr in ("a", "angle", "rot", "rotation"): + value = getattr(source, attr, None) + if callable(value): + try: + value = value() + except TypeError: + continue + number = _safe_float(value, None) + if number is not None: + return number % 360 + xya = getattr(source, "xya", None) + if callable(xya): + try: + values = xya() + except TypeError: + values = None + if values and len(values) >= 3: + number = _safe_float(values[2], None) + if number is not None: + return number % 360 + return None + + +def _resolve_pdk_asset(pdk_root: str, component: str, prefer_full_gds: bool) -> dict: + key = (component or "").replace("\\", "/").strip("/") + name = key.split("/")[-1] + direct = os.path.join(pdk_root, *key.split("/")) + search_dirs = [direct] if os.path.isdir(direct) else [] + if not search_dirs: + for root, dirs, _ in os.walk(pdk_root): + if os.path.basename(root) == name: + search_dirs.append(root) + dirs.clear() + break + if not search_dirs: + return {} + + search_dir = search_dirs[0] + yaml_path = _first_existing(search_dir, [f"{name}.yml", f"{name}.yaml"]) + gds_path = _find_gds(search_dir, name, prefer_full_gds) + metadata = {} + if yaml_path: + with open(yaml_path, "r", encoding="utf-8") as file: + metadata = yaml.safe_load(file) or {} + return {"yaml_path": yaml_path, "gds_path": gds_path, "metadata": metadata} + + +def _find_gds(search_dir: str, name: str, prefer_full_gds: bool): + preferred = [f"{name}.gds", f"{name}_BB.gds"] if prefer_full_gds else [f"{name}_BB.gds", f"{name}.gds"] + found = _first_existing(search_dir, preferred) + if found: + return found + files = sorted(filename for filename in os.listdir(search_dir) if filename.lower().endswith(".gds")) + full = [filename for filename in files if not filename.lower().endswith("_bb.gds")] + bb = [filename for filename in files if filename.lower().endswith("_bb.gds")] + ordered = full + bb if prefer_full_gds else bb + full + return os.path.join(search_dir, ordered[0]) if ordered else None + + +def _first_existing(search_dir: str, filenames: list): + for filename in filenames: + path = os.path.join(search_dir, filename) + if os.path.exists(path): + return path + return None + + def _transform_port(px: float, py: float, pa: float, ix: float, iy: float, rotation: float, flip: bool = False, flop: bool = False): - if flip: - py = -py - pa = -pa - if flop: - px = -px - pa = 180 - pa - theta = math.radians(rotation) - c, s = math.cos(theta), math.sin(theta) + if flip: + py = -py + pa = -pa + if flop: + px = -px + pa = 180 - pa + theta = math.radians(rotation) + c, s = math.cos(theta), math.sin(theta) return ix + px * c - py * s, iy + px * s + py * c, (pa + rotation) % 360 @@ -571,59 +1012,60 @@ def _layout_rotation(eda_rotation: float) -> float: def _safe_float(value, default=0.0): - if value is None: - return default - if isinstance(value, str) and value.strip().lower() in {"", "none", "null"}: - return default - try: - return float(value) - except (TypeError, ValueError): - return default - - -class _NazcaInterconnectRoute: - """Small mxpic_forge.Route-compatible adapter around Nazca Interconnect.""" - - backend_name = "nazca Interconnect" - - def __init__(self, radius=None, width=None, xs=None, PCB=False): - self.radius = radius - self.width = width - self.xs = xs - self.PCB = PCB - self._interconnect = self._create_interconnect(radius=radius, width=width, xs=xs, PCB=PCB) - - @staticmethod - def _create_interconnect(radius=None, width=None, xs=None, PCB=False): - import nazca as nd - - return nd.interconnects.Interconnect(radius=radius, width=width, xs=xs, PCB=PCB) - - def strt_p2p(self, pin1=None, pin2=None, width=None, xs=None, arrow=True, **kwargs): - return self._interconnect.strt_p2p( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - xs=self._route_xs(xs), - arrow=arrow, - ) - - def sbend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): - return self._interconnect.sbend_p2p( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - radius=self._route_radius(radius), - xs=self._route_xs(xs), - arrow=arrow, - ) - + if value is None: + return default + if isinstance(value, str) and value.strip().lower() in {"", "none", "null"}: + return default + try: + return float(value) + except (TypeError, ValueError): + return default + + +class _NazcaInterconnectRoute: + """Small mxpic_forge.Route-compatible adapter around Nazca Interconnect.""" + + backend_name = "nazca Interconnect" + + def __init__(self, radius=None, width=None, xs=None, PCB=False): + self.radius = radius + self.width = width + self.xs = xs + self.PCB = PCB + self._interconnect = self._create_interconnect(radius=radius, width=width, xs=xs, PCB=PCB) + + @staticmethod + def _create_interconnect(radius=None, width=None, xs=None, PCB=False): + import nazca as nd + + return nd.interconnects.Interconnect(radius=radius, width=width, xs=xs, PCB=PCB) + + def strt_p2p(self, pin1=None, pin2=None, width=None, xs=None, arrow=True, **kwargs): + return self._interconnect.strt_p2p( + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + xs=self._route_xs(xs), + arrow=arrow, + ) + + def sbend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): + return self._interconnect.sbend_p2p( + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + radius=self._route_radius(radius), + xs=self._route_xs(xs), + Lstart=kwargs.get("Lstart", 0), + arrow=arrow, + ) + def ubend_p2p(self, pin1=None, pin2=None, width=None, radius=None, xs=None, arrow=True, **kwargs): return self._interconnect.ubend_p2p( pin1=pin1, pin2=pin2, width=self._route_width(width), - radius=self._route_radius(radius), + radius=self._route_radius(radius), xs=self._route_xs(xs), arrow=arrow, ) @@ -643,46 +1085,46 @@ class _NazcaInterconnectRoute: if route_method is None: route_method = getattr(self._interconnect, "strt_bend_strt_p2p") return route_method( - pin1=pin1, - pin2=pin2, - width=self._route_width(width), - radius=self._route_radius(radius), - xs=self._route_xs(xs), - arrow=arrow, - ) - - def _route_width(self, width): - return self.width if width is None else width - - def _route_radius(self, radius): - return self.radius if radius is None else radius - - def _route_xs(self, xs): - return self.xs if xs is None else xs - - -def _import_route_backend(nd=None): - try: - route = _import_mxpic_forge_route() - setattr(route, "backend_name", getattr(route, "backend_name", "mxpic_forge Route")) - return route - except Exception: - if nd is None: - import nazca as nd - if not hasattr(nd, "interconnects") or not hasattr(nd.interconnects, "Interconnect"): - raise - return _NazcaInterconnectRoute - - -def _import_mxpic_forge_route(): - forge_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "mxpic_forge")) - if os.path.isdir(forge_root) and forge_root not in sys.path: - sys.path.insert(0, forge_root) - loaded_mxpic = sys.modules.get("mxpic") - loaded_path = os.path.abspath(getattr(loaded_mxpic, "__file__", "")) if loaded_mxpic else "" - if loaded_mxpic and forge_root not in loaded_path: - for module_name in list(sys.modules): - if module_name == "mxpic" or module_name.startswith("mxpic."): - del sys.modules[module_name] - from mxpic import Route - return Route + pin1=pin1, + pin2=pin2, + width=self._route_width(width), + radius=self._route_radius(radius), + xs=self._route_xs(xs), + arrow=arrow, + ) + + def _route_width(self, width): + return self.width if width is None else width + + def _route_radius(self, radius): + return self.radius if radius is None else radius + + def _route_xs(self, xs): + return self.xs if xs is None else xs + + +def _import_route_backend(nd=None): + try: + route = _import_mxpic_forge_route() + setattr(route, "backend_name", getattr(route, "backend_name", "mxpic_forge Route")) + return route + except Exception: + if nd is None: + import nazca as nd + if not hasattr(nd, "interconnects") or not hasattr(nd.interconnects, "Interconnect"): + raise + return _NazcaInterconnectRoute + + +def _import_mxpic_forge_route(): + forge_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "mxpic_forge")) + if os.path.isdir(forge_root) and forge_root not in sys.path: + sys.path.insert(0, forge_root) + loaded_mxpic = sys.modules.get("mxpic") + loaded_path = os.path.abspath(getattr(loaded_mxpic, "__file__", "")) if loaded_mxpic else "" + if loaded_mxpic and forge_root not in loaded_path: + for module_name in list(sys.modules): + if module_name == "mxpic" or module_name.startswith("mxpic."): + del sys.modules[module_name] + from mxpic import Route + return Route